共用方式為


使用 foreachBatch 寫入任意數據接收器

本文討論如何使用 foreachBatch 結構化串流,將串流查詢的輸出寫入至沒有現有串流接收的數據源。

程序代碼模式 streamingDF.writeStream.foreachBatch(...) 可讓您將批次函式套用至串流查詢每個微批次的輸出數據。 與 foreachBatch 搭配使用的函式需要使用兩個 parameters:

  • 具有微批次輸出數據的 DataFrame。
  • 微批次的唯一標識碼。

您必須在 foreachBatch 結構化串流中使用 Delta Lake 合併作業。 請參閱 使用 foreachBatch 從串流查詢的 Upsert。

套用其他 DataFrame 作業

串流數據框架不支援許多 DataFrame 和數據集作業,因為 Spark 不支援在這些情況下產生累加計劃。 您可以使用 foreachBatch() ,在每個微批次輸出上套用其中一些作業。 例如,您可以使用 foreachBath() 和 SQL MERGE INTO 作業,將串流匯總的輸出寫入 update 模式中的 Delta table。 如需詳細資訊,請參閱 MERGE INTO

重要

  • foreachBatch() 只提供至少一次寫入保證。 不過,您可以使用提供給函式的 batchId 來進行輸出重複資料刪除,並提供 get 精確一次的保證。 不論是哪一種情況,您都必須自行考慮端對端語意。
  • foreachBatch() 無法使用 連續處理模式 ,因為它基本上依賴串流查詢的微批次執行。 如果您以連續模式寫入資料,請改用 foreach()

您可以使用 叫用 foreachBatch() 空的數據框架,而且使用者程式代碼必須具有復原性,才能進行適當的作業。 以下顯示一個範例:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Databricks Runtime 14.0 中的行為變更foreachBatch

在以共用存取模式設定的計算上,Databricks Runtime 14.0 和更新版本會套用下列行為變更:

  • print() 命令會將輸出寫入驅動程式記錄。
  • 您無法存取函式內的 dbutils.widgets 子模組。
  • 函式中參考的任何檔案、模組或對象都必須可串行化,且可在 Spark 上使用。

重複使用現有的批次數據源

您可以使用 foreachBatch(),針對可能沒有結構化串流支持的數據接收器,使用現有的批次數據寫入器。 以下是一些範例:

您可以從 使用 foreachBatch()許多其他批次數據來源。 請參閱 連線至數據源

寫入多個位置

如果您需要將串流查詢的輸出寫入多個位置,Databricks 建議使用多個結構化串流寫入器來獲得最佳平行處理和輸送量。

使用 foreachBatch 來寫入多個接收會串行化串流寫入的執行,這可能會增加每個微批次的延遲。

如果您使用 來寫入多個 Delta ,請參閱 foreachBatch中的 等冪 寫入。