Incorporación de Apache Flink® DataStream en tablas de Azure Databricks Delta Lake
En este ejemplo se muestra cómo ingerir datos de flujo en Azure ADLS Gen2 desde un clúster de Apache Flink en HDInsight sobre AKS a tablas de Delta Lake mediante Azure Databricks Auto Loader.
Prerrequisitos
- Apache Flink 1.17.0 en HDInsight en AKS
- Apache Kafka 3.2 en HDInsight
- Azure Databricks en la misma red virtual que HDInsight en AKS
- ADLS Gen2 y la entidad de servicio
Cargador automático de Azure Databricks
Databricks Auto Loader facilita la transmisión de datos al almacenamiento de objetos desde aplicaciones de Flink a tablas de Delta Lake. Auto Loader proporciona un origen de Structured Streaming denominado cloudFiles.
Estos son los pasos para usar datos de Flink en tablas dinámicas delta de Azure Databricks.
Creación de una tabla de Apache Kafka® en Apache Flink® SQL
En este paso, puede crear una tabla de Kafka y ADLS Gen2 en Flink SQL. En este documento, se usa un airplanes_state_real_time table
. Puede usar cualquier artículo de su elección.
Debe actualizar las direcciones IP de los brokers del clúster de Kafka en el fragmento de código.
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
A continuación, puede crear una tabla de ADLSgen2 en Flink SQL.
Actualice el nombre del contenedor y el nombre de la cuenta de almacenamiento en el fragmento de código con sus detalles de ADLS Gen2.
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
Además, puede insertar la tabla de Kafka en la tabla ADLSgen2 en Flink SQL.
Validación del trabajo de streaming en Flink
Verificar el destino de datos de Kafka en Azure Storage en el portal de Azure
Autenticación de Azure Storage y cuaderno de Azure Databricks
ADLS Gen2 proporciona OAuth 2.0 a la entidad de servicio de la aplicación Microsoft Entra para la autenticación desde un cuaderno de Azure Databricks y, a continuación, montar en DBFS de Azure Databricks.
Vamos a obtener el identificador de aplicación de la entidad de servicio, el identificador de inquilino y la clave secreta.
Conceder el principio de servicio al propietario de datos de blobs de almacenamiento en el portal de Azure
montar ADLS Gen2 en DBFS, en el cuaderno de Azure Databricks
Preparar cuaderno
Vamos a escribir el código siguiente:
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Definir la canalización de la Tabla Delta Live y ejecutarla en Azure Databricks
Comprobación de Delta Live Table en Azure Databricks Notebook
Referencia
- Apache, Apache Kafka, Kafka, Apache Flink, Flink y los nombres de los proyectos de código abierto asociados son marcas comerciales de la Apache Software Foundation (ASF).