Uso de Delta Lake con datos de streaming
Todos los datos que hemos explorado hasta este punto han sido datos estáticos en archivos. Sin embargo, muchos escenarios de análisis de datos implican datos de streaming que se deben procesar casi en tiempo real. Por ejemplo, es posible que tenga que capturar lecturas emitidas por dispositivos de Internet de las cosas (IoT) y almacenarlas en una tabla a medida que se produzcan.
Spark Structured Streaming
Una solución típica de procesamiento de flujos implica la lectura constante de un flujo de datos desde un origen, el procesamiento opcional para seleccionar campos específicos, la adición o agrupación de valores (u otra manipulación de los datos) y la escritura de los resultados en un receptor.
Spark incluye compatibilidad nativa con el streaming de datos a través de Spark Structured Streaming, una API basada en un dataframe sin límite en la que se capturan los datos de streaming para su procesamiento. Un dataframe de Spark Structured Streaming puede leer datos de muchos tipos diferentes de origen de streaming, como puertos de red, servicios de agente de mensajes en tiempo real (como Azure Event Hubs o Kafka), o ubicaciones del sistema de archivos.
Sugerencia
Para obtener más información sobre Spark Structured Streaming, consulte la guía de programación de Structured Streaming en la documentación de Spark.
Streaming con tablas de Delta Lake
Puede usar una tabla de Delta Lake como origen o receptor para Spark Structured Streaming. Por ejemplo, podría capturar un flujo de datos en tiempo real de un dispositivo IoT y escribir la secuencia directamente en una tabla de Delta Lake como receptor, lo que le permite consultar la tabla para ver los datos transmitidos más recientes. O bien, podría leer una tabla Delta como origen de streaming, lo que le permite informar constantemente de nuevos datos a medida que se agregan a la tabla.
Uso de una tabla de Delta Lake como origen de streaming
En el siguiente ejemplo de PySpark, se usa una tabla de Delta Lake para almacenar los detalles de los pedidos de ventas por Internet. Se crea un flujo que lee datos de la carpeta de tabla de Delta Lake a medida que se anexan nuevos datos.
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.load("/delta/internetorders")
# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.writeStream \
.outputMode("append") \
.format("console") \
.start()
Nota
Cuando se usa una tabla de Delta Lake como origen de streaming, solo se pueden incluir operaciones de anexión en el flujo. Las modificaciones de datos provocarán un error a menos que especifique la opción ignoreChanges
o ignoreDeletes
.
Después de leer los datos de la tabla de Delta Lake en un dataframe de streaming, puede usar la API de Spark Structured Streaming para procesarlos. En el ejemplo anterior, el dataframe simplemente se muestra, pero podría usar Spark Structured Streaming para agregar los datos a través de ventanas temporales (por ejemplo, para contar el número de pedidos realizados cada minuto) y enviar los resultados agregados a un proceso de bajada para la visualización casi en tiempo real.
Uso de una tabla de Delta Lake como receptor de streaming
En el siguiente ejemplo de PySpark, se lee un flujo de datos de archivos JSON en una carpeta. Los datos JSON de cada archivo contienen el estado de un dispositivo IoT con el formato {"device":"Dev1","status":"ok"}
Se agregan nuevos datos al flujo cada vez que se agrega un archivo a la carpeta. El flujo de entrada es un dataframe sin límites, que luego se escribe en formato delta en una ubicación de carpeta para una tabla de Delta Lake.
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Create a stream that reads JSON data from a folder
inputPath = '/streamingdata/'
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)
Nota
La opción checkpointLocation
se usa para escribir un archivo de punto de control que realiza un seguimiento del estado del procesamiento del flujo. Este archivo le permite recuperarse de un error en el punto en el que se dejó el procesamiento de streaming.
Una vez iniciado el proceso de streaming, puede consultar la tabla de Delta Lake en la que se escribe la salida de streaming para ver los datos más recientes. Por ejemplo, el código siguiente crea una tabla de catálogo para la carpeta de tabla de Delta Lake y la consulta:
%%sql
CREATE TABLE DeviceTable
USING DELTA
LOCATION '/delta/devicetable';
SELECT device, status
FROM DeviceTable;
Para detener el flujo de datos que se escriben en la tabla de Delta Lake, puede usar el método stop
de la consulta de streaming:
delta_stream.stop()
Sugerencia
Para más información sobre el uso de tablas de Delta Lake para datos de streaming, consulte Lecturas y escrituras de streaming de tablas en la documentación de Delta Lake.