将 Apache Flink® DataStream 合并到 Azure Databricks Delta Lake 表

此示例演示如何使用 Azure Databricks 自动加载功能将 Apache Flink 群集(在 AKS 上的 HDInsight)中的流数据沉入 Azure ADLS Gen2,并写入 Delta Lake 表中。

先决条件

Azure Databricks 自动加载程序

借助 Databricks 自动加载程序,可以轻松地将数据从 Flink 应用程序流式传输到 Delta Lake 表的对象存储中。 自动加载程序 提供名为 cloudFiles 的云文件的结构化流数据源。

下面是如何在 Azure Databricks delta 实时表中使用 Flink 中的数据的步骤。

在此步骤中,可以在 Flink SQL 上创建 Kafka 表和 ADLS Gen2。 在本文档中,我们使用 airplanes_state_real_time table。 可以使用您选择的任何物品。

需要在代码片段中将代理 IP 更新为与 Kafka 群集匹配。

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

接下来,可以在 Flink SQL 上创建 ADLSgen2 表。

使用 ADLS Gen2 详细信息更新代码片段中的容器名称和存储帐户名称。

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

此外,可以将 Kafka 表插入 Flink SQL 上的 ADLSgen2 表。

屏幕截图显示将 Kafka 表插入 ADLSgen2 表中。

屏幕截图显示在 Flink 上验证流式处理作业是否正常运行。

在 Azure 门户的 Azure 存储中检查 Kafka 的数据汇

屏幕截图显示检查 Azure 存储上 Kafka 的数据汇集。

Azure 存储与 Azure Databricks 笔记本的身份验证

ADLS Gen2 通过 Microsoft Entra 应用程序服务主体提供 OAuth 2.0 验证,以便从 Azure Databricks 笔记本进行身份验证,然后挂载到 Azure Databricks 的 DBFS。

让我们获取服务主体 appid、租户 ID 和密钥。

屏幕截图显示获取服务主体 appid、租户 ID 和密钥。

在 Azure 门户上授予服务主体的存储 Blob 数据所有者权限

屏幕截图显示了 Azure 门户上的存储 Blob 数据所有者的服务主体。

在 Azure Databricks 笔记本 将 ADLS Gen2 装载到 DBFS

屏幕截图显示了在 Azure Databricks 笔记本上将 ADLS Gen2 装载到 DBFS 中。

准备笔记本

让我们编写以下代码:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

定义 Delta 增量实时表管道并在 Azure Databricks 上运行

屏幕截图显示了增量实时表管道并在 Azure Databricks 上运行。

屏幕截图显示了 Delta Live Table 管道,并在 Azure Databricks 上运行。

在 Azure Databricks Notebook 上检查 Delta Live Table

屏幕截图显示在 Azure Databricks Notebook 上检查 Delta Live Table。

参考

  • Apache、Apache Kafka、Kafka、Apache Flink、Flink 和关联的开源项目名称 Apache Software Foundation(ASF) 商标。