将 Delta 表与流式处理数据配合使用

已完成

我们浏览到此点的所有数据都是文件中的静态数据。 但是,许多数据分析方案涉及必须准实时处理的流式处理数据。 例如,可能需要捕获物联网 (IoT) 设备发出的读数,并在它们发生时将其存储在表中。 Spark 以相同的方式处理批处理数据和流式处理数据,使能够使用相同的 API 实时处理流数据。

Spark 结构化流

典型的流处理解决方案涉及到:

  • 不断地从源读取数据流
  • (可选)处理数据以选择特定字段、将值聚合与分组,或以其他方式处理数据。
  • 将结果写入接收器

Spark 包括通过 Spark 结构化流式处理对流式处理数据的本机支持,这是一种基于无限数据帧的 API,在该数据帧中捕获流式处理数据进行处理。 Spark 结构化流式处理数据帧可以从多种不同类型的流式处理源读取数据,包括:

  • 网络端口
  • 实时消息中转服务,例如 Azure 事件中心或 Kafka
  • 文件系统位置。

提示

有关 Spark 结构化流式处理的详细信息,请参阅 Spark 文档中的结构化流式处理编程指南

使用 Delta 表进行流式处理

你可以将 Delta 表用作 Spark 结构化流式处理源或接收器。 例如,可以从 IoT 设备捕获实时数据流,并将流直接写入 Delta 表作为接收器。 然后,可以查询该表以查看最新的流式处理数据。 或者,可以将 Delta 表作为流式处理源读取数据,这样就可以在将新数据添加到该表中时几乎实时报告新数据。

将 Delta 表用作流式处理源

在以下 PySpark 示例中,Delta 表用于存储 Internet 销售订单的详细信息:

%%sql
CREATE TABLE orders_in
(
        OrderID INT,
        OrderDate DATE,
        Customer STRING,
        Product STRING,
        Quantity INT,
        Price DECIMAL
)
USING DELTA;

Internet 订单的假设数据流插入到了 orders_in 表中:

%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
    (3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
    (3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
    (3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
    (3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
    (3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);

若要验证,可以从输入表读取和显示数据:

# Read and display the input table
df = spark.read.format("delta").table("orders_in")

display(df)

然后,数据会从 Delta 表加载到流式处理数据帧:

# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .table("orders_in")

注意

将 Delta 表用作流式处理源时,只能在流中添加“追加”操作。 除非指定 ignoreChangesignoreDeletes 选项,否则数据修改可能会导致错误。

可以使用应返回 True 的 isStreaming 属性检查流是否正在流式传输:

# Verify that the stream is streaming
stream_df.isStreaming

转换数据流

将数据从 Delta 表读取到流式处理数据帧后,可以使用 Spark 结构化流式处理 API 来处理这些数据。 例如,可以计算每分钟的订单数,并将聚合结果发送到下游进程,以便进行近实时可视化。

在此示例中,筛选了 Price 列中包含 NULL 的所有行,并为 IsBike 和 Total 添加了新列。

from pyspark.sql.functions import col, expr

transformed_df = stream_df.filter(col("Price").isNotNull()) \
    .withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
    .withColumn('Total', expr("Quantity * Price").cast('decimal'))

将 Delta 表用作流式处理接收器

然后将数据流写入 Delta 表:

# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)

print("Streaming to orders_processed...")

注意

checkpointLocation 选项用于写入跟踪流处理状态的检查点文件。 此文件使你能够在流处理中断的地方从故障中恢复。

流式处理启动后,可以查询 Delta Lake 表以查看输出表中的内容。 在查询表之前,可能会有短暂的延迟。

%%sql
SELECT *
    FROM orders_processed
    ORDER BY OrderID;

在此查询的结果中,将排除订单 3005,因为它在 Price 列中为 NULL。 此时会显示转换期间添加的两列 - IsBike 和 Total。

OrderID OrderDate 客户 产品 数量 价格 IsBike 总计
3001 2023-09-01 Yang 公路自行车红色 1 1200 1 1200
3002 2023-09-01 Carlson 山地自行车银色 1 1500 1 1500
3003 2023-09-02 Wilson 公路自行车黄色 2 1350 1 2700
3004 2023-09-02 Yang 公路自行车前轮 1 115 0 115

完成后,停止流式处理数据,以避免使用 stop 方法产生不必要的处理成本:

# Stop the streaming data to avoid excessive processing costs
deltastream.stop()

提示

有关使用 Delta 表进行流式处理数据的详细信息,请参阅 Delta Lake 文档中的表流式处理读取和写入