Azure Databricks Delta Lake テーブルに Apache Flink® DataStream を組み込む
この例では、Azure Databricks 自動ローダーを使用して、HDInsight on AKS 上の Apache Flink クラスターから Azure ADLS Gen2 のストリーム データを Delta Lake テーブルにシンクする方法を示します。
前提条件
- HDInsight on AKS 上の Apache Flink 1.17.0
- HDInsight 上の Apache Kafka 3.2
- HDInsight on AKS と同じ仮想ネットワーク内の Azure Databricks
- ADLS Gen2 とサービス プリンシパル
Azure Databricks の自動ローダー
Databricks 自動ローダーを使用すると、オブジェクト ストレージへのデータを、Flink アプリケーションから Delta Lake テーブルに容易にストリーミングできます。 自動ローダーは、cloudFiles と呼ばれる構造化ストリーミング ソースを提供します。
Azure Databricks Delta Live Tables で Flink からデータを使用する手順は次のとおりです。
Apache Flink® SQL に Apache Kafka® テーブルを作成する
この手順では、Flink SQL 上に Kafka テーブルと ADLS Gen2 を作成できます。 このドキュメントでは、airplanes_state_real_time table
を使います。 任意の記事を使用できます。
コード スニペットで、Kafka クラスターを使用してブローカー IP を更新する必要があります。
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
次に、Flink SQL で ADLSgen2 テーブルを作成できます。
コード スニペットの container-name と storage-account-name を ADLS Gen2 の詳細で更新します。
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
また、Flink SQL で ADLSgen2 テーブルに Kafka テーブルを挿入できます。
Flink 上でストリーミング ジョブを検証する
Azure portal 上の Azure Storage で Kafka からのデータ シンクを確認する
Azure Storage と Azure Databricks ノートブックの認証
ADLS Gen2 は、Azure Databricks ノートブックから認証用 Microsoft Entra ID アプリケーション サービス プリンシパルを OAuth 2.0 に提供し、Azure Databricks DBFS にマウントします。
サービス プリンシパルのアプリ ID、テナント ID、秘密鍵を取得します。
Azure portal でストレージ BLOB データ所有者にサービス プリンシパルを付与する
Azure Databricks ノートブックで、ADLS Gen2 を DBFS にマウントする
ノートブックを準備する
次のコードを記述します。
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Delta Live Table パイプラインを定義し、Azure Databricks で実行する
Azure Databricks Notebook で Delta Live Table を確認する
リファレンス
- Apache、Apache Kafka、Kafka、Apache Flink、Flink、関連するオープン ソース プロジェクト名は、Apache Software Foundation (ASF) の商標です。