共用方式為


在 DLT 中使用水印來優化具狀態處理

若要有效管理狀態儲存的資料,請在 DLT 中進行狀態數據流處理時使用水印,包括匯總、連接和資料去重。 本文說明如何在 DLT 查詢中使用水印,並包含建議作業的範例。

注意

若要確保執行聚合的查詢以逐步方式處理,而不是每次更新時都重新計算,您必須使用水印。

什麼是水印?

在串流處理中,浮水印 是 Apache Spark 功能,可在執行匯總等具狀態作業時定義處理數據的以時間為基礎的閾值。 抵達的數據會經過處理,直到達到臨界值為止,此時臨界值所定義的時間範圍就會關閉。 浮浮水印可用來避免查詢處理期間發生問題,主要是在處理較大的數據集或長時間執行的處理時。 這些問題可能包括產生結果的高延遲,甚至記憶體不足 (OOM) 錯誤,因為處理期間保持狀態的數據量。 由於串流數據原本就未排序,浮水印也支援正確計算時間範圍匯總等作業。

若要深入瞭解如何在串流處理中使用水印,請參閱 Apache Spark 結構化串流中的 水印套用水印來控制數據處理閾值

如何定義浮水印?

您可以藉由指定時間戳字段和值來定義浮水印,代表 延遲數據到達的時間閾值。 如果數據在定義的時間臨界值之後到達,則會將其視為遲到。 例如,如果時間閾值定義為 10 分鐘,超過 10 分鐘閾值後抵達的記錄可能會被刪除。

由於在定義的臨界值之後抵達的記錄可能會遭到捨棄,因此選取符合延遲與正確性需求的閾值非常重要。 選擇較小的臨界值會導致更快發出記錄,但也表示較晚的記錄更有可能遭到捨棄。 較大的臨界值表示較長的等候時間,但可能更完整的數據。 由於狀態大小較大,因此較大的臨界值可能也需要額外的運算資源。 因為臨界值取決於您的數據和處理需求,因此測試和監視您的處理對於判斷最佳閾值很重要。

您可以使用 Python 中的 withWatermark() 函式來定義浮水印。 在 SQL 中,使用 WATERMARK 子句來定義浮水印:

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

流與流聯結中使用浮水印

針對數據流聯結,您必須在聯結雙方定義浮水印,並設定時間間隔子句。 因為每個聯結來源都有不完整的數據視圖,因此需要時間區間子句來告知串流引擎何時無法進行進一步的匹配。 時間間隔子句必須使用用來定義浮水印的相同欄位。

因為有時候每個數據流都需要不同的浮水印臨界值,因此數據流不需要有相同的臨界值。 為了避免遺漏數據,串流引擎會根據最慢的數據流維護一個全域水位線。

下列範例將聯結廣告曝光的串流和用戶點擊廣告的串流。 在此範例中,點擊必須在曝光后的 3 分鐘內發生。 經過 3 分鐘的時間間隔後,刪除那些無法再匹配的狀態數據列。

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

使用水印執行窗口匯總

串流數據的常見具狀態作業是視窗式匯總。 視窗匯總類似於群組匯總,不同之處在於會針對屬於定義視窗一部分的數據列集傳回匯總值。

視窗可以定義為特定長度,而且匯總作業可以在屬於該視窗的所有數據列上執行。 Spark 串流支援三種類型的視窗:

  • 輪轉(固定)視窗:一系列的固定大小、非重疊和連續時間間隔。 輸入記錄只屬於單一視窗。
  • 滑動視窗:類似於輪轉視窗,滑動視窗是固定大小的,但視窗可以重疊,而記錄可以落入多個視窗。

當數據到達窗口結尾加上浮水印的長度時,不再接受新的數據,並將發出聚合結果,清除窗口的狀態。

下列範例會使用固定視窗每 5 分鐘計算一次曝光數的總和。 在此範例中,select 子句會使用別名 impressions_window,然後視窗本身會定義為 GROUP BY 子句的一部分。 此視窗必須以與水印相同的時間戳記欄位為基礎,本範例中的 clickTimestamp 欄位。

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Python 中的類似範例,用來計算每小時固定時段的利潤:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

去重串流記錄

結構化串流具有完全相同的一次處理保證,但不會自動從數據源取消重複的記錄。 例如,因為許多消息佇列具有至少一次傳遞保證,因此在從這些佇列讀取時,應預期會出現重複記錄。 您可以使用 dropDuplicatesWithinWatermark() 函式來取消任何指定欄位上的重複記錄,即使某些欄位不同(例如事件時間或抵達時間),也會從數據流中移除重複專案。 您必須指定浮水印以使用 dropDuplicatesWithinWatermark() 函式。 所有在水印指定時間範圍內抵達的重複數據都會被丟棄。

已排序的數據很重要,因為順序錯亂的數據會導致浮水印值不正確地向前跳躍。 然後,當較舊的數據到達時,會被視為延遲並被丟棄。 使用 [withEventTimeOrder] 選項,根據浮水印中指定的時間戳,依序處理初始快照。 您可以在定義資料集的程式碼中,或在使用 spark.databricks.delta.withEventTimeOrder.enabled管線設定 中宣告 withEventTimeOrder 選項。 例如:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

注意

withEventTimeOrder 選項僅支援 Python。

在下列範例中,數據按照 clickTimestamp進行排序和處理,而在 5 秒內相繼到達,且包含重複 userIdclickAdId 欄位的記錄將被刪除。

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

優化流程組態以進行具狀態處理

為了協助防止生產問題和過度延遲,Databricks 建議為您的具狀態串流處理啟用 RocksDB 型狀態管理,特別是如果您的處理需要節省大量的中繼狀態。

無伺服器化的管道會自動管理系統狀態存儲配置。

您可以在部署管線之前設定下列設定,以啟用 RocksDB 型狀態管理:

{
  "configuration": {
    "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

若要深入瞭解 RocksDB 狀態存放區,包括 RocksDB 的設定建議,請參閱 在 Azure Databricks 上設定 RocksDB 狀態存放區