将 Delta 表与流式处理数据配合使用
我们浏览到此点的所有数据都是文件中的静态数据。 但是,许多数据分析方案涉及必须准实时处理的流式处理数据。 例如,可能需要捕获物联网 (IoT) 设备发出的读数,并在它们发生时将其存储在表中。 Spark 以相同的方式处理批处理数据和流式处理数据,使能够使用相同的 API 实时处理流数据。
Spark 结构化流
典型的流处理解决方案涉及到:
- 不断地从源读取数据流。
- (可选)处理数据以选择特定字段、将值聚合与分组,或以其他方式处理数据。
- 将结果写入接收器。
Spark 包括通过 Spark 结构化流式处理对流式处理数据的本机支持,这是一种基于无限数据帧的 API,在该数据帧中捕获流式处理数据进行处理。 Spark 结构化流式处理数据帧可以从多种不同类型的流式处理源读取数据,包括:
- 网络端口
- 实时消息中转服务,例如 Azure 事件中心或 Kafka
- 文件系统位置。
提示
有关 Spark 结构化流式处理的详细信息,请参阅 Spark 文档中的结构化流式处理编程指南。
使用 Delta 表进行流式处理
你可以将 Delta 表用作 Spark 结构化流式处理源或接收器。 例如,可以从 IoT 设备捕获实时数据流,并将流直接写入 Delta 表作为接收器。 然后,可以查询该表以查看最新的流式处理数据。 或者,可以将 Delta 表作为流式处理源读取数据,这样就可以在将新数据添加到该表中时几乎实时报告新数据。
将 Delta 表用作流式处理源
在以下 PySpark 示例中,Delta 表用于存储 Internet 销售订单的详细信息:
%%sql
CREATE TABLE orders_in
(
OrderID INT,
OrderDate DATE,
Customer STRING,
Product STRING,
Quantity INT,
Price DECIMAL
)
USING DELTA;
Internet 订单的假设数据流插入到了 orders_in 表中:
%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
(3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
(3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
(3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
(3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
(3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);
若要验证,可以从输入表读取和显示数据:
# Read and display the input table
df = spark.read.format("delta").table("orders_in")
display(df)
然后,数据会从 Delta 表加载到流式处理数据帧:
# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.table("orders_in")
注意
将 Delta 表用作流式处理源时,只能在流中添加“追加”操作。 除非指定 ignoreChanges
或 ignoreDeletes
选项,否则数据修改可能会导致错误。
可以使用应返回 True 的 isStreaming
属性检查流是否正在流式传输:
# Verify that the stream is streaming
stream_df.isStreaming
转换数据流
将数据从 Delta 表读取到流式处理数据帧后,可以使用 Spark 结构化流式处理 API 来处理这些数据。 例如,可以计算每分钟的订单数,并将聚合结果发送到下游进程,以便进行近实时可视化。
在此示例中,筛选了 Price 列中包含 NULL 的所有行,并为 IsBike 和 Total 添加了新列。
from pyspark.sql.functions import col, expr
transformed_df = stream_df.filter(col("Price").isNotNull()) \
.withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
.withColumn('Total', expr("Quantity * Price").cast('decimal'))
将 Delta 表用作流式处理接收器
然后将数据流写入 Delta 表:
# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)
print("Streaming to orders_processed...")
注意
checkpointLocation
选项用于写入跟踪流处理状态的检查点文件。 此文件使你能够在流处理中断的地方从故障中恢复。
流式处理启动后,可以查询 Delta Lake 表以查看输出表中的内容。 在查询表之前,可能会有短暂的延迟。
%%sql
SELECT *
FROM orders_processed
ORDER BY OrderID;
在此查询的结果中,将排除订单 3005,因为它在 Price 列中为 NULL。 此时会显示转换期间添加的两列 - IsBike 和 Total。
OrderID | OrderDate | 客户 | 产品 | 数量 | 价格 | IsBike | 总计 |
---|---|---|---|---|---|---|---|
3001 | 2023-09-01 | Yang | 公路自行车红色 | 1 | 1200 | 1 | 1200 |
3002 | 2023-09-01 | Carlson | 山地自行车银色 | 1 | 1500 | 1 | 1500 |
3003 | 2023-09-02 | Wilson | 公路自行车黄色 | 2 | 1350 | 1 | 2700 |
3004 | 2023-09-02 | Yang | 公路自行车前轮 | 1 | 115 | 0 | 115 |
完成后,停止流式处理数据,以避免使用 stop
方法产生不必要的处理成本:
# Stop the streaming data to avoid excessive processing costs
deltastream.stop()
提示
有关使用 Delta 表进行流式处理数据的详细信息,请参阅 Delta Lake 文档中的表流式处理读取和写入。