Apache Flink® DataStream を Azure Databricks Delta Lake Tables に組み込む
この例では、Azure Databricks 自動ローダーを使用して、AKS 上の HDInsight 上の Apache Flink クラスターから Delta Lake テーブルに Azure ADLS Gen2 のストリーム データをシンクする方法を示します。
前提 条件
- AKS 上の HDInsight での Apache Flink 1.17.0
- Apache Kafka 3.2 を HDInsight で
- AKS 上の HDInsight と同じ仮想ネットワークにある Azure Databricks
- ADLS Gen2 とサービスプリンシパル
Azure Databricks 自動ローダー
Databricks 自動ローダーを使用すると、Flink アプリケーションから Delta Lake テーブルにデータランドをオブジェクト ストレージに簡単にストリーミングできます。 自動ローダー は、cloudFiles と呼ばれる構造化ストリーミング ソースを提供します。
Azure Databricks デルタ ライブ テーブルで Flink のデータを使用する手順を次に示します。
Apache Flink® SQL で Apache Kafka® テーブルを作成する
この手順では、Flink SQL で Kafka テーブルと ADLS Gen2 を作成できます。 このドキュメントでは、airplanes_state_real_time table
を使用しています。 任意の記事を使用できます。
コード スニペットで、Kafka クラスターでブローカー IP を更新する必要があります。
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
次に、Flink SQL で ADLSgen2 テーブルを作成できます。
コード スニペットのコンテナー名とストレージ アカウント名を、ADLS Gen2 の詳細で更新します。
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
さらに、Flink SQL の ADLSgen2 テーブルに Kafka テーブルを挿入できます。
Flink でストリーミング ジョブを検証する
Azure portal 上の Azure Storage の Kafka からのデータ シンクを確認する
Azure Storage と Azure Databricks ノートブックの認証
ADLS Gen2 は、Azure Databricks ノートブックからの認証のために、Microsoft Entra アプリケーション サービス プリンシパルを使用して OAuth 2.0 を提供し、その後 Azure Databricks DBFS にマウントします。
サービスプリンシパルのアプリID、テナントID、そして秘密鍵を取得してみましょう。
Azure portal のストレージ BLOB データ所有者にサービス プリンシパルを付与する
Azure Databricks ノートブック で ADLS Gen2 を DBFS にマウントする
ノートブック の準備
次のコードを記述してみましょう。
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Delta Live テーブル パイプラインを定義し、Azure Databricks で実行する
Azure Databricks Notebook で Delta Live テーブルを確認する
参考
- Apache、Apache Kafka、Kafka、Apache Flink、Flink、および関連するオープンソースプロジェクト名は、Apache Software Foundation (ASF) の商標です。