Uso de tablas delta con datos de streaming

Completado

Todos los datos que hemos explorado hasta este punto han sido datos estáticos en archivos. Sin embargo, muchos escenarios de análisis de datos implican datos de streaming que se deben procesar casi en tiempo real. Por ejemplo, es posible que tenga que capturar lecturas emitidas por dispositivos de Internet de las cosas (IoT) y almacenarlas en una tabla a medida que se produzcan. Spark procesa datos por lotes y datos de streaming de la misma manera, habilitando el procesamiento de datos de streaming en tiempo real usando la misma API.

Spark Structured Streaming

Una solución típica de procesamiento de flujos implica:

  • Leer constantemente un flujo de datos de un origen.
  • Opcionalmente, procesar los datos para seleccionar campos específicos, agregar y agrupar valores, o manipular los datos.
  • Escribir los resultados en un receptor.

Spark incluye compatibilidad nativa con el streaming de datos a través de Spark Structured Streaming, una API basada en un dataframe sin límite en la que se capturan los datos de streaming para su procesamiento. Un dataframe de Structured Streaming de Spark puede leer datos de muchos tipos diferentes de origen de streaming, incluyendo:

  • Puertos de red
  • Servicios de intermediación de mensajes en tiempo real, como Azure Event Hubs o Kafka
  • Ubicaciones del sistema de archivos.

Sugerencia

Para obtener más información sobre Spark Structured Streaming, consulte la guía de programación de Structured Streaming en la documentación de Spark.

Streaming con tablas delta

Puede usar una tabla delta como origen o receptor para Spark Structured Streaming. Por ejemplo, podría capturar un flujo de datos en tiempo real de un dispositivo IoT y escribir la secuencia directamente en una tabla delta como receptor. A continuación, puede consultar la tabla para ver los datos transmitidos más recientes. O bien, podría leer un delta como origen de streaming, lo que permite la creación de informes casi en tiempo real a medida que se agregan nuevos datos a la tabla.

Uso de una tabla delta como origen de streaming

En el siguiente ejemplo de PySpark, se crea una tabla Delta para almacenar los detalles de los pedidos de venta por Internet:

%%sql
CREATE TABLE orders_in
(
        OrderID INT,
        OrderDate DATE,
        Customer STRING,
        Product STRING,
        Quantity INT,
        Price DECIMAL
)
USING DELTA;

Se inserta un hipotético flujo de datos de pedidos por Internet en la tabla orders_in:

%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
    (3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
    (3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
    (3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
    (3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
    (3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);

Para verificarlo, puede leer y visualizar los datos de la tabla de entrada:

# Read and display the input table
df = spark.read.format("delta").table("orders_in")

display(df)

Después, los datos se cargan en un dataframe de streaming a partir de la tabla Delta:

# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .table("orders_in")

Nota:

Cuando se usa una tabla delta como origen de streaming, solo se pueden incluir operaciones de anexión en el flujo. Las modificaciones de datos pueden provocar un error a menos que especifique la opción ignoreChanges o ignoreDeletes.

Puede comprobar que se está realizando el streaming de la transmisión usando la propiedad isStreaming, que debería devolver True:

# Verify that the stream is streaming
stream_df.isStreaming

Transformación del flujo de datos

Después de leer los datos de la tabla delta en un dataframe de streaming, puede usar la API de Spark Structured Streaming para procesarlos. Por ejemplo, podría contar el número de pedidos realizados cada minuto y enviar los resultados agregados a un proceso de bajada para la visualización casi en tiempo real.

En este ejemplo, se filtran todas las filas con NULL en la columna Price y se agregan nuevas columnas para IsBike y Total.

from pyspark.sql.functions import col, expr

transformed_df = stream_df.filter(col("Price").isNotNull()) \
    .withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
    .withColumn('Total', expr("Quantity * Price").cast('decimal'))

Uso de una tabla delta como receptor de streaming

El flujo de datos se escribe después en una tabla Delta:

# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)

print("Streaming to orders_processed...")

Nota:

La opción checkpointLocation se usa para escribir un archivo de punto de control que realiza un seguimiento del estado del procesamiento del flujo. Este archivo le permite recuperarse de un error en el punto en el que se dejó el procesamiento de streaming.

Una vez iniciado el proceso de streaming, puede consultar la tabla de Delta Lake para ver qué hay en la tabla de salida. Es posible que se produzca un breve retraso antes de poder consultar la tabla.

%%sql
SELECT *
    FROM orders_processed
    ORDER BY OrderID;

En los resultados de esta consulta, el pedido 3005 queda excluido porque tenía NULL en la columna Price. Y se muestran las dos columnas que se agregaron durante la transformación: IsBike y Total.

OrderID OrderDate Customer Producto Quantity Precio IsBike Total
3001 2023-09-01 Yang Bicicleta de carretera roja 1 1200 1 1200
3002 2023-09-01 Carlson Bicicleta de montaña plateada 1 1.500 1 1.500
3003 2023-09-02 Wilson Bicicleta de carretera amarilla 2 1350 1 2700
3004 2023-09-02 Yang Rueda delantera de carretera 1 115 0 115

Cuando termine, detenga los datos de streaming para evitar costes de procesamiento innecesarios usando el método stop:

# Stop the streaming data to avoid excessive processing costs
deltastream.stop()

Sugerencia

Para más información sobre el uso de tablas delta para datos de streaming, consulte Lecturas y escrituras de streaming de tablas en la documentación de Delta Lake.