次の方法で共有


ストリーミングとインジェストの増分

Azure Databricks では、Apache Spark 構造化ストリーミング を使用して、インジェスト ワークロードに関連する次のような多数の製品をバックアップします:

  • 自動ローダー
  • COPY INTO
  • Delta Live Tables パイプライン
  • Databricks SQL の具体化されたビューとストリーミング テーブル

この記事では、ストリーミングと増分バッチ処理セマンティクスの違いについて説明し、Databricks で目的のセマンティクスのインジェスト ワークロードを構成する方法の大まかな概要について説明します。

ストリーミングと増分バッチ インジェストの違い

考えられるインジェスト ワークフローの構成は、凖リアルタイムの処理から、頻度の低い増分バッチ処理まで多岐に分けられます。 どちらのパターンも Apache Spark 構造化ストリーミングを使用して増分処理を作動させますが、セマンティクスは異なります。 わかりやすくするために、この記事では、凖リアルタイム インジェストを ストリーミング インジェスト、より頻度の低い増分処理を 増分バッチ インジェスト と呼びます。

ストリーミング インジェスト

ストリーミングとは、データ インジェストとテーブルの更新のコンテキストで、Azure Databricks が常時接続インフラストラクチャを使用してマイクロバッチ内のソースからシンクにレコードを取り込む、凖リアルタイムのデータ処理を指します。 ストリーミング ワークロードは、インジェストを停止するエラーが発生しない限り、構成されたデータ ソースから更新プログラムを継続的に取り込みます。

増分バッチ インジェスト

増分バッチ インジェストとは、有効期間の短いジョブのデータ ソースからすべての新しいレコードが処理されるパターンを指します。 増分バッチ インジェストは、多くの場合、スケジュールに従って発生しますが、手動で、またはファイルの到着に基づいてトリガーされることもあります。

増分バッチ インジェスト は、バッチ インジェスト とは異なっており、データ ソース内の新しいレコードが自動的に検出され、既に取り込まれているレコードは無視されます。

ジョブを使用したインジェスト

Databricks ジョブを使用すると、ノートブック、ライブラリ、Delta Live Tables パイプライン、Databricks SQL クエリを含むワークフローを調整し、タスクをスケジュールできます。

Note

すべての Azure Databricks コンピューティングの種類とタスクの種類を使用して、増分バッチ インジェストを構成できます。 ストリーミング インジェストは、クラシック ジョブ コンピューティングと Delta Live Tables の運用環境でのみサポートされます。

ジョブには、次の 2 つの主要な操作モードがあります:

  • 連続ジョブ は、エラーが発生した場合は自動的に再試行します。 このモードは、ストリーミング インジェストを目的としています。
  • トリガーされたジョブ では、トリガーされたときにタスクを実行します。 トリガーには次のものが含まれます:
    • 指定したスケジュールでジョブを実行する時間ベースのトリガー。
    • ファイルが指定した場所に配置されたときにジョブを実行するファイル ベースのトリガー。
    • REST API 呼び出し、Azure Databricks CLI コマンドの実行、ワークスペース UI の [今すぐ実行] ボタンのクリックなど、その他のトリガー。

増分バッチ ワークロードの場合は、次のように、AvailableNow トリガー モードを使用してジョブを構成します:

Python

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("table_name")

ストリーミング ワークロードの場合、既定のトリガー間隔は processingTime ="500ms"。 次の例は、マイクロバッチを 5 秒ごとに処理する方法を示しています。

Python

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(processingTime="5 seconds")
  .toTable("table_name")
)

Scala

import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.ProcessingTime, "5 seconds")
  .toTable("table_name")

重要

サーバーレス ジョブは、構造化ストリーミング の Scala、連続モード、または時間ベースのトリガー間隔をサポートしていません。 凖リアルタイムのインジェスト セマンティクスが必要な場合は、クラシック ジョブを使用します。

Delta Live Tables でのインジェスト

ジョブと同様に、Delta Live Tables パイプラインはトリガー モードまたは連続モードで実行できます。 ストリーミング テーブルを使用した凖リアルタイムのストリーミング セマンティクスの場合は、連続モードを使用します。

ストリーミング テーブルを使用して、クラウド オブジェクト ストレージ、Apache Kafka、Amazon Kinesis、Google Pub/Sub、または Apache Pulsar からのストリーミングまたは増分バッチ インジェストを構成します。

LakeFlow Connect では、Delta Live Tables を使用して、接続されたシステムからのインジェスト パイプラインを構成します。 「LakeFlow Connect」を参照してください。

具体化されたビューでは、バッチ ワークロードと同等の操作セマンティクスが保証されますが、結果を段階的に計算するために多くの操作を最適化できます。 「具体化されたビューの更新操作」を参照してください。

Databricks SQL を使用したインジェスト

ストリーミング テーブルを使用して、クラウド オブジェクト ストレージ、Apache Kafka、Amazon Kinesis、Google Pub/Sub、または Apache Pulsar からの増分バッチ インジェストを構成できます。

具体化されたビューを使用して、指定した一連の操作に対して完全に再生可能なソースからの増分バッチ処理を構成できます。 「具体化されたビューの更新操作」を参照してください。

COPY INTO は、クラウド オブジェクト ストレージ内のデータ ファイルの増分バッチ処理に使い慣れた SQL 構文を提供します。 COPY INTO 動作は、クラウド オブジェクト ストレージのストリーミング テーブルでサポートされるパターンに似ていますが、サポートされているすべてのファイル形式に対してすべての既定の設定が等しいわけではありません。