次の方法で共有


ストリーミング データに対してクエリを実行する

Azure Databricks を使用し、構造化ストリーミングを使用してストリーミング データ ソースに対してクエリを実行できます。 Azure Databricks は、Python と Scala でストリーミング ワークロードに対する広範なサポートを提供し、SQL を使用するほとんどの構造化ストリーミング機能をサポートしています。

次の例は、ノートブックでの対話型開発中にストリーミング データを手動で検査するためにメモリ シンクを使用する方法を示しています。 ノートブック UI の行出力の制限のため、ストリーミング クエリによって読み取られるデータがすべて表示されるとは限らない場合があります。 運用環境のワークロードでは、ストリーミング クエリのトリガーは、ターゲット テーブルまたは外部システムに書き込むことでのみ行う必要があります。

Note

ストリーミング データに対する対話型クエリの SQL サポートは、万能コンピューティングで実行されているノートブックに限定されます。 Databricks SQL または Delta Live Tables でストリーミング テーブルを宣言するときにも、SQL を使用できます。 「Databricks SQL でストリーミング テーブルを使用してデータを読み込む」と「Delta Live Tables とは」を参照してください。

ストリーミング システムからデータに対してクエリを実行する

Azure Databricks には、次のストリーミング システム用のストリーミング データ リーダーが用意されています。

  • Kafka
  • Kinesis
  • PubSub
  • Pulsar

これらのシステムに対してクエリを初期化するときに、構成の詳細を指定する必要があります。これは、構成された環境と読み取り元として選択するシステムによって異なります。 「ストリーミング データ ソースを構成する」を参照してください。

ストリーミング システムを含む一般的なワークロードには、レイクハウスへのデータ インジェストと、外部システムにデータをシンクするためのストリーム処理が含まれます。 ストリーミング ワークロードの詳細については、「Azure Databricks でのストリーミング」を参照してください。

次の例は、Kafka から読み取られる対話型ストリーミングを示しています。

Python

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

テーブルに対してストリーミング読み取りとしてクエリを実行する

Azure Databricks は、既定で Delta Lake を使用してすべてのテーブルを作成します。 Delta テーブルに対してストリーミング クエリを実行すると、テーブルのバージョンがコミットされたときに、クエリが自動的に新しいレコードを選択します。 既定では、ストリーミング クエリは、ソース テーブルに追加されたレコードのみが含まれていると想定します。 更新と削除を含むストリーミング データを操作する必要がある場合、Databricks では Delta Live Tables と APPLY CHANGES INTO を使用することをお勧めします。 APPLY CHANGES API: Delta Live Tables を使用した変更データ キャプチャの簡略化に関する記事を参照してください。

次の例は、テーブルから読み取られる対話型ストリーミングを示しています。

Python

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

自動ローダーを使用してクラウド オブジェクト ストレージ内のデータに対してクエリを実行する

自動ローダー (Azure Databricks クラウド データ コネクタ) を使用して、クラウド オブジェクト ストレージからデータをストリーミングできます。 このコネクタは、Unity Catalog ボリュームまたはその他のクラウド オブジェクト ストレージの場所に保存されているファイルとともに使用できます。 Databricks では、クラウド オブジェクト ストレージ内のデータへのアクセスの管理にボリュームを使用することをお勧めします。 「データ ソースに接続する」を参照してください。

Azure Databricks は、一般的な構造化、半構造化、非構造化の形式で格納されているクラウド オブジェクト ストレージ内のデータのストリーミング インジェスト向けに、このコネクタを最適化します。 Databricks では、取り込まれるデータをほぼ生の形式で保存することで、スループットを最大化し、破損したレコードやスキーマの変更による潜在的なデータ損失を最小限に抑えることをお勧めします。

クラウド オブジェクト ストレージからのデータの取り込みに関するその他の推奨事項については、「Databricks レイクハウスにデータを取り込む」をご覧ください。

次の例は、ボリューム内の JSON ファイルのディレクトリから読み取られる対話型ストリーミングを示しています。

Python

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')