Compartir a través de


Obtención de datos de streaming en el almacén de lago con streaming estructurado de Spark

Structured Streaming es un motor de procesamiento de flujos escalable y tolerante a errores basado en Spark. Spark se encarga de ejecutar la operación de streaming de forma incremental y continua a medida que los datos continúan llegando.

El streaming estructurado está disponible en Spark 2.2. Desde entonces, ha sido el enfoque recomendado para el streaming de datos. El principio fundamental detrás de la secuencia estructurada es tratar un flujo de datos en directo como una tabla donde los nuevos datos siempre se anexan continuamente, como una nueva fila en una tabla. Hay algunos orígenes de archivos de streaming integrados definidos, como CSV, JSON, ORC, Parquet y compatibilidad integrada con servicios de mensajería como Kafka y Event Hubs.

En este artículo, se proporciona información sobre cómo optimizar el procesamiento y la ingesta de eventos a través del streaming de estructuras de Spark en entornos de producción con un alto rendimiento. Los enfoques sugeridos incluyen:

  • Optimización del rendimiento de streaming de datos
  • Optimización de las operaciones de escritura en la tabla delta y
  • Procesamiento por lotes de eventos

Definiciones de trabajos de Spark y cuadernos de Spark

Los cuadernos de Spark son una excelente herramienta para validar ideas y realizar experimentos para obtener información de los datos o el código. Los cuadernos se utilizan ampliamente en la preparación de datos, la visualización, el aprendizaje automático y otros escenarios de big data. Las definiciones de trabajos de Spark son tareas no interactivas orientadas a código que se ejecutan en un clúster de Spark durante largos períodos. Las definiciones de trabajos de Spark proporcionan solidez y disponibilidad.

Los cuadernos de Spark son un origen excelente para probar la lógica del código y abordar todos los requisitos empresariales. Sin embargo, para mantenerla en ejecución en un escenario de producción, las configuraciones de trabajo de Spark con la política de reintento habilitada son la mejor solución.

Directiva de reintentos para definiciones de trabajos de Spark

En Microsoft Fabric, el usuario puede establecer una directiva de reintento para trabajos de definición de trabajos de Spark. Aunque el script del trabajo puede ser infinito, la infraestructura que ejecuta el script podría incurrir en un problema que requiera detener el trabajo. O el trabajo podría eliminarse debido a las necesidades fundamentales de mantenimiento de la infraestructura. La directiva de reintento permite al usuario establecer reglas para reiniciar automáticamente el trabajo si se detiene debido a cualquier problema subyacente. Los parámetros especifican con qué frecuencia se debe reiniciar el trabajo: hasta un número infinito de reintentos y el tiempo que debe transcurrir entre estos reintentos. De este modo, los usuarios pueden asegurarse de que sus trabajos de definición de trabajo de Spark sigan ejecutándose infinitamente hasta que el usuario decida detenerlos.

Orígenes de streaming

La configuración de streaming con Event Hubs requiere una configuración básica, que incluye el nombre del espacio de nombres de Event Hubs, el nombre del centro, el nombre de la clave de acceso compartido y el grupo de consumidores. Un grupo de consumidores es una vista de un centro de eventos completo. Permite que varias aplicaciones de consumo tengan una vista independiente de la secuencia de eventos y lean la secuencia de forma independiente a su propio ritmo y con sus offsets.

Las particiones son una parte esencial de poder controlar un gran volumen de datos. Un único procesador tiene una capacidad limitada para controlar eventos por segundo, mientras que varios procesadores pueden realizar un mejor trabajo cuando se ejecutan en paralelo. Las particiones permiten procesar grandes volúmenes de eventos en paralelo.

Si se usan demasiadas particiones con una tasa de ingesta baja, los lectores de particiones tratan con una pequeña parte de estos datos, lo que provoca un procesamiento no óptimo. El número ideal de particiones depende directamente de la velocidad de procesamiento deseada. Si desea ampliar su procesamiento de eventos, considere la posibilidad de añadir más particiones. No hay ningún límite de rendimiento específico en una partición. Sin embargo, el rendimiento agregado en su espacio de nombres está limitado por el número de unidades de rendimiento. A medida que aumente el número de unidades de rendimiento en su espacio de nombres, es posible que desee particiones adicionales para permitir que los lectores concurrentes alcancen su máximo rendimiento.

La recomendación es investigar y probar el mejor número de particiones para el escenario de rendimiento. Pero es habitual ver escenarios con un alto rendimiento mediante 32 o más particiones.

El conector Azure Event Hubs para Apache Spark (azure-event-hubs-spark) se recomienda para conectar la aplicación Spark a Azure Event Hubs.

Almacén de lago de datos como receptor de streaming

Delta Lake es una capa de almacenamiento de código abierto que proporciona transacciones ACID (atomicidad, coherencia, aislamiento y durabilidad) sobre las soluciones de almacenamiento de lago de datos. Delta Lake también admite el control de metadatos escalables, la evolución del esquema, el viaje en tiempo (control de versiones de datos), el formato abierto y otras características.

En Ingeniería de datos de Fabric, Delta Lake se usa para:

  • Actualizar/insertar (upsert) fácilmente y eliminar datos mediante Spark SQL.
  • Compacte los datos para minimizar el tiempo dedicado a consultar los datos.
  • Vea el estado de las tablas antes y después de ejecutar las operaciones.
  • Recupere un historial de las operaciones realizadas en las tablas.

Delta se añade como uno de los posibles formatos de receptores de salidas usados en writeStream. Para obtener más información sobre los receptores de salida existentes, consulte Guía de programación de streaming estructurado de Spark.

En el ejemplo siguiente se muestra cómo es posible transmitir datos a Delta Lake.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 

