共用方式為


將 Apache Flink® DataStream 納入 Azure Databricks Delta Lake 數據表

此範例示範如何使用 Azure Databricks 自動載入器,將 AKS 上 HDInsight 中的 Apache Flink 叢集中的數據匯入 Azure ADLS Gen2,然後保存至 Delta Lake 數據表。

先決條件

Azure Databricks 自動載入器

Databricks 自動載入器可讓您輕鬆地將數據從 Flink 應用程式串流到物件儲存,然後再導入 Delta Lake 資料表。 自動載入器 提供稱為 cloudFiles 的結構化串流來源。

以下是您在 Azure Databricks Delta 實時資料表中使用 Flink 資料的步驟。

在此步驟中,您可以在 Flink SQL 上建立 Kafka 數據表和 ADLS Gen2。 在本檔中,我們使用 airplanes_state_real_time table。 您可以使用您選擇的任何文章。

您需要在代碼段中更新與您的 Kafka 叢集匹配的訊息代理 IP。

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

接下來,您可以在 Flink SQL 上建立 ADLSgen2 數據表。

使用您的 ADLS Gen2 詳細數據,更新代碼段中的 container-name 和 storage-account-name。

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

此外,您可以將 Kafka 數據表插入 Flink SQL 上的 ADLSgen2 資料表。

螢幕快照顯示將 Kafka 資料表插入 ADLSgen2 資料表。

螢幕快照顯示驗證 Flink 上的串流作業。

在 Azure 入口網站上檢查 Azure 儲存體中的 Kafka 數據接收點

螢幕快照顯示從 Kafka 檢查數據接收至 Azure 儲存體。

Azure 儲存體和 Azure Databricks 筆記本的驗證

ADLS Gen2 會為您的 Microsoft Entra 應用程式服務主體提供 OAuth 2.0,以從 Azure Databricks Notebook 進行驗證,然後掛接至 Azure Databricks DBFS。

讓我們取得服務主體 appid、租使用者標識碼和秘密密鑰。

螢幕快照顯示取得服務主體 appid、租戶 ID 和密鑰。

授與服務主體 Azure 入口網站上記憶體 Blob 數據擁有者

螢幕快照顯示 Azure 入口網站上儲存體 Blob 數據擁有者的服務原則。

在 Azure Databricks 筆記本上將 ADLS Gen2 掛載到 DBFS

螢幕快照顯示 Azure Databricks 筆記本上將 ADLS Gen2 掛接至 DBFS。

準備筆記本

讓我們撰寫下列程式代碼:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

定義 Delta Live Table Pipeline 並在 Azure Databricks 上執行

螢幕快照顯示 Delta Live Table Pipeline,並在 Azure Databricks 上執行。

螢幕快照顯示 Delta Live Table Pipeline,並在 Azure Databricks 上執行。

檢查 Azure Databricks Notebook 上的 Delta Live Table

螢幕快照顯示在 Azure Databricks Notebook 上檢查 Delta Live Table。

參考

  • Apache、Apache Kafka、Kafka、Apache Flink、Flink 和相關聯的開放原始碼專案名稱 Apache Software Foundation (ASF) 商標。