次の方法で共有


最初の構造化ストリーミング ワークロードを実行する

この記事では、Azure Databricks で最初の構造化ストリーミング クエリを実行するために必要な基本的な概念のコード例と説明を示します。 構造化ストリーミングは、ほぼリアルタイムおよび増分処理ワークロードに使用できます。

構造化ストリーミングは、Delta Live Tables のストリーミング テーブルの動力であるいくつかのテクノロジの 1 つです。 Databricks では、すべての新しい ETL、インジェスト、および構造化ストリーミング ワークロードにデルタ ライブ テーブルを使用することをお勧めします。 「Delta Live Tables とは」を参照してください。

Note

Delta Live Tables にはストリーミング テーブルを宣言するための若干変更された構文が用意されていますが、ストリーミングの読み取りと変換を構成するための一般的な構文は、Azure Databricks のすべてのストリーミング ユース ケースに適用されます。 Delta Live Tables では、状態情報、メタデータ、および多数の構成を管理することで、ストリーミングも簡略化します。

自動ローダーを使用してオブジェクト ストレージからストリーミング データを読み取る

次の例では、cloudFiles を使用して形式とオプションを示す、自動ローダーを使用して JSON データを読み込む方法を示します。 schemaLocation オプションを使用すると、スキーマの推論と進化が可能になります。 Databricks ノートブック セルに次のコードを貼り付け、セルを実行して、raw_df という名前のストリーミング DataFrame を作成します:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Azure Databricks での他の読み取り操作と同様に、ストリーミング読み取りを構成しても、実際にはデータは読み込まれません。 ストリームが開始する前に、データに対してアクションをトリガーする必要があります。

Note

ストリーミング DataFrame で display() を呼び出すと、ストリーミング ジョブが開始されます。 ほとんどの構造化ストリーミングのユース ケースでは、ストリームをトリガーするアクションはシンクにデータを書き込む必要があります。 「構造化ストリーミングの運用に関する考慮事項」を参照してください。

ストリーミング変換を実行する

構造化ストリーミングでは、Azure Databricks と Spark SQL で使用できるほとんどの変換がサポートされています。 MLflow モデルを UDF として読み込み、変換としてストリーミング予測を行うことさえできます。

次のコード例では、Spark SQL 関数を使用して、取り込まれた JSON データを追加情報で強化するための単純な変換を完了します:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

結果の transformed_df には、データ ソースに到着するたびに各レコードを読み込んで変換するクエリ命令が含まれます。

Note

構造化ストリーミングは、データ ソースを無制限または無限のデータセットとして扱います。 そのため、構造化ストリーミング ワークロードでは、無限の数の項目を並べ替える必要があるため、一部の変換はサポートされていません。

ほとんどの集計と多くの結合では、ウォーターマーク、ウィンドウ、出力モードを使用して状態情報を管理する必要があります。 「透かしを適用してデータ処理のしきい値を制御する」を参照してください。

Delta Lake への増分バッチ書き込みを実行する

次の例では、指定したファイル パスとチェックポイントを使用して Delta Lake に書き込みます。

重要

構成するストリーミング ライターごとに、必ず一意のチェックポイントの場所を指定してください。 チェックポイントはストリームの一意の ID を提供し、処理されたすべてのレコードとストリーミング クエリに関連付けられている状態情報を追跡します。

トリガーの availableNow 設定は、ソース データセットから以前に処理されていないすべてのレコードを処理し、シャットダウンするように構造化ストリームに指示します。そのため、ストリームを実行したままにすることなく、次のコードを安全に実行できます:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

この例では、データ ソースに新しいレコードが到着しないため、このコードを繰り返し実行しても新しいレコードは取り込まれません。

警告

構造化ストリーミングの実行により、自動終了によってコンピューティング リソースがシャットダウンされるのを防ぐことができます。 予期しないコストを回避するには、必ずストリーミング クエリを終了してください。

Delta Lake からデータを読み取り、変換、Delta Lake に書き込む

Delta Lake には、ソースとシンクの両方として構造化ストリーミングを使用するための広範なサポートがあります。 「差分テーブルのストリーミング読み取りと書き込み」を参照してください。

次の例は、Delta テーブルからすべての新しいレコードを増分読み込みし、それらを別の Delta テーブルのスナップショットと結合して、Delta テーブルに書き込む構文の例を示しています:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

ソース テーブルを読み取り、ターゲット テーブルと指定されたチェックポイントの場所に書き込むには、適切なアクセス許可が構成されている必要があります。 データ ソースとシンクに関連する値を使用して、山かっこ (<>) で示されているすべてのパラメーターを入力します。

Note

Delta Live Table は、Delta Lake パイプラインを作成するための完全な宣言構文を提供し、トリガーやチェックポイントなどのプロパティを自動的に管理します。 「Delta Live Tables とは」を参照してください。

Kafka からデータを読み取り、変換、Kafka に書き込む

Apache Kafka やその他のメッセージング バスは、大規模なデータセットで使用可能な待機時間が最も短い一部を提供します。 Azure Databricks を使用して、Kafka から取り込まれたデータに変換を適用し、Kafka にデータを書き戻すことができます。

Note

クラウド オブジェクト ストレージにデータを書き込むと、待機時間のオーバーヘッドが増加します。 Delta Lake にメッセージング バスからのデータを格納するが、ストリーミング ワークロードに可能な限り短い待機時間が必要な場合、Databricks では、データを Lakehouse に取り込み、ダウンストリーム メッセージング バス シンクにほぼリアルタイムの変換を適用するように個別のストリーミング ジョブを構成することをお勧めします。

次のコード例は、Delta テーブル内のデータと結合し、Kafka に書き戻すことによって Kafka からのデータをエンリッチする簡単なパターンを示しています:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Kafka サービスへのアクセス用に適切なアクセス許可が構成されている必要があります。 データ ソースとシンクに関連する値を使用して、山かっこ (<>) で示されているすべてのパラメーターを入力します。 「Apache Kafka と Azure Databricks を使用したストリーム処理」を参照してください。