次の方法で共有


Azure Databricks Delta Lake テーブルに Apache Flink® DataStream を組み込む

この例では、Azure Databricks 自動ローダーを使用して、HDInsight on AKS 上の Apache Flink クラスターから Azure ADLS Gen2 のストリーム データを Delta Lake テーブルにシンクする方法を示します。

前提条件

Azure Databricks の自動ローダー

Databricks 自動ローダーを使用すると、オブジェクト ストレージへのデータを、Flink アプリケーションから Delta Lake テーブルに容易にストリーミングできます。 自動ローダーは、cloudFiles と呼ばれる構造化ストリーミング ソースを提供します。

Azure Databricks Delta Live Tables で Flink からデータを使用する手順は次のとおりです。

この手順では、Flink SQL 上に Kafka テーブルと ADLS Gen2 を作成できます。 このドキュメントでは、airplanes_state_real_time table を使います。 任意の記事を使用できます。

コード スニペットで、Kafka クラスターを使用してブローカー IP を更新する必要があります。

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

次に、Flink SQL で ADLSgen2 テーブルを作成できます。

コード スニペットの container-name と storage-account-name を ADLS Gen2 の詳細で更新します。

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

また、Flink SQL で ADLSgen2 テーブルに Kafka テーブルを挿入できます。

ADLSgen2 テーブルへの Kafka テーブルの挿入を示すスクリーンショット。

Flink でストリーミング ジョブを検証する方法を示すスクリーンショット。

Azure portal 上の Azure Storage で Kafka からのデータ シンクを確認する

Azure Storage 上の Kafka からのデータ シンクの確認を示すスクリーンショット。

Azure Storage と Azure Databricks ノートブックの認証

ADLS Gen2 は、Azure Databricks ノートブックから認証用 Microsoft Entra ID アプリケーション サービス プリンシパルを OAuth 2.0 に提供し、Azure Databricks DBFS にマウントします。

サービス プリンシパルのアプリ ID、テナント ID、秘密鍵を取得します。

サービス プリンシパルのアプリ ID、テナント ID、秘密鍵ーの取得を示すスクリーンショット。

Azure portal でストレージ BLOB データ所有者にサービス プリンシパルを付与する

Azure portal のストレージ BLOB データ所有者のサービス プリンシパルを示すスクリーンショット。

Azure Databricks ノートブックで、ADLS Gen2 を DBFS にマウントする

Azure Databricks ノートブックで、ADLS Gen2 を DBFS にマウントする方法を示すスクリーンショット。

ノートブックを準備する

次のコードを記述します。

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Delta Live Table パイプラインを定義し、Azure Databricks で実行する

Delta Live Table パイプラインを定義し、Azure Databricks で実行する方法を示すスクリーンショット。

Delta Live Table Pipeline と Azure Databricks で実行されるパイプラインを示すスクリーンショット。

Azure Databricks Notebook で Delta Live Table を確認する

Azure Databricks Notebook で Delta Live Table を確認する方法を示すスクリーンショット。

リファレンス

  • Apache、Apache Kafka、Kafka、Apache Flink、Flink、関連するオープン ソース プロジェクト名は、Apache Software Foundation (ASF) の商標です。