Поделиться через


Включение Apache Flink® DataStream в таблицы Azure Databricks Delta Lake

В этом примере показано, как данные из потока в Azure ADLS Gen2 от кластера Apache Flink на HDInsight в AKS записываются в таблицы Delta Lake с использованием автозагрузчика Azure Databricks.

Необходимые условия

Автозагрузчик Azure Databricks

Автозагрузчик Databricks облегчает организацию потоковой загрузки данных из приложений Flink в хранилище объектов и затем в таблицы Delta Lake. Auto Loader предоставляет источник для структурированной потоковой передачи, называемый cloudFiles.

Ниже приведены инструкции по использованию данных из Flink в динамических таблицах Azure Databricks delta.

На этом шаге можно создать таблицу Kafka и ADLS 2-го поколения в Flink SQL. В этом документе мы используем airplanes_state_real_time table. Вы можете использовать любую статью по своему усмотрению.

Необходимо обновить IP-адреса брокера с помощью кластера Kafka в фрагменте кода.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

Затем можно создать таблицу ADLSgen2 в Flink SQL.

Обновите имя контейнера и имя учетной записи хранения в фрагменте кода с подробными сведениями ADLS 2-го поколения.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

Кроме того, можно вставить таблицу Kafka в таблицу ADLSgen2 в Flink SQL.

снимок экрана: вставка таблицы Kafka в таблицу ADLSgen2.

На скриншоте показана проверка задания потоковой обработки в Flink.

Проверка приема данных из Kafka в хранилище Azure на портале Azure

Снимок экрана показывает проверку приемника данных из Kafka в хранилище Azure.

Аутентификация службы хранилища Azure и записной книжки Azure Databricks

ADLS Gen2 предоставляет OAuth 2.0 с вашим принципалом службы приложения Microsoft Entra для аутентификации из записной книжки Azure Databricks, а затем монтирования в DBFS Azure Databricks.

Давайте получим идентификатор приложения, ID клиента и секретный ключ учетной записи службы.

Снимок экрана показывает получение идентификатора приложения, идентификатора клиента и секретного ключа принципала службы.

принцип предоставления службы владельцу данных BLOB-объектов хранилища на портале Azure

снимок экрана: принцип службы владельца данных BLOB-объектов хранилища на портале Azure.

подключение ADLS 2-го поколения к DBFS в записной книжке Azure Databricks

Скриншот, показывающий подключение ADLS Gen2 к DBFS в записной книжке Azure Databricks.

Подготовьте записную книжку

Давайте напишем следующий код:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Определение конвейера Delta Live Table и запуск на платформе Azure Databricks

Снимок экрана показывает конвейер Delta Live Table и запуск в Azure Databricks.

Снимок экрана показывает конвейер Delta Live Table и запуск в Azure Databricks.

Проверка динамической таблицы Delta в записной книжке Azure Databricks

снимок экрана: проверка Delta Live Table в записной книжке Azure Databricks.

Ссылка