套用浮水印來控制資料處理閾值
本文介紹浮浮水印的基本概念,並提供在一般具狀態串流作業中使用浮浮浮水印的建議。 您必須將浮水印套用至具狀態串流作業,以避免無限地擴充保留在狀態中的數據量,這可能會造成記憶體問題,並在長時間執行的串流作業期間增加處理延遲。
什麼是水印?
結構化串流會使用浮浮浮浮水印來控制要持續處理指定狀態實體更新多久的臨界值。 狀態實體的常見範例包括:
- 時間範圍中的匯總。
- 兩個數據流之間連接的唯一鍵值。
當您宣告浮水印時,您會在串流數據框架上指定時間戳欄位和浮水印閾值。 當新數據送達時,狀態管理員會追蹤指定欄位中最新的時間戳,並在延遲閾值內處理所有記錄。
下列範例會將 10 分鐘的水位線閾值套用至視窗計數:
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
在此範例中:
-
event_time
欄可用來定義 10 分鐘水印和 5 分鐘的滾動窗口。 - 針對每個非重疊的 5 分鐘視窗,會收集每個
id
觀察到的計數。 - 狀態資訊會針對每個計數進行維護,直到時間範圍結束時,比觀察到的最新
event_time
還要晚 10 分鐘。
重要
水印閾值保證會根據所定義查詢的語意來處理在指定閾值內抵達的記錄。 抵達指定臨界值以外的延遲記錄仍可使用查詢計量進行處理,但這並不保證。
浮浮水印如何影響處理時間和輸送量?
浮水印會與輸出模式互動,以控制資料寫入接收器的時機。 由於浮水印會減少要處理的狀態資訊總量,因此有效使用浮水印對於有效率的具狀態串流輸送量至關重要。
注意
並非所有的輸出模式都支援所有具狀態作業。
視窗匯總的浮浮浮浮水印和輸出模式
下表詳細說明具已定義水印的時間戳聚合查詢處理:
輸出模式 | 行為 |
---|---|
附加 | 一旦水位線閾值通過,數據列就會寫入目標數據表。 所有寫入都會根據延遲閾值延遲。 一旦超過閾值,就會卸除舊的匯總狀態。 |
更新 | 數據列會在計算結果時寫入目標數據表,而且可在新數據送達時更新和覆寫。 一旦超過閾值,就會卸除舊的匯總狀態。 |
完成 | 匯總狀態不會卸除。 目標數據表會在每次觸發程式時被重寫。 |
數據流聯結的浮浮浮浮水印和輸出
多個數據流之間的聯結僅支援附加模式,且相符的記錄會在探索到的每個批次中寫入。 針對內連接,Databricks 建議在每個串流數據源上設定水位閾值。 這可讓舊記錄捨棄狀態資訊。 如果沒有水印,結構化串流會在每次觸發時嘗試聯結每個鍵值來自兩端的聯結。
結構化串流具有特殊的語意,可支援外部聯結。 外部聯結必須進行浮浮水印處理,因為它表示在進行不相符之後,何時必須以 Null 值寫入索引鍵。 請注意,雖然外部聯結對於記錄在數據處理期間永遠不會相符的記錄很有用,因為聯結只會寫入數據表做為附加作業,但直到延遲閾值過後才會記錄此遺漏的數據。
在結構化串流中使用多個浮水印原則控制延遲數據閾值
使用多個結構化串流輸入時,您可以設定多個浮水印來控制延遲抵達數據的容錯閾值。 設定浮浮浮浮水印可讓您控制狀態資訊並影響延遲。
串流查詢可以有多個聯集或聯結在一起的輸入數據流。 每個輸入數據流都可以有不同的延遲數據臨界值,這些閾值必須容許進行具狀態作業。 在每個輸入數據流上使用 來指定這些臨界 withWatermarks("eventTime", delay)
值。 以下是具有 數據流聯結的範例查詢。
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
執行查詢時,結構化串流會個別追蹤每個輸入數據流中所看到的最大事件時間,根據對應的延遲計算水位標記,並選擇單一全域水位標記來用於具狀態的操作。 根據預設,選擇最小值作為全域水位線,因為它可確保當其中一個數據流落後於其他數據流時,不會意外捨棄任何數據(例如,其中一個數據流會因上游失敗而停止接收數據)。 換句話說,全域水印會以最慢的數據流速度安全地移動,並因此延遲查詢結果的輸出。
如果您想要取得更快的結果,您可以設定多重浮水印策略,藉由將 SQL 設定從 spark.sql.streaming.multipleWatermarkPolicy
調整為 max
來選擇最大值作為全局浮水印(預設值為 min
)。 這可讓全球水位線以最快的數據流速度移動。 不過,此設定會從最慢的數據流卸除數據。 因此,Databricks 建議您明智地使用此設定。
去除浮水印之內的重複項目
在 Databricks Runtime 13.3 LTS 和以上版本中,您可以使用唯一標識符在水印閾值內去重記錄。
結構化串流提供完全一次的處理保證,但不會自動從數據源刪除記錄。 您可以使用 dropDuplicatesWithinWatermark
在任何指定的欄位上進行去重,從而讓您即使某些欄位不同(例如事件時間或抵達時間)也能從資料流中移除重複的記錄。
保證會在指定水印內抵達的重複記錄被移除。 此保證只有一個方向嚴格,而且可能也會捨棄到達指定閾值以外的重複記錄。 您必須將浮水印的延遲臨界值設定得比重複事件之間的時間戳記的最大差異長,以移除所有重複事件。
您必須指定浮水印以使用 dropDuplicatesWithinWatermark
方法,如下列範例所示:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])