次の方法で共有


Apache Flink® DataStream を Azure Databricks Delta Lake Tables に組み込む

この例では、Azure Databricks 自動ローダーを使用して、AKS 上の HDInsight 上の Apache Flink クラスターから Delta Lake テーブルに Azure ADLS Gen2 のストリーム データをシンクする方法を示します。

前提 条件

  • AKS 上の HDInsight での Apache Flink 1.17.0
  • Apache Kafka 3.2 を HDInsight
  • AKS 上の HDInsight と同じ仮想ネットワークにある Azure Databricks
  • ADLS Gen2 とサービスプリンシパル

Azure Databricks 自動ローダー

Databricks 自動ローダーを使用すると、Flink アプリケーションから Delta Lake テーブルにデータランドをオブジェクト ストレージに簡単にストリーミングできます。 自動ローダー は、cloudFiles と呼ばれる構造化ストリーミング ソースを提供します。

Azure Databricks デルタ ライブ テーブルで Flink のデータを使用する手順を次に示します。

この手順では、Flink SQL で Kafka テーブルと ADLS Gen2 を作成できます。 このドキュメントでは、airplanes_state_real_time tableを使用しています。 任意の記事を使用できます。

コード スニペットで、Kafka クラスターでブローカー IP を更新する必要があります。

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

次に、Flink SQL で ADLSgen2 テーブルを作成できます。

コード スニペットのコンテナー名とストレージ アカウント名を、ADLS Gen2 の詳細で更新します。

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

さらに、Flink SQL の ADLSgen2 テーブルに Kafka テーブルを挿入できます。

スクリーンショットは、Kafka テーブルを ADLSgen2 テーブルに挿入する方法を示しています。

スクリーンショットは、Flink でのストリーミング ジョブの検証を示しています。

Azure portal 上の Azure Storage の Kafka からのデータ シンクを確認する

スクリーンショットは、Azure Storage 上の Kafka からのデータ シンクの確認を示しています。

Azure Storage と Azure Databricks ノートブックの認証

ADLS Gen2 は、Azure Databricks ノートブックからの認証のために、Microsoft Entra アプリケーション サービス プリンシパルを使用して OAuth 2.0 を提供し、その後 Azure Databricks DBFS にマウントします。

サービスプリンシパルのアプリID、テナントID、そして秘密鍵を取得してみましょう。

スクリーンショットには、サービス プリンシパルの appid、テナント ID、シークレット キーの取得が示されています。

Azure portal のストレージ BLOB データ所有者にサービス プリンシパルを付与する

スクリーンショットは、Azure portal のストレージ BLOB データ所有者のサービス プリンシパルを示しています。

Azure Databricks ノートブック で ADLS Gen2 を DBFS にマウントする

スクリーンショットは、Azure Databricks ノートブックで ADLS Gen2 を DBFS にマウントする方法を示しています。

ノートブック の準備

次のコードを記述してみましょう。

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Delta Live テーブル パイプラインを定義し、Azure Databricks で実行する

スクリーンショットは、Delta Live テーブル パイプラインを示し、Azure Databricks で実行します。

スクリーンショットは、Delta Live テーブル パイプラインを示し、Azure Databricks で実行します。

Azure Databricks Notebook で Delta Live テーブルを確認する

スクリーンショットは、Azure Databricks Notebook の Delta Live テーブルを確認する方法を示しています。

参考

  • Apache、Apache Kafka、Kafka、Apache Flink、Flink、および関連するオープンソースプロジェクト名は、Apache Software Foundation (ASF) の商標です。