Incorporer Apache Flink® DataStream dans Azure Databricks Delta Lake tables
Cet exemple montre comment acheminer des données de flux dans Azure ADLS Gen2 à partir d’un cluster Apache Flink sur HDInsight sur AKS vers des tables Delta Lake à l’aide d’Azure Databricks Auto Loader.
Conditions préalables
- Apache Flink 1.17.0 sur HDInsight sur AKS
- Apache Kafka 3.2 sur HDInsight
- Azure Databricks dans le même réseau virtuel que HDInsight sur AKS
- ADLS Gen2 et principal du service
Chargeur automatique Azure Databricks
Databricks Auto Loader facilite l'acheminement du flux de données vers le stockage d'objets, provenant des applications Flink, vers des tables Delta Lake. Auto Loader fournit une source de streaming structurée appelée cloudFiles.
Voici les étapes à suivre pour utiliser les données de Flink dans les tables dynamiques delta Azure Databricks.
Créer une table Apache Kafka® sur Apache Flink® SQL
Dans cette étape, vous pouvez créer une table Kafka et ADLS Gen2 sur Flink SQL. Dans ce document, nous utilisons un airplanes_state_real_time table
. Vous pouvez utiliser n’importe quel article de votre choix.
Vous devez mettre à jour les adresses IP des brokers dans votre cluster Kafka dans l'extrait de code.
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
Ensuite, vous pouvez créer une table ADLSgen2 sur Flink SQL.
Mettez à jour le nom du conteneur et le nom du compte de stockage dans l’extrait de code avec vos détails ADLS Gen2.
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
En outre, vous pouvez insérer une table Kafka dans une table ADLSgen2 sur Flink SQL.
Valider le travail de diffusion en continu sur Flink
Vérifier le puits de données de Kafka dans Azure Storage sur le portail Azure
Authentification de Stockage Azure et du notebook Azure Databricks
ADLS Gen2 fournit OAuth 2.0 à votre principal de service d’application Microsoft Entra pour l’authentification depuis un notebook Azure Databricks et ensuite effectuer le montage sur le DBFS d'Azure Databricks.
Obtenons l'ID d'application du principal de service, l'ID de locataire et la clé secrète.
Accorder le rôle de propriétaire des données BLOB de stockage sur le portail Azure
Monter ADLS Gen2 dans DBFS, sur un notebook Azure Databricks
Préparer un cahier
Nous allons écrire le code suivant :
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Définir le pipeline Delta Live Table et s’exécuter sur Azure Databricks
Vérifier delta Live Table sur Azure Databricks Notebook
Référence
- Apache, Apache Kafka, Kafka, Apache Flink, Flink et les noms de projet open source associés sont marques déposées de Apache Software Foundation (ASF).