次の方法で共有


非同期進行状況追跡とは

重要

この機能はパブリック プレビュー段階にあります。

非同期進行状況追跡を使うと、構造化ストリーミング パイプラインで、非同期的に、マイクロバッチ内の実際のデータ処理と並行して、進行状況のチェックポイントを処理でき、offsetLogcommitLog の維持に関連する待ち時間が短縮されます。

非同期進行状況追跡

注意

非同期進行状況追跡は、Trigger.once または Trigger.availableNow トリガーでは機能しません。 これらのトリガーでこの機能を有効にしようとすると、クエリ エラーが発生します。

非同期進行状況追跡によって待ち時間が短縮される仕組み

構造化ストリーミングは、クエリ処理の進行状況インジケーターとしてオフセットの永続化と管理を利用します。 オフセット管理操作は、これらの操作が完了するまでデータ処理を実行できないため、処理の待ち時間に直接影響します。 非同期進行状況追跡を使うと、構造化ストリーミング パイプラインで、これらのオフセット管理操作の影響を受けることなく、進行状況のチェックポイント処理を行うことができます。

チェックポイントの頻度を構成する必要があるとき

ユーザーは、進行状況のチェックポイント処理が行われる頻度を構成できます。 ほとんどのクエリでは、チェックポイント頻度の既定の設定で適切なスループットが提供されます。 頻度の構成が役に立つのは、オフセット管理操作の発生が速すぎて処理が追いつかないシナリオです。この場合、オフセット管理操作のバックログが増え続けます。 バックログの増大を食い止めるため、データ処理はブロックされるか、速度を下げられ、基本的に処理動作が元に戻されて、非同期進行状況追跡のメリットがなくなります。

注意

チェックポイントの間隔が長くなると、障害復旧時間も長くなります。 障害が発生した場合、パイプラインは、前の正常なチェックポイントの前のすべてのデータを再処理する必要があります。 ユーザーは、通常の処理での待ち時間の短縮と、障害発生時の復旧時間の間の、このトレードオフを考慮できます。

非同期進行状況追跡に関連付けられている構成

オプション Default 説明
asyncProgressTrackingEnabled true/false false 非同期進行状況追跡を有効または無効にします
asyncProgressTrackingCheckpointIntervalMs ミリ秒 1000 オフセットのコミットと完了コミットの間の間隔

ユーザーが非同期進行状況追跡を有効にする方法

ユーザーは、次のようなコードを使って、この機能を有効にできます。

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()

val query = stream.writeStream
     .format("kafka")
        .option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
     .start()

非同期の進行状況追跡をオフにする

非同期の進行状況追跡が有効になっている場合、フレームワークではすべてのバッチの進行状況がチェックポイント処理されません。 これに対処するには、非同期の進行状況追跡を無効にする前に、次の設定で少なくとも 2 つのマイクロバッチを処理します。

  • .option("asyncProgressTrackingEnabled", "true")
  • .option("asyncProgressTrackingCheckpointIntervalMs", 0)

少なくとも 2 つのマイクロバッチの処理が完了したら、クエリを停止します。 これで、非同期の進行状況追跡を安全に無効にし、クエリを再開できます。

この手順を完了せずに非同期の進行状況追跡を無効にした場合は、次のエラーが発生する可能性があります。

java.lang.IllegalStateException: batch x doesn't exist

ドライバーのログに、次のエラーが表示される場合があります。

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

このセクションの手順に従って非同期の進行状況追跡を無効にすると、これらのエラーに対処し、ストリーミング ワークロードを修復できます。

非同期進行状況追跡に関する制限事項

この機能には次の制限があります。

  • 非同期進行状況追跡は、Kafka をシンクとして使用しているステートレス パイプラインでのみサポートされます。
  • 障害が発生した場合、バッチのオフセット範囲が変更される可能性があるため、非同期進行状況追跡では、厳密に 1 回のエンド ツー エンド処理は保証されません。 Kafka などの一部のシンクでは、厳密に 1 回の保証は提供されません。