Incorporación de Apache Flink® DataStream en tablas Delta Lake de Azure Databricks
En este ejemplo, se muestra cómo recibir datos de streaming en Azure ADLS Gen2 desde el clúster de Apache Flink en HDInsight en AKS en tablas de Delta Lake mediante el Auto Loader de Azure Databricks.
Requisitos previos
- Apache Flink 1.17.0 en HDInsight en AKS
- Apache Kafka 3.2 en HDInsight
- Azure Databricks en la misma red virtual que HDInsight en AKS
- ADLS Gen2 y entidad de servicio
Auto Loader de Azure Databricks
El Auto Loader de Databricks facilita la transmisión de datos al almacenamiento de objetos desde aplicaciones de Flink a tablas de Delta Lake. Auto Loader proporciona un origen de flujo estructurado denominado cloudFiles.
Estos son los pasos para usar datos de Flink en tablas de Delta Live Tables de Azure Databricks.
Creación de una tabla de Apache Kafka® en SQL de Apache Flink®
En este paso, puede crear una tabla de Kafka y ADLS Gen2 en SQL de Flink. En este documento, se usa un airplanes_state_real_time table
. Puede usar cualquier artículo de su elección.
Debe actualizar las direcciones IP del agente con el clúster de Kafka en el fragmento de código.
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
A continuación, puede crear una tabla de ADLSgen2 en SQL de Flink.
Actualice el nombre del contenedor y storage-account-name en el fragmento de código con los detalles de ADLS Gen2.
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
Además, puede insertar la tabla de Kafka en la tabla ADLSgen2 en SQL de Flink.
Validación del trabajo de streaming en Flink
Comprobación del receptor de datos de Kafka en Azure Storage en Azure Portal
Autenticación de Azure Storage y de un cuaderno de Azure Databricks
ADLS Gen2 proporciona OAuth 2.0 con la entidad de servicio de la aplicación de Microsoft Entra para la autenticación desde un cuaderno de Azure Databricks y, luego, montar en DBFS de Azure Databricks.
Vamos a obtener el appid de principio de servicio, el identificador de inquilino y la clave secreta.
Concesión a la entidad de servicio del permiso de propietario de datos de blob de almacenamiento en Azure Portal
Montar ADLS Gen2 en DBFS, en el cuaderno de Azure Databricks
Preparación del notebook
Escribamos el siguiente código:
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Definición de la canalización de la tabla de Delta Live Tables y ejecución en Azure Databricks
Comprobación de la tabla Delta Live Tables en el cuaderno de Azure Databricks
Referencia
- Apache, Apache Kafka, Kafka, Apache Flink, Flink y los nombres de proyecto de código abierto asociados son marcas comerciales de Apache Software Foundation (ASF).