Condividi tramite


Incorporare Apache Flink® DataStream in tabelle Delta Lake di Azure Databricks

Questo esempio illustra come trasferire i dati di flusso in Azure ADLS Gen2 dal cluster Apache Flink su HDInsight su AKS in tabelle Delta Lake usando Auto Loader di Azure Databricks.

Prerequisiti

Caricatore automatico di Azure Databricks

Il caricatore automatico di Databricks semplifica lo streaming dei dati nell'archiviazione di oggetti dalle applicazioni Flink alle tabelle Delta Lake. Caricamento Automatico fornisce un'origine Structured Streaming denominata cloudFiles.

Ecco i passaggi per usare i dati da Flink nelle tabelle live delta di Azure Databricks.

In questo passaggio è possibile creare una tabella Kafka e ADLS Gen2 in Flink SQL. In questo documento viene usato un airplanes_state_real_time table. È possibile usare qualsiasi articolo di propria scelta.

È necessario aggiornare gli IP dei broker nel tuo cluster Kafka nel frammento di codice.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

Successivamente, è possibile creare una tabella ADLSgen2 in Flink SQL.

Aggiornare il nome del contenitore e il nome dell'account di archiviazione nel frammento di codice con i dettagli di ADLS Gen2.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

È inoltre possibile inserire la tabella Kafka nella tabella ADLSgen2 in Flink SQL.

Screenshot mostra l'inserimento della tabella Kafka nella tabella ADLSgen2.

Screenshot mostra la convalida del processo di streaming in Flink.

Controllare la destinazione dati da Kafka nell'archiviazione Azure nel portale di Azure

Screenshot mostra il controllo del sink di dati da Kafka in Archiviazione di Azure.

Autenticazione del notebook di Archiviazione di Azure e Azure Databricks

ADLS Gen2 fornisce OAuth 2.0 con l'entità servizio dell'applicazione Microsoft Entra per l'autenticazione da un notebook di Azure Databricks e quindi monta in DBFS di Azure Databricks.

Otteniamo l'ID applicazione dell'entità servizio, l'ID tenant e la chiave segreta.

Screenshot mostra come ottenere l'id appid del servizio, l'ID tenant e la chiave privata.

Concedere il ruolo di proprietario dei dati di archiviazione Blob nel portale Azure

Screenshot mostra l'entità servizio proprietario dei dati del BLOB di archiviazione nel portale di Azure.

montare ADLS Gen2 in DBFS nel notebook di Azure Databricks

screenshot mostra il montaggio di ADLS Gen2 in DBFS nel notebook di Azure Databricks.

Preparare il portatile

Scrivere il codice seguente:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Definire il Delta Live Table Pipeline ed eseguirlo su Azure Databricks

Screenshot mostra la pipeline Delta Live Table in esecuzione su Azure Databricks.

Screenshot mostra la pipeline di Delta Live Tables in esecuzione su Azure Databricks.

Verifica Delta Live Table nel Notebook di Azure Databricks

Screenshot che mostra il controllo della tabella Delta Live nel notebook di Azure Databricks.

Riferimento