Incorporare Apache Flink® DataStream in tabelle Delta Lake di Azure Databricks
Questo esempio illustra come trasferire i dati di flusso in Azure ADLS Gen2 dal cluster Apache Flink su HDInsight su AKS in tabelle Delta Lake usando Auto Loader di Azure Databricks.
Prerequisiti
- Apache Flink 1.17.0 su HDInsight su AKS
- Apache Kafka 3.2 in HDInsight
- Azure Databricks nella stessa rete virtuale di HDInsight su AKS
- ADLS Gen2 e principale del servizio
Caricatore automatico di Azure Databricks
Il caricatore automatico di Databricks semplifica lo streaming dei dati nell'archiviazione di oggetti dalle applicazioni Flink alle tabelle Delta Lake. Caricamento Automatico fornisce un'origine Structured Streaming denominata cloudFiles.
Ecco i passaggi per usare i dati da Flink nelle tabelle live delta di Azure Databricks.
Creare una tabella Apache Kafka® in Apache Flink® SQL
In questo passaggio è possibile creare una tabella Kafka e ADLS Gen2 in Flink SQL. In questo documento viene usato un airplanes_state_real_time table
. È possibile usare qualsiasi articolo di propria scelta.
È necessario aggiornare gli IP dei broker nel tuo cluster Kafka nel frammento di codice.
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
Successivamente, è possibile creare una tabella ADLSgen2 in Flink SQL.
Aggiornare il nome del contenitore e il nome dell'account di archiviazione nel frammento di codice con i dettagli di ADLS Gen2.
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
È inoltre possibile inserire la tabella Kafka nella tabella ADLSgen2 in Flink SQL.
Convalidare il processo di streaming in Flink
Controllare la destinazione dati da Kafka nell'archiviazione Azure nel portale di Azure
Autenticazione del notebook di Archiviazione di Azure e Azure Databricks
ADLS Gen2 fornisce OAuth 2.0 con l'entità servizio dell'applicazione Microsoft Entra per l'autenticazione da un notebook di Azure Databricks e quindi monta in DBFS di Azure Databricks.
Otteniamo l'ID applicazione dell'entità servizio, l'ID tenant e la chiave segreta.
Concedere il ruolo di proprietario dei dati di archiviazione Blob nel portale Azure
montare ADLS Gen2 in DBFS nel notebook di Azure Databricks
Preparare il portatile
Scrivere il codice seguente:
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Definire il Delta Live Table Pipeline ed eseguirlo su Azure Databricks
Verifica Delta Live Table nel Notebook di Azure Databricks
Riferimento
- Apache, Apache Kafka, Kafka, Apache Flink, Flink e i nomi dei progetti open source associati sono marchi di Apache Software Foundation (ASF).