Включение Apache Flink® DataStream в таблицы Azure Databricks Delta Lake
В этом примере показано, как данные из потока в Azure ADLS Gen2 от кластера Apache Flink на HDInsight в AKS записываются в таблицы Delta Lake с использованием автозагрузчика Azure Databricks.
Необходимые условия
- Apache Flink 1.17.0 в HDInsight в AKS
- Apache Kafka 3.2 в HDInsight
- Azure Databricks в той же виртуальной сети, что и HDInsight в AKS
- Хранилище данных Azure Gen2 и служебный принципал
Автозагрузчик Azure Databricks
Автозагрузчик Databricks облегчает организацию потоковой загрузки данных из приложений Flink в хранилище объектов и затем в таблицы Delta Lake. Auto Loader предоставляет источник для структурированной потоковой передачи, называемый cloudFiles.
Ниже приведены инструкции по использованию данных из Flink в динамических таблицах Azure Databricks delta.
Создание таблицы Apache Kafka® в Apache Flink® SQL
На этом шаге можно создать таблицу Kafka и ADLS 2-го поколения в Flink SQL. В этом документе мы используем airplanes_state_real_time table
. Вы можете использовать любую статью по своему усмотрению.
Необходимо обновить IP-адреса брокера с помощью кластера Kafka в фрагменте кода.
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
Затем можно создать таблицу ADLSgen2 в Flink SQL.
Обновите имя контейнера и имя учетной записи хранения в фрагменте кода с подробными сведениями ADLS 2-го поколения.
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
Кроме того, можно вставить таблицу Kafka в таблицу ADLSgen2 в Flink SQL.
Проверка задания потоковой передачи в Flink
Проверка приема данных из Kafka в хранилище Azure на портале Azure
Аутентификация службы хранилища Azure и записной книжки Azure Databricks
ADLS Gen2 предоставляет OAuth 2.0 с вашим принципалом службы приложения Microsoft Entra для аутентификации из записной книжки Azure Databricks, а затем монтирования в DBFS Azure Databricks.
Давайте получим идентификатор приложения, ID клиента и секретный ключ учетной записи службы.
принцип предоставления службы владельцу данных BLOB-объектов хранилища на портале Azure
подключение ADLS 2-го поколения к DBFS в записной книжке Azure Databricks
Подготовьте записную книжку
Давайте напишем следующий код:
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Определение конвейера Delta Live Table и запуск на платформе Azure Databricks
Проверка динамической таблицы Delta в записной книжке Azure Databricks
Ссылка
- Apache, Apache Kafka, Kafka, Apache Flink, Flink и связанные имена проектов с открытым кодом являются товарными знакамиApache Software Foundation (ASF).