使用水印優化 Delta 實時表中的有狀態處理
若要有效管理狀態中保存的數據,請在 Delta Live Tables 中執行具狀態的流處理時使用水印,包括匯總、聯結和去重。 本文說明如何在 Delta Live Tables 查詢中使用浮浮水印,並包含建議作業的範例。
注意
若要確保執行匯總的查詢會逐步處理,且不會在每次更新時完全重新計算,您必須使用水印。
什麼是水印?
在串流處理中,浮水印 是 Apache Spark 功能,可在執行匯總等具狀態作業時定義處理數據的以時間為基礎的閾值。 抵達的數據會經過處理,直到達到臨界值為止,此時臨界值所定義的時間範圍就會關閉。 浮浮水印可用來避免查詢處理期間發生問題,主要是在處理較大的數據集或長時間執行的處理時。 這些問題可能包括產生結果的高延遲,甚至記憶體不足 (OOM) 錯誤,因為處理期間保持狀態的數據量。 由於串流數據原本就未排序,浮水印也支援正確計算時間範圍匯總等作業。
若要深入瞭解如何在串流處理中使用浮浮水印,請參閱 Apache Spark 結構化串流 中的浮浮水印和 套用水印來控制數據處理閾值。
如何定義浮水印?
您可以藉由指定時間戳字段和值來定義浮水印,代表 延遲數據到達的時間閾值。 如果數據在定義的時間臨界值之後到達,則會將其視為遲到。 例如,如果閾值定義為 10 分鐘,可能會卸除 10 分鐘閾值之後抵達的記錄。
由於在定義的臨界值之後抵達的記錄可能會遭到捨棄,因此選取符合延遲與正確性需求的閾值非常重要。 選擇較小的臨界值會導致更快發出記錄,但也表示較晚的記錄更有可能遭到捨棄。 較大的臨界值表示較長的等候時間,但可能更完整的數據。 由於狀態大小較大,因此較大的臨界值可能也需要額外的運算資源。 因為臨界值取決於您的數據和處理需求,因此測試和監視您的處理對於判斷最佳閾值很重要。
您可以使用 Python 中的 withWatermark()
函式來定義浮水印。 在 SQL 中,使用 WATERMARK
子句來定義浮水印:
Python
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
搭配流與流間的聯結使用水印
針對數據流聯結,您必須在聯結的兩側定義浮水印和時間間隔子句。 因為每個聯結來源對數據的視圖都不完整,因此需要時間間隔子句告知串流引擎何時無法進行進一步的匹配。 時間間隔子句必須使用用來定義浮水印的相同欄位。
因為有時候每個數據流都需要不同的浮浮列印臨界值,因此數據流不需要有相同的臨界值。 為了避免遺漏數據,串流引擎會根據最慢的數據流維護一個全域水位線。
下列範例會聯結廣告曝光串流,以及使用者點擊廣告的串流。 在此範例中,點擊必須在曝光后的 3 分鐘內發生。 經過 3 分鐘的時間間隔之後,卸除無法再比對之狀態的數據列。
Python
import dlt
dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
使用浮浮浮水印執行視窗式匯總
串流數據的常見具狀態作業是視窗式匯總。 視窗匯總類似於群組匯總,不同之處在於會針對屬於定義視窗一部分的數據列集傳回匯總值。
視窗可以定義為特定長度,而且匯總作業可以在屬於該視窗的所有數據列上執行。 Spark 串流支援三種類型的視窗:
- 輪轉 (固定) 視窗:一系列的固定大小、非重疊和連續時間間隔。 輸入記錄只屬於單一視窗。
- 滑動視窗:類似於輪轉視窗,滑動視窗是固定大小的,但視窗可以重疊,而記錄可以落入多個視窗。
當數據抵達超過視窗結尾與浮水印長度之和時,視窗將不再接收新數據,匯總結果將被發出,並且視窗的狀態會被卸除。
下列範例會使用固定視窗每 5 分鐘計算曝光數的總和。 在此範例中,select 子句會使用別名 impressions_window
,然後視窗本身會定義為 GROUP BY
子句的一部分。 此視窗必須以與浮水印相同的時間戳記欄為基礎,例如在此範例中的 clickTimestamp
欄。
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Python 中的類似範例,用來計算每小時固定時段的利潤:
import dlt
@dlt.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
重複資料刪除串流記錄
結構化串流具有完全相同的一次處理保證,但不會自動從數據源取消重複的記錄。 例如,由於許多消息佇列至少有一次保證,因此從其中一個消息佇列讀取時,應該預期重複的記錄。 您可以使用 函 dropDuplicatesWithinWatermark()
式來取消任何指定字段上的重複記錄,即使某些欄位不同(例如事件時間或抵達時間)也會從數據流中移除重複專案。 您必須指定浮水印以使用 dropDuplicatesWithinWatermark()
函式。 所有抵達由水印指定的時間範圍內的重複數據都會被丟棄。
已排序的數據很重要,因為順序錯亂的數據會導致浮水印值不正確地向前跳躍。 然後,當較舊的數據到達時,會被視為遲到且已卸除。 使用 withEventTimeOrder
選項,根據浮水印中指定的時間戳,依序處理初始快照。 選項withEventTimeOrder
可以在定義數據集的程式代碼中,或使用 在管線設定宣告。 例如:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
注意
此選項 withEventTimeOrder
僅支援 Python。
在下列範例中,數據會依 clickTimestamp
排序處理,而在彼此 5 秒內到達且包含重複 userId
和 clickAdId
欄位的記錄將被刪除。
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
優化管線組態以進行具狀態處理
為了協助防止生產問題和過度延遲,Databricks 建議為您的具狀態串流處理啟用 RocksDB 型狀態管理,特別是如果您的處理需要節省大量的中繼狀態。
無伺服器管線會自動管理狀態存放區設定。
您可以在部署管線之前設定下列設定,以啟用 RocksDB 型狀態管理:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
若要深入瞭解 RocksDB 狀態存放區,包括 RocksDB 的設定建議,請參閱在 Azure Databricks 上設定 RocksDB 狀態存放區。