使用 Delta 表格處理串流資料
我們目前探索的所有資料都是檔案中的靜態資料。 但許多資料分析案例涉及的串流資料必須近即時處理。 例如,您可能需要擷取物聯網 (IoT) 裝置發出的讀取,同時儲存在資料表中儲存這些讀取。 Spark 處理批次資料和串流資料的方式相同,使得串流資料可以使用相同的 API 即時處理。
Spark 結構化串流
典型的串流處理解決方案涉及持續從來源讀取資料流、選擇性處理資料流並選取特定欄位、彙總並為值分組,或操作資料並將結果寫入接收。
Spark 包含原生支援 Spark 結構化串流的串流資料,即以無限資料框架為基礎的 API,在處理期間此 API 會擷取串流資料。 Spark 結構化串流資料框架可以從許多不同類型的串流來源讀取資料,包括:
- 網路連接埠
- 實時訊息代理服務,例如 Azure 事件中樞或 Kafka
- 檔案系統位置。
提示
如需 Spark 結構化串流的詳細資訊,請參閱 Spark 文件中的「結構化串流程式設計指南」。
使用 Delta 資料表進行串流
您可以將 Delta 資料表用作 Spark 結構化串流的來源或接收器。 例如,您可以擷取來自 IoT 裝置的即時資料串流,並將該串流直接寫入 Delta 資料表作爲接收器。 然後,您便可以查詢資料表以查看最新的串流資料。 或者,您可以將 Delta 讀取為串流來源,這可讓您在資料表新增新資料時,近乎實時地進行報告。
將 Delta 資料表用作串流來源
在下列的 PySpark 範例中,Delta 資料表會用來儲存網際網路銷售訂單的詳細資料:
%%sql
CREATE TABLE orders_in
(
OrderID INT,
OrderDate DATE,
Customer STRING,
Product STRING,
Quantity INT,
Price DECIMAL
)
USING DELTA;
網際網路訂單的假設資料流會插入至 orders_in 資料表:
%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
(3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
(3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
(3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
(3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
(3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);
若要驗證,您可以從輸入資料表讀取和顯示資料:
# Read and display the input table
df = spark.read.format("delta").table("orders_in")
display(df)
然後,會從 Delta 資料表將資料載入串流 DataFrame:
# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.table("orders_in")
注意
將 Delta 資料表用作串流來源時,串流中只能包含附加作業。 除非您指定 ignoreChanges
或 ignoreDeletes
選項,否則資料修改會發生錯誤。
您可以使用應該傳回 True 的 isStreaming
屬性來檢查資料流是否正在串流:
# Verify that the stream is streaming
stream_df.isStreaming
轉換資料流
從 Delta 資料表將資料讀取到串流 DataFrame 之後,您可以使用 Spark 結構化串流 API 來處理資料。 例如,您可以計算每分鐘下的訂單數量,並將彙總結果傳送至下游流程以實現近乎實時的視覺效果。
在此範例中,會篩選 Price 資料行中有 NULL 的任何資料列,並針對 IsBike 和 Total 加入新資料行。
from pyspark.sql.functions import col, expr
transformed_df = stream_df.filter(col("Price").isNotNull()) \
.withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
.withColumn('Total', expr("Quantity * Price").cast('decimal'))
將 Delta 資料表用作串流接收器
資料流接著會寫入 Delta 資料表:
# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)
print("Streaming to orders_processed...")
注意
checkpointLocation
選項會用來寫入追蹤資料處理狀態的檢查點檔案。 此檔案讓您可以從串流處理中止時復原失敗。
串流流程啟動之後,您可以查詢 Delta Lake 資料表,以查看輸出資料表中的內容。 可能會有短暫的延遲您才能查詢資料表。
%%sql
SELECT *
FROM orders_processed
ORDER BY OrderID;
在此查詢的結果中,會排除訂單 3005,因為它在 Price 資料行中有 NULL。 並且會顯示轉換期間新增的兩個資料行 - IsBike 和 Total。
OrderID | OrderDate | 客戶 | Products | 數量 | 價格 | IsBike | 總數 |
---|---|---|---|---|---|---|---|
3001 | 2023-09-01 | Yang | 道路自行車紅色 | 1 | 1200 | 1 | 1200 |
3002 | 2023-09-01 | Carlson | 登山自行車銀色 | 1 | 1500 | 1 | 1500 |
3003 | 2023-09-02 | Wilson | 道路自行車黃色 | 2 | 1350 | 1 | 2700 |
3004 | 2023-09-02 | Yang | 道路前輪 | 1 | 115 | 0 | 115 |
完成後,使用 stop
方法停止串流資料,以避免不必要的處理成本:
# Stop the streaming data to avoid excessive processing costs
delta_stream.stop()