Compartir a través de


Incorporación de Apache Flink® DataStream en tablas Delta Lake de Azure Databricks

En este ejemplo, se muestra cómo recibir datos de streaming en Azure ADLS Gen2 desde el clúster de Apache Flink en HDInsight en AKS en tablas de Delta Lake mediante el Auto Loader de Azure Databricks.

Requisitos previos

Auto Loader de Azure Databricks

El Auto Loader de Databricks facilita la transmisión de datos al almacenamiento de objetos desde aplicaciones de Flink a tablas de Delta Lake. Auto Loader proporciona un origen de flujo estructurado denominado cloudFiles.

Estos son los pasos para usar datos de Flink en tablas de Delta Live Tables de Azure Databricks.

En este paso, puede crear una tabla de Kafka y ADLS Gen2 en SQL de Flink. En este documento, se usa un airplanes_state_real_time table. Puede usar cualquier artículo de su elección.

Debe actualizar las direcciones IP del agente con el clúster de Kafka en el fragmento de código.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

A continuación, puede crear una tabla de ADLSgen2 en SQL de Flink.

Actualice el nombre del contenedor y storage-account-name en el fragmento de código con los detalles de ADLS Gen2.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

Además, puede insertar la tabla de Kafka en la tabla ADLSgen2 en SQL de Flink.

Captura de pantalla que muestra la inserción de la tabla de Kafka en la tabla ADLSgen2.

Captura de pantalla que muestra la validación del trabajo de streaming en Flink.

Comprobación del receptor de datos de Kafka en Azure Storage en Azure Portal

Captura de pantalla que muestra la comprobación del receptor de datos de Kafka en Azure Storage.

Autenticación de Azure Storage y de un cuaderno de Azure Databricks

ADLS Gen2 proporciona OAuth 2.0 con la entidad de servicio de la aplicación de Microsoft Entra para la autenticación desde un cuaderno de Azure Databricks y, luego, montar en DBFS de Azure Databricks.

Vamos a obtener el appid de principio de servicio, el identificador de inquilino y la clave secreta.

Captura de pantalla que muestra cómo obtener el identificador de aplicación de la entidad de servicio, el identificador de inquilino y la clave secreta.

Concesión a la entidad de servicio del permiso de propietario de datos de blob de almacenamiento en Azure Portal

Captura de pantalla que muestra el principio de servicio propietario de datos de Blob Storage en Azure Portal.

Montar ADLS Gen2 en DBFS, en el cuaderno de Azure Databricks

Captura de pantalla que muestra el montaje de ADLS Gen2 en DBFS, en el cuaderno de Azure Databricks.

Preparación del notebook

Escribamos el siguiente código:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Definición de la canalización de la tabla de Delta Live Tables y ejecución en Azure Databricks

Captura de pantalla que muestra Delta Live Table Pipeline y se ejecuta en Azure Databricks.

Captura de pantalla que muestra la canalización Delta Live Table y se ejecuta en Azure Databricks.

Comprobación de la tabla Delta Live Tables en el cuaderno de Azure Databricks

Captura de pantalla que muestra la comprobación de Delta Live Table en Azure Databricks Notebook.

Referencia