共用方式為


什麼是可設定狀態的串流?

有狀態的結構化串流查詢需要對中繼狀態資訊進行增量更新,而無狀態結構化串流查詢只追蹤從源頭到匯入端的已處理資料行資訊。

具狀態作業包括串流匯總、串流 dropDuplicates、串流聯結和自定義具狀態應用程式。

具狀態結構化串流查詢所需的中繼狀態資訊,如果設定錯誤,可能會導致非預期的延遲和生產問題。

在 Databricks Runtime 13.3 LTS 和更高版本中,您可以藉由使用 RocksDB 啟用變更記錄檢查點,以降低結構化串流工作負載的檢查點持續時間和端到端延遲。 Databricks 建議為所有結構化串流的狀態查詢啟用變更日誌檢查點。 請參閱 <啟用變更記錄檢查點>。

優化有狀態的結構化串流查詢

管理具可設定狀態的結構化串流查詢的中間狀態資訊,有助於防止非預期的延遲和生產問題。

Databricks 建議:

  • 使用計算最佳化的執行個體作為運算工作者。
  • 將隨機分割區數目設定為叢集中核心數目的 1-2 倍。
  • spark.sql.streaming.noDataMicroBatches.enabled 組態設定為SparkSession中的 false。 這可防止串流微批次引擎處理不包含資料的微批次。 另請注意,將此組態設定為 false 可能會導致使用浮水印或處理時間逾時的具狀態作業,資料輸出會在新數據到達後才發生,而不是立即發生。

Databricks 建議使用 RocksDB 搭配變更記錄檢查點來管理可設定狀態的串流的狀態。 請參閱 《在 Azure Databricks 設定 RocksDB 狀態存放區》。

注意

查詢重新啟動之間無法更改狀態管理配置。 如果查詢已使用預設管理啟動,您必須從頭開始使用新的檢查點位置來變更狀態存放區。

在結構化串流中使用多個可設定狀態的運算符

在 Databricks Runtime 13.3 LTS 和更新版本中,Azure Databricks 提供結構化串流工作負載中可設定狀態的運算符的進階支援。 您現在可以將多個具狀態操作串連在一起,這意指您可以將作業的輸出,如視窗聚合,饋送到另一個具狀態的操作,例如聯結。

在 Databricks Runtime 16.2 和更新版本中,您可以在具有多個具狀態運算符的工作負載中使用 transformWithState。 請參閱 建置自定義具狀態應用程式

下列範例示範您可以使用的數種模式。

重要

使用多個可設定狀態的運算符時,存在下列限制:

  • 不支援舊版自定義具狀態運算元(FlatMapGroupWithStateapplyInPandasWithState
  • 僅支援附加輸出模式。

鏈結時間範圍匯總

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

兩個不同數據流中的時間範圍匯總,後面接著數據流視窗聯結

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

數據流串流時間間隔聯結,後面接著時間範圍匯總

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

結構化串流的狀態重新平衡

DLT 中的所有串流工作負載預設會啟用狀態重新平衡。 在 Databricks Runtime 11.3 LTS 和更新版本中,您可以在 Spark 叢集設定中設定下列組態選項,以啟用狀態重新平衡:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

有狀態的重新平衡有利於經歷叢集縮放事件的結構化串流管線。 不論叢集大小變更為何,無狀態串流運算都不會受益。

注意

自動縮放計算資源在縮小結構化串流工作負載的叢集大小方面有其限制。 Databricks 建議針對串流工作負載使用增強自動調整功能的 DLT。 請參閱 透過強化自動縮放優化 DLT 管線的叢集效能。

叢集大小調整活動會觸發狀態重整。 當狀態從雲端記憶體載入到新的執行程式時,微批次在重新平衡事件期間可能會有較高的延遲。