Partager via


Incorporer Apache Flink® DataStream dans Azure Databricks Delta Lake tables

Cet exemple montre comment acheminer des données de flux dans Azure ADLS Gen2 à partir d’un cluster Apache Flink sur HDInsight sur AKS vers des tables Delta Lake à l’aide d’Azure Databricks Auto Loader.

Conditions préalables

Chargeur automatique Azure Databricks

Databricks Auto Loader facilite l'acheminement du flux de données vers le stockage d'objets, provenant des applications Flink, vers des tables Delta Lake. Auto Loader fournit une source de streaming structurée appelée cloudFiles.

Voici les étapes à suivre pour utiliser les données de Flink dans les tables dynamiques delta Azure Databricks.

Dans cette étape, vous pouvez créer une table Kafka et ADLS Gen2 sur Flink SQL. Dans ce document, nous utilisons un airplanes_state_real_time table. Vous pouvez utiliser n’importe quel article de votre choix.

Vous devez mettre à jour les adresses IP des brokers dans votre cluster Kafka dans l'extrait de code.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

Ensuite, vous pouvez créer une table ADLSgen2 sur Flink SQL.

Mettez à jour le nom du conteneur et le nom du compte de stockage dans l’extrait de code avec vos détails ADLS Gen2.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

En outre, vous pouvez insérer une table Kafka dans une table ADLSgen2 sur Flink SQL.

Capture d’écran montrant l’insertion d’une table Kafka dans une table ADLSgen2.

Capture d’écran montrant la validation du travail de streaming sur Flink.

Vérifier le puits de données de Kafka dans Azure Storage sur le portail Azure

Capture d’écran montrant la vérification du récepteur de données à partir de Kafka sur Azure Storage.

Authentification de Stockage Azure et du notebook Azure Databricks

ADLS Gen2 fournit OAuth 2.0 à votre principal de service d’application Microsoft Entra pour l’authentification depuis un notebook Azure Databricks et ensuite effectuer le montage sur le DBFS d'Azure Databricks.

Obtenons l'ID d'application du principal de service, l'ID de locataire et la clé secrète.

Capture d’écran montrant obtenir l’appid du principe de service, l’ID de locataire et la clé secrète.

Accorder le rôle de propriétaire des données BLOB de stockage sur le portail Azure

Capture d’écran montrant le principe du service propriétaire des données blob du stockage sur le portail Azure.

Monter ADLS Gen2 dans DBFS, sur un notebook Azure Databricks

Capture d’écran montrant monter ADLS Gen2 dans DBFS, sur un notebook Azure Databricks.

Préparer un cahier

Nous allons écrire le code suivant :

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Définir le pipeline Delta Live Table et s’exécuter sur Azure Databricks

La capture d’écran montrant le pipeline Delta Live Table et son exécution sur Azure Databricks.

Capture d’écran de Delta Live Table Pipeline en cours d'exécution sur Azure Databricks.

Vérifier delta Live Table sur Azure Databricks Notebook

Capture d’écran montrant la vérification de Delta Live Table sur Azure Databricks Notebook.

Référence