將 Apache Flink® DataStream 納入 Azure Databricks Delta Lake 數據表
此範例示範如何使用 Azure Databricks 自動載入器,將 AKS 上 HDInsight 中的 Apache Flink 叢集中的數據匯入 Azure ADLS Gen2,然後保存至 Delta Lake 數據表。
先決條件
- 在 AKS 上的 HDInsight 中運行的 Apache Flink 1.17.0
- HDInsight Apache Kafka 3.2
- 在與 AKS 上的 HDInsight 相同的虛擬網路中,Azure Databricks。
- ADLS Gen2 和服務主體
Azure Databricks 自動載入器
Databricks 自動載入器可讓您輕鬆地將數據從 Flink 應用程式串流到物件儲存,然後再導入 Delta Lake 資料表。 自動載入器 提供稱為 cloudFiles 的結構化串流來源。
以下是您在 Azure Databricks Delta 實時資料表中使用 Flink 資料的步驟。
在 Apache Flink® SQL 上建立 Apache Kafka® 數據表
在此步驟中,您可以在 Flink SQL 上建立 Kafka 數據表和 ADLS Gen2。 在本檔中,我們使用 airplanes_state_real_time table
。 您可以使用您選擇的任何文章。
您需要在代碼段中更新與您的 Kafka 叢集匹配的訊息代理 IP。
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
接下來,您可以在 Flink SQL 上建立 ADLSgen2 數據表。
使用您的 ADLS Gen2 詳細數據,更新代碼段中的 container-name 和 storage-account-name。
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
此外,您可以將 Kafka 數據表插入 Flink SQL 上的 ADLSgen2 資料表。
驗證 Flink 上的串流作業
在 Azure 入口網站上檢查 Azure 儲存體中的 Kafka 數據接收點
Azure 儲存體和 Azure Databricks 筆記本的驗證
ADLS Gen2 會為您的 Microsoft Entra 應用程式服務主體提供 OAuth 2.0,以從 Azure Databricks Notebook 進行驗證,然後掛接至 Azure Databricks DBFS。
讓我們取得服務主體 appid、租使用者標識碼和秘密密鑰。
授與服務主體 Azure 入口網站上記憶體 Blob 數據擁有者
在 Azure Databricks 筆記本上將 ADLS Gen2 掛載到 DBFS
準備筆記本
讓我們撰寫下列程式代碼:
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
定義 Delta Live Table Pipeline 並在 Azure Databricks 上執行
檢查 Azure Databricks Notebook 上的 Delta Live Table
參考
- Apache、Apache Kafka、Kafka、Apache Flink、Flink 和相關聯的開放原始碼專案名稱 Apache Software Foundation (ASF) 商標。