Поделиться через


Включение Apache Flink® DataStream в таблицы Azure Databricks Delta Lake

В этом примере показано, как приемник потоковых данных в Azure ADLS 2-го поколения из кластера Apache Flink в HDInsight в AKS в таблицы Delta Lake с помощью автозагрузчика Azure Databricks.

Необходимые компоненты

Автозагрузчик Azure Databricks

Автозагрузчик Databricks упрощает потоковую передачу данных в хранилище объектов из приложений Flink в таблицы Delta Lake. Автозагрузчик предоставляет источник структурированной потоковой передачи, называемый cloudFiles.

Ниже приведены инструкции по использованию данных из Flink в динамических таблицах Azure Databricks delta.

На этом шаге можно создать таблицу Kafka и ADLS 2-го поколения в Flink SQL. В этом документе мы используем .airplanes_state_real_time table Вы можете использовать любую статью по своему усмотрению.

Необходимо обновить IP-адреса брокера с помощью кластера Kafka в фрагменте кода.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

Затем можно создать таблицу ADLSgen2 в Flink SQL.

Обновите имя контейнера и имя учетной записи хранения в фрагменте кода с подробными сведениями ADLS 2-го поколения.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

Кроме того, можно вставить таблицу Kafka в таблицу ADLSgen2 в Flink SQL.

Снимок экрана: вставка таблицы Kafka в таблицу ADLSgen2.

Снимок экрана: проверка задания потоковой передачи в Flink.

Проверьте приемник данных из Kafka в служба хранилища Azure на портал Azure

Снимок экрана: приемник данных проверка из Kafka на служба хранилища Azure.

Проверка подлинности записной книжки служба хранилища Azure и Azure Databricks

ADLS 2-го поколения предоставляет OAuth 2.0 субъекту-службе приложений Microsoft Entra для проверки подлинности из записной книжки Azure Databricks, а затем подключиться к DBFS Azure Databricks.

Давайте получим идентификатор приложения, идентификатор клиента и секретный ключ принципа службы.

Снимок экрана: получение идентификатора приложения, идентификатора клиента и секретного ключа принципа службы.

Предоставление принципу службы служба хранилища владельца данных BLOB-объектов портал Azure

Снимок экрана: принцип службы служба хранилища владельца данных BLOB-объектов в портал Azure.

Подключение ADLS 2-го поколения к DBFS в записной книжке Azure Databricks

Снимок экрана: подключение ADLS 2-го поколения к DBFS в записной книжке Azure Databricks.

Подготовка записной книжки

Давайте напишем следующий код:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Определение конвейера динамической таблицы Delta Live Table и запуск в Azure Databricks

Снимок экрана: конвейер разностной динамической таблицы и запуск в Azure Databricks.

Снимок экрана: конвейер разностной динамической таблицы и запуск в Azure Databricks.

Проверка динамической таблицы Delta в записной книжке Azure Databricks

Снимок экрана: проверка Delta Live Table в Записной книжке Azure Databricks.

Справочные материалы