Compartilhar via


Incorporar o Apache Flink® DataStream em as tabelas Delta Lake do Azure Databricks

Este exemplo mostra como transferir dados de fluxo no Azure Data Lake Storage Gen2 a partir de um cluster Apache Flink no HDInsight em execução no AKS para tabelas do Delta Lake usando o Carregador Automático do Azure Databricks.

Pré-requisitos

Carregador Automático do Azure Databricks

O Auto Loader do Databricks facilita o streaming de dados de aplicativos Flink para o armazenamento em objetos e o despejo em tabelas Delta Lake. do Carregador Automático fornece uma fonte de Streaming Estruturada chamada cloudFiles.

Aqui estão as etapas de como você pode usar dados do Flink em tabelas dinâmicas delta do Azure Databricks.

Nesta etapa, você pode criar a tabela Kafka e o ADLS Gen2 no Flink SQL. Neste documento, estamos usando um airplanes_state_real_time table. Você pode usar qualquer artigo de sua escolha.

Você precisa atualizar os IPs dos brokers no seu cluster Kafka no trecho de código.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

Em seguida, você pode criar a tabela ADLSgen2 no Flink SQL.

Atualize o nome do contêiner e o nome da conta de armazenamento no snippet de código com os detalhes do ADLS Gen2.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

Além disso, você pode inserir a tabela Kafka na tabela ADLSgen2 no Flink SQL.

Captura de tela mostra a inserção da tabela Kafka na tabela ADLSgen2.

Captura de tela mostra a validação do trabalho de streaming no Flink.

Verificar o coletor de dados do Kafka no Armazenamento do Azure no portal do Azure

Captura de tela mostra o coletor de dados de verificação do Kafka no Armazenamento do Azure.

Autenticação do Azure Storage e do notebook do ambiente Azure Databricks

O ADLS Gen2 fornece OAuth 2.0 com o principal de serviço do aplicativo Microsoft Entra para autenticação a partir de um notebook Azure Databricks e depois montar no DBFS do Azure Databricks.

Vamos obter o App ID, ID do locatário e a chave secreta do principal de serviço.

A captura de tela mostra o appid, o ID do locatário e a chave secreta do serviço principal.

Princípio de concessão de serviço ao Proprietário do Blob de Dados de Armazenamento no portal do Azure

Captura de tela mostra o princípio do serviço do Proprietário de Dados do Blob de Armazenamento no portal do Azure.

Montar o ADLS Gen2 no DBFS, em um notebook do Azure Databricks

Captura de tela mostra a montagem do ADLS Gen2 no DBFS, no notebook do Azure Databricks.

Preparar caderno

Vamos escrever o seguinte código:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Definir o Pipeline de Tabela Delta Live e executar no Azure Databricks

A captura de tela mostra o Pipeline de Tabela Delta em execução ao vivo no Azure Databricks.

A captura de tela mostra o pipeline Delta Live Table e sua execução no Azure Databricks.

Verificar Delta Live Table no Notebook do Azure Databricks

A captura de tela mostra a verificação da Tabela Delta Live no Azure Databricks Notebook.

Referência