将 Apache Flink® DataStream 合并到 Azure Databricks Delta Lake 表
此示例演示如何使用 Azure Databricks 自动加载功能将 Apache Flink 群集(在 AKS 上的 HDInsight)中的流数据沉入 Azure ADLS Gen2,并写入 Delta Lake 表中。
先决条件
- 在 AKS 上的 HDInsight 上运行的 Apache Flink 1.17.0
- HDInsight 上的 Apache Kafka 3.2
- AKS 上的 HDInsight 所在的同一虚拟网络中的 Azure Databricks
- ADLS Gen2 和服务主体
Azure Databricks 自动加载程序
借助 Databricks 自动加载程序,可以轻松地将数据从 Flink 应用程序流式传输到 Delta Lake 表的对象存储中。 自动加载程序 提供名为 cloudFiles 的云文件的结构化流数据源。
下面是如何在 Azure Databricks delta 实时表中使用 Flink 中的数据的步骤。
在 Apache Flink® SQL 上创建 Apache Kafka® 表
在此步骤中,可以在 Flink SQL 上创建 Kafka 表和 ADLS Gen2。 在本文档中,我们使用 airplanes_state_real_time table
。 可以使用您选择的任何物品。
需要在代码片段中将代理 IP 更新为与 Kafka 群集匹配。
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
接下来,可以在 Flink SQL 上创建 ADLSgen2 表。
使用 ADLS Gen2 详细信息更新代码片段中的容器名称和存储帐户名称。
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
此外,可以将 Kafka 表插入 Flink SQL 上的 ADLSgen2 表。
在 Flink 上验证流式处理作业
在 Azure 门户的 Azure 存储中检查 Kafka 的数据汇
Azure 存储与 Azure Databricks 笔记本的身份验证
ADLS Gen2 通过 Microsoft Entra 应用程序服务主体提供 OAuth 2.0 验证,以便从 Azure Databricks 笔记本进行身份验证,然后挂载到 Azure Databricks 的 DBFS。
让我们获取服务主体 appid、租户 ID 和密钥。
在 Azure 门户上授予服务主体的存储 Blob 数据所有者权限
在 Azure Databricks 笔记本 上 将 ADLS Gen2 装载到 DBFS
准备笔记本
让我们编写以下代码:
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
定义 Delta 增量实时表管道并在 Azure Databricks 上运行
在 Azure Databricks Notebook 上检查 Delta Live Table
参考
- Apache、Apache Kafka、Kafka、Apache Flink、Flink 和关联的开源项目名称 Apache Software Foundation(ASF) 商标。