df = spark \ 
  .readStream \ 
  .format("eventhubs") \ 
  .options(**ehConf) \ 
  .load()  

Schema = StructType([StructField("<column_name_01>", StringType(), False), 
                     StructField("<column_name_02>", StringType(), False), 
                     StructField("<column_name_03>", DoubleType(), True), 
                     StructField("<column_name_04>", LongType(), True), 
                     StructField("<column_name_05>", LongType(), True)]) 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .toTable("deltaeventstable") 

Acerca del código recortado en el ejemplo:

  • format() es la instrucción que define el formato de salida de los datos.
  • outputMode() define en qué forma se escriben las nuevas filas en la transmisión (es decir, añadir, sobrescribir).
  • toTable() persiste los datos transmitidos en una tabla Delta creada utilizando el valor pasado como parámetro.

Optimización de las escrituras de Delta

La partición de datos es un elemento fundamental para crear una solución de streaming robusta: la partición mejora la organización de los datos y el rendimiento. Los archivos se fragmentan fácilmente tras las operaciones Delta, lo que da lugar a demasiados archivos pequeños. Y los archivos demasiado grandes también son un problema, debido al largo tiempo para escribirlos en el disco. El desafío de la creación de particiones de datos es encontrar el equilibrio adecuado que da lugar a tamaños de archivo óptimos. Spark admite la creación de particiones en memoria y en el disco. Los datos con particiones correctas pueden proporcionar el mejor rendimiento al conservar los datos en Delta Lake y consultar datos desde Delta Lake.

  • Al particionar datos en disco, puede elegir cómo particionar los datos basándose en columnas utilizando partitionBy(). partitionBy() es una función que se usa para crear particiones de un modelo semántico grande en archivos más pequeños en función de una o varias columnas que se proporcionan al escribir en el disco. La creación de particiones es una manera de mejorar el rendimiento de la consulta al trabajar con un modelo semántico grande. Evite elegir una columna que genere particiones demasiado pequeñas o demasiado grandes. Defina una partición basada en un conjunto de columnas con una buena cardinalidad y divida los datos en archivos de tamaño óptimo.
  • La partición de los datos en memoria puede realizarse utilizando las transformaciones repartition() o coalesce(), distribuyendo los datos en múltiples nodos trabajadores y creando múltiples tareas que puedan leer y procesar datos en paralelo utilizando los fundamentos de Resilient Distributed Dataset (RDD). Permite dividir el modelo semántico en particiones lógicas, que se pueden calcular en distintos nodos del clúster.
    • repartition() se utiliza para aumentar o disminuir el número de particiones en memoria. Repartition reorganiza todos los datos de la red y los reparte entre todas las particiones.
    • coalesce() se utiliza solo para disminuir eficientemente el número de particiones. Es una versión optimizada de repartition() donde el movimiento de datos a través de todas las particiones es menor usando coalesce().

Combinar ambos enfoques de creación de particiones es una buena solución en escenarios con un alto rendimiento. repartition() crea un número específico de particiones en memoria, mientras que partitionBy() escribe archivos en disco para cada partición de memoria y columna de partición. En el ejemplo siguiente se muestra el uso de ambas estrategias de creación de particiones en el mismo trabajo de Spark: los datos se dividen primero en 48 particiones en memoria (suponiendo que tenemos un total de 48 núcleos de CPU) y, a continuación, se particionan en disco en dos columnas existentes de la carga.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 
import json 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Escritura optimizada

Otra opción para optimizar las escrituras en Delta Lake es usar la escritura optimizada. Escritura optimizada es una característica opcional que mejora la forma en que los datos se escriben en la tabla Delta. Spark combina o divide las particiones antes de escribir los datos, lo que maximiza el rendimiento de los datos que se escriben en el disco. Sin embargo, incurre en orden aleatorio completo, por lo que para algunas cargas de trabajo puede provocar una degradación del rendimiento. Los trabajos que utilizan coalesce() y/o repartition() para particionar datos en disco pueden ser refactorizados para comenzar a utilizar Optimized Write en su lugar.

El código siguiente es un ejemplo del uso de Escritura optimizada. Tenga en cuenta que se sigue usando partitionBy().

spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", true) 
 
rawData = df \ 
 .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Eventos de procesamiento por lotes

Para minimizar el número de operaciones con el fin de mejorar el tiempo dedicado a la ingesta de datos en Delta Lake, los eventos de procesamiento por lotes son una alternativa práctica.

Los desencadenadores definen la frecuencia con la que se debe ejecutar una consulta de streaming (desencadenada) y emitir nuevos datos. Al configurarse, se define un intervalo de tiempo de procesamiento periódico para los microlotes, la acumulación de datos y los eventos de procesamiento por lotes en pocas operaciones persistentes, en lugar de escribir en disco todo el tiempo.

En el ejemplo siguiente se muestra una consulta de streaming en la que los eventos se procesan periódicamente en intervalos de un minuto.

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .trigger(processingTime="1 minute") \ 
  .toTable("deltaeventstable") 

La ventaja de combinar el procesamiento por lotes de eventos en las operaciones de escritura de tablas delta es que crea archivos Delta más grandes con más datos en ellos, evitando archivos pequeños. Debe analizar la cantidad de datos que se ingieren y encontrar el mejor tiempo de procesamiento para optimizar el tamaño de los archivos Parquet creados por la biblioteca Delta.

Supervisión

Spark 3.1 y versiones superiores tienen incorporada una interfaz de usuario de streaming estructuradoque contiene las siguientes métricas de streaming:

  • Velocidad de entrada
  • Velocidad de proceso
  • Filas de entrada
  • Duración del lote
  • Duración de la operación