使用 foreachBatch 寫入任意數據接收器
本文討論如何使用 foreachBatch
結構化串流,將串流查詢的輸出寫入至沒有現有串流接收的數據源。
程序代碼模式 streamingDF.writeStream.foreachBatch(...)
可讓您將批次函式套用至串流查詢每個微批次的輸出數據。 與 foreachBatch
搭配使用的函式需要使用兩個 parameters:
- 具有微批次輸出數據的 DataFrame。
- 微批次的唯一標識碼。
您必須在 foreachBatch
結構化串流中使用 Delta Lake 合併作業。 請參閱 使用 foreachBatch 從串流查詢的 Upsert。
套用其他 DataFrame 作業
串流數據框架不支援許多 DataFrame 和數據集作業,因為 Spark 不支援在這些情況下產生累加計劃。 您可以使用 foreachBatch()
,在每個微批次輸出上套用其中一些作業。 例如,您可以使用 foreachBath()
和 SQL MERGE INTO
作業,將串流匯總的輸出寫入 update 模式中的 Delta table。 如需詳細資訊,請參閱 MERGE INTO。
重要
-
foreachBatch()
只提供至少一次寫入保證。 不過,您可以使用提供給函式的batchId
來進行輸出重複資料刪除,並提供 get 精確一次的保證。 不論是哪一種情況,您都必須自行考慮端對端語意。 -
foreachBatch()
無法使用 連續處理模式 ,因為它基本上依賴串流查詢的微批次執行。 如果您以連續模式寫入資料,請改用foreach()
。
您可以使用 叫用 foreachBatch()
空的數據框架,而且使用者程式代碼必須具有復原性,才能進行適當的作業。 以下顯示一個範例:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Databricks Runtime 14.0 中的行為變更foreachBatch
在以共用存取模式設定的計算上,Databricks Runtime 14.0 和更新版本會套用下列行為變更:
-
print()
命令會將輸出寫入驅動程式記錄。 - 您無法存取函式內的
dbutils.widgets
子模組。 - 函式中參考的任何檔案、模組或對象都必須可串行化,且可在 Spark 上使用。
重複使用現有的批次數據源
您可以使用 foreachBatch()
,針對可能沒有結構化串流支持的數據接收器,使用現有的批次數據寫入器。 以下是一些範例:
您可以從 使用 foreachBatch()
許多其他批次數據來源。 請參閱 連線至數據源。
寫入多個位置
如果您需要將串流查詢的輸出寫入多個位置,Databricks 建議使用多個結構化串流寫入器來獲得最佳平行處理和輸送量。
使用 foreachBatch
來寫入多個接收會串行化串流寫入的執行,這可能會增加每個微批次的延遲。
如果您使用