在 DLT 中使用水印來優化具狀態處理
若要有效管理狀態儲存的資料,請在 DLT 中進行狀態數據流處理時使用水印,包括匯總、連接和資料去重。 本文說明如何在 DLT 查詢中使用水印,並包含建議作業的範例。
注意
若要確保執行聚合的查詢以逐步方式處理,而不是每次更新時都重新計算,您必須使用水印。
什麼是水印?
在串流處理中,浮水印 是 Apache Spark 功能,可在執行匯總等具狀態作業時定義處理數據的以時間為基礎的閾值。 抵達的數據會經過處理,直到達到臨界值為止,此時臨界值所定義的時間範圍就會關閉。 浮浮水印可用來避免查詢處理期間發生問題,主要是在處理較大的數據集或長時間執行的處理時。 這些問題可能包括產生結果的高延遲,甚至記憶體不足 (OOM) 錯誤,因為處理期間保持狀態的數據量。 由於串流數據原本就未排序,浮水印也支援正確計算時間範圍匯總等作業。
若要深入瞭解如何在串流處理中使用水印,請參閱 Apache Spark 結構化串流中的 水印 和 套用水印來控制數據處理閾值。
如何定義浮水印?
您可以藉由指定時間戳字段和值來定義浮水印,代表 延遲數據到達的時間閾值。 如果數據在定義的時間臨界值之後到達,則會將其視為遲到。 例如,如果時間閾值定義為 10 分鐘,超過 10 分鐘閾值後抵達的記錄可能會被刪除。
由於在定義的臨界值之後抵達的記錄可能會遭到捨棄,因此選取符合延遲與正確性需求的閾值非常重要。 選擇較小的臨界值會導致更快發出記錄,但也表示較晚的記錄更有可能遭到捨棄。 較大的臨界值表示較長的等候時間,但可能更完整的數據。 由於狀態大小較大,因此較大的臨界值可能也需要額外的運算資源。 因為臨界值取決於您的數據和處理需求,因此測試和監視您的處理對於判斷最佳閾值很重要。
您可以使用 Python 中的 withWatermark()
函式來定義浮水印。 在 SQL 中,使用 WATERMARK
子句來定義浮水印:
蟒
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
流與流聯結中使用浮水印
針對數據流聯結,您必須在聯結雙方定義浮水印,並設定時間間隔子句。 因為每個聯結來源都有不完整的數據視圖,因此需要時間區間子句來告知串流引擎何時無法進行進一步的匹配。 時間間隔子句必須使用用來定義浮水印的相同欄位。
因為有時候每個數據流都需要不同的浮水印臨界值,因此數據流不需要有相同的臨界值。 為了避免遺漏數據,串流引擎會根據最慢的數據流維護一個全域水位線。
下列範例將聯結廣告曝光的串流和用戶點擊廣告的串流。 在此範例中,點擊必須在曝光后的 3 分鐘內發生。 經過 3 分鐘的時間間隔後,刪除那些無法再匹配的狀態數據列。
蟒
import dlt
dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
使用水印執行窗口匯總
串流數據的常見具狀態作業是視窗式匯總。 視窗匯總類似於群組匯總,不同之處在於會針對屬於定義視窗一部分的數據列集傳回匯總值。
視窗可以定義為特定長度,而且匯總作業可以在屬於該視窗的所有數據列上執行。 Spark 串流支援三種類型的視窗:
- 輪轉(固定)視窗:一系列的固定大小、非重疊和連續時間間隔。 輸入記錄只屬於單一視窗。
- 滑動視窗:類似於輪轉視窗,滑動視窗是固定大小的,但視窗可以重疊,而記錄可以落入多個視窗。
當數據到達窗口結尾加上浮水印的長度時,不再接受新的數據,並將發出聚合結果,清除窗口的狀態。
下列範例會使用固定視窗每 5 分鐘計算一次曝光數的總和。 在此範例中,select 子句會使用別名 impressions_window
,然後視窗本身會定義為 GROUP BY
子句的一部分。 此視窗必須以與水印相同的時間戳記欄位為基礎,本範例中的 clickTimestamp
欄位。
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Python 中的類似範例,用來計算每小時固定時段的利潤:
import dlt
@dlt.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
去重串流記錄
結構化串流具有完全相同的一次處理保證,但不會自動從數據源取消重複的記錄。 例如,因為許多消息佇列具有至少一次傳遞保證,因此在從這些佇列讀取時,應預期會出現重複記錄。 您可以使用 dropDuplicatesWithinWatermark()
函式來取消任何指定欄位上的重複記錄,即使某些欄位不同(例如事件時間或抵達時間),也會從數據流中移除重複專案。 您必須指定浮水印以使用 dropDuplicatesWithinWatermark()
函式。 所有在水印指定時間範圍內抵達的重複數據都會被丟棄。
已排序的數據很重要,因為順序錯亂的數據會導致浮水印值不正確地向前跳躍。 然後,當較舊的數據到達時,會被視為延遲並被丟棄。 使用 [withEventTimeOrder
] 選項,根據浮水印中指定的時間戳,依序處理初始快照。 您可以在定義資料集的程式碼中,或在使用 spark.databricks.delta.withEventTimeOrder.enabled
的 管線設定 中宣告 withEventTimeOrder
選項。 例如:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
注意
withEventTimeOrder
選項僅支援 Python。
在下列範例中,數據按照 clickTimestamp
進行排序和處理,而在 5 秒內相繼到達,且包含重複 userId
和 clickAdId
欄位的記錄將被刪除。
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
優化流程組態以進行具狀態處理
為了協助防止生產問題和過度延遲,Databricks 建議為您的具狀態串流處理啟用 RocksDB 型狀態管理,特別是如果您的處理需要節省大量的中繼狀態。
無伺服器化的管道會自動管理系統狀態存儲配置。
您可以在部署管線之前設定下列設定,以啟用 RocksDB 型狀態管理:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
若要深入瞭解 RocksDB 狀態存放區,包括 RocksDB 的設定建議,請參閱 在 Azure Databricks 上設定 RocksDB 狀態存放區。