次の方法で共有


非同期の進行状況の追跡とは

重要

この機能は、パブリック プレビューにあります。

非同期の進行状況の追跡により、構造化ストリーミング パイプラインは、マイクロバッチ内の実際のデータ処理と非同期的に並行して進行状況をチェックポイント処理できるため、offsetLogcommitLogの維持に関連する待機時間が短縮されます。

非同期進行状況追跡

手記

非同期の進行状況の追跡は、Trigger.once トリガーまたは Trigger.availableNow トリガーでは機能しません。 これらのトリガーでこの機能を有効にしようとすると、クエリエラーが発生します。

非同期進行状況の追跡は、待機時間を短縮するためにどのように機能しますか?

構造化ストリーミングは、クエリ処理の進行状況インジケーターとしてオフセットの永続化と管理に依存します。 Offset 管理操作は、これらの操作が完了するまでデータ処理を実行できないため、処理の待機時間に直接影響します。 非同期の進行状況の追跡により、構造化ストリーミング パイプラインは、これらの offset 管理操作の影響を受けることなく、進行状況をチェックポイント処理できます。

チェックポイントの頻度を構成する必要があるタイミング

ユーザーは、進行状況がチェックポイント処理される頻度を構成できます。 チェックポイント頻度の既定の設定は、ほとんどのクエリに適したスループットを提供します。 頻度の構成は、offset 管理操作が処理できるよりも高い速度で行われるシナリオに役立ちます。これによって、offset 管理操作のバックログが増え続けます。 この増大するバックログを阻止するために、データ処理はブロックまたは遅くなり、基本的に処理動作を元に戻して、非同期の進行状況追跡の利点を排除します。

手記

障害復旧時間は、チェックポイント間隔の時間の増加に伴って増加します。 障害が発生した場合、パイプラインは、前の正常なチェックポイントの前にすべてのデータを再処理する必要があります。 ユーザーは、通常の処理中の待機時間の短縮と障害が発生した場合の復旧時間との間で、このトレードオフを考慮できます。

非同期の進行状況の追跡に関連付けられている構成は何ですか?

オプション 価値 既定値 説明
asyncProgressTrackingEnabled 真/偽 false 非同期の進行状況追跡を有効または無効にする
asyncProgressTrackingCheckpointIntervalMs ミリ秒 1000 オフセットのコミットと完了コミットの間の間隔

ユーザーが非同期の進行状況追跡を有効にする方法

ユーザーは、次のコードのようなコードを使用して、この機能を有効にすることができます。

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()

val query = stream.writeStream
     .format("kafka")
        .option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
     .start()

非同期の進行状況の追跡をオフにする

非同期進行状況の追跡が有効になっている場合、フレームワークはすべてのバッチの進行状況をチェックポイント処理しません。 これに対処するには、非同期の進行状況の追跡を無効にする前に、次の設定で少なくとも 2 つのマイクロバッチを処理します。

  • .option("asyncProgressTrackingEnabled", "true")
  • .option("asyncProgressTrackingCheckpointIntervalMs", 0)

少なくとも 2 つのマイクロバッチの処理が完了したら、クエリを停止します。 これで、非同期進行状況の追跡を安全に無効にし、クエリを再起動できます。

この手順を完了せずに非同期の進行状況の追跡を無効にした場合は、次のエラーが発生する可能性があります。

java.lang.IllegalStateException: batch x doesn't exist

ドライバー ログに、次のエラーが表示される場合があります。

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

このセクションの手順に従って非同期の進行状況の追跡を無効にすると、これらのエラーに対処し、ストリーミング ワークロードを修復できます。

非同期進行状況の追跡に関する制限事項

この機能には、次の制限があります。

  • 非同期進行状況の追跡は、Kafka をシンクとして使用する場合にのみステートレス パイプラインでサポートされます。
  • 障害が発生した場合、バッチのoffset範囲が変更される可能性があるため、非同期進行状況追跡では、厳密に 1 回のエンド ツー エンド処理は保証されません。 Kafka などの一部のシンクでは、厳密に 1 回の保証は提供されません。