使用 Delta 表格處理串流資料

已完成

我們目前探索的所有資料都是檔案中的靜態資料。 但許多資料分析案例涉及的串流資料必須近即時處理。 例如,您可能需要擷取物聯網 (IoT) 裝置發出的讀取,同時儲存在資料表中儲存這些讀取。 Spark 處理批次資料和串流資料的方式相同,使得串流資料可以使用相同的 API 即時處理。

Spark 結構化串流

典型的串流處理解決方案涉及持續從來源讀取資料流、選擇性處理資料流並選取特定欄位、彙總並為值分組,或操作資料並將結果寫入接收

Spark 包含原生支援 Spark 結構化串流的串流資料,即以無限資料框架為基礎的 API,在處理期間此 API 會擷取串流資料。 Spark 結構化串流資料框架可以從許多不同類型的串流來源讀取資料,包括:

  • 網路連接埠
  • 實時訊息代理服務,例如 Azure 事件中樞或 Kafka
  • 檔案系統位置。

提示

如需 Spark 結構化串流的詳細資訊,請參閱 Spark 文件中的「結構化串流程式設計指南」。

使用 Delta 資料表進行串流

您可以將 Delta 資料表用作 Spark 結構化串流的來源接收器。 例如,您可以擷取來自 IoT 裝置的即時資料串流,並將該串流直接寫入 Delta 資料表作爲接收器。 然後,您便可以查詢資料表以查看最新的串流資料。 或者,您可以將 Delta 讀取為串流來源,這可讓您在資料表新增新資料時,近乎實時地進行報告。

將 Delta 資料表用作串流來源

在下列的 PySpark 範例中,Delta 資料表會用來儲存網際網路銷售訂單的詳細資料:

%%sql
CREATE TABLE orders_in
(
        OrderID INT,
        OrderDate DATE,
        Customer STRING,
        Product STRING,
        Quantity INT,
        Price DECIMAL
)
USING DELTA;

網際網路訂單的假設資料流會插入至 orders_in 資料表:

%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
    (3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
    (3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
    (3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
    (3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
    (3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);

若要驗證,您可以從輸入資料表讀取和顯示資料:

# Read and display the input table
df = spark.read.format("delta").table("orders_in")

display(df)

然後,會從 Delta 資料表將資料載入串流 DataFrame:

# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .table("orders_in")

注意

將 Delta 資料表用作串流來源時,串流中只能包含附加作業。 除非您指定 ignoreChangesignoreDeletes 選項,否則資料修改會發生錯誤。

您可以使用應該傳回 True 的 isStreaming 屬性來檢查資料流是否正在串流:

# Verify that the stream is streaming
stream_df.isStreaming

轉換資料流

從 Delta 資料表將資料讀取到串流 DataFrame 之後,您可以使用 Spark 結構化串流 API 來處理資料。 例如,您可以計算每分鐘下的訂單數量,並將彙總結果傳送至下游流程以實現近乎實時的視覺效果。

在此範例中,會篩選 Price 資料行中有 NULL 的任何資料列,並針對 IsBike 和 Total 加入新資料行。

from pyspark.sql.functions import col, expr

transformed_df = stream_df.filter(col("Price").isNotNull()) \
    .withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
    .withColumn('Total', expr("Quantity * Price").cast('decimal'))

將 Delta 資料表用作串流接收器

資料流接著會寫入 Delta 資料表:

# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)

print("Streaming to orders_processed...")

注意

checkpointLocation 選項會用來寫入追蹤資料處理狀態的檢查點檔案。 此檔案讓您可以從串流處理中止時復原失敗。

串流流程啟動之後,您可以查詢 Delta Lake 資料表,以查看輸出資料表中的內容。 可能會有短暫的延遲您才能查詢資料表。

%%sql
SELECT *
    FROM orders_processed
    ORDER BY OrderID;

在此查詢的結果中,會排除訂單 3005,因為它在 Price 資料行中有 NULL。 並且會顯示轉換期間新增的兩個資料行 - IsBike 和 Total。

OrderID OrderDate 客戶 Products 數量 價格 IsBike 總數
3001 2023-09-01 Yang 道路自行車紅色 1 1200 1 1200
3002 2023-09-01 Carlson 登山自行車銀色 1 1500 1 1500
3003 2023-09-02 Wilson 道路自行車黃色 2 1350 1 2700
3004 2023-09-02 Yang 道路前輪 1 115 0 115

完成後,使用 stop 方法停止串流資料,以避免不必要的處理成本:

# Stop the streaming data to avoid excessive processing costs
delta_stream.stop()

提示

如需使用 Delta 資料表來串流資料的詳細資訊,請參閱 Delta Lake 文件中的「資料表串流讀取和寫入」