共用方式為


什麼是非同步進度追蹤?

重要

這項功能處於 公開預覽狀態

非同步進度追蹤可讓結構化串流管線以非同步方式進行檢查點,並平行處理微批次內的實際資料處理,減少與維護 offsetLogcommitLog 相關聯的延遲。

非同步進度追蹤

注意

非同步進度追蹤不適用於 Trigger.onceTrigger.availableNow 觸發程式。 嘗試使用這些觸發程式啟用這項功能會導致查詢失敗。

非同步進度追蹤如何運作以降低延遲?

結構化串流依賴保存和管理位移作為查詢處理的進度指標。 位移管理作業會直接影響處理延遲,因為除非這些作業完成,否則不會發生任何資料處理。 非同步進度追蹤可讓結構化串流管線進行檢查點進度,而不會受到這些位移管理作業的影響。

何時應該設定檢查點頻率?

使用者可以設定進度檢查點的頻率。 檢查點頻率的預設設定可為大部分查詢提供良好的輸送量。 設定頻率對於位移管理作業的處理速率高於可處理的案例很有説明,這會建立不斷增加的位移管理作業待處理專案。 為了避免這種不斷成長的待處理專案,資料處理會遭到封鎖或變慢,基本上會還原處理行為,以消除非同步進度追蹤的優點。

注意

失敗復原時間隨著檢查點間隔時間增加而增加。 如果失敗,管線必須在先前成功檢查點之前重新處理所有資料。 使用者可以考慮在一般處理期間和復原時間之間降低延遲之間的取捨,以防發生失敗。

哪些設定與非同步進度追蹤相關聯?

選項 價值 預設 描述
asyncProgressTrackingEnabled true/false false 啟用或停用非同步進度追蹤
asyncProgressTrackingCheckpointIntervalMs 毫秒 1000 我們認可位移和完成認可的間隔

使用者如何啟用非同步進度追蹤?

使用者可以使用類似下列程式碼的程式碼來啟用這項功能:

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()

val query = stream.writeStream
     .format("kafka")
        .option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
     .start()

關閉非同步進度追蹤

啟用非同步進度追蹤時,架構不會檢查每個批次的檢查點進度。 若要解決此問題,請在停用非同步進度追蹤之前,使用下列設定處理至少兩個微批次:

  • .option("asyncProgressTrackingEnabled", "true")
  • .option("asyncProgressTrackingCheckpointIntervalMs", 0)

在至少兩個微批次完成處理之後,停止查詢。 現在您可以安全地停用非同步進度追蹤並重新啟動查詢。

如果您已停用非同步進度追蹤而不完成此步驟,可能會遇到下列錯誤:

java.lang.IllegalStateException: batch x doesn't exist

在驅動程式記錄中,您可能會看到下列錯誤:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

遵循本節中的指示來停用非同步進度追蹤,可讓您解決這些錯誤並修復串流工作負載。

非同步進度追蹤的限制

這項功能有下列限制:

  • 只有在使用 Kafka 作為接收時,無狀態管線才支援非同步進度追蹤。
  • 一旦端對端處理不保證有非同步進度追蹤,因為批次的位移範圍在失敗時可能會變更。 某些接收,例如 Kafka,永遠不會提供完全一次的保證。