Partilhar via


Incorporar o Apache Flink® DataStream nas tabelas Delta Lake do Azure Databricks

Este exemplo demonstra como inserir dados de fluxo no Azure ADLS Gen2 a partir de um cluster Apache Flink no HDInsight em AKS em tabelas Delta Lake, utilizando o Auto Loader do Azure Databricks.

Pré-requisitos

Azure Databricks Auto Loader

O Databricks Auto Loader facilita o fluxo de dados, a partir de aplicativos Flink, para o armazenamento de objetos e depois para as tabelas Delta Lake. Auto Loader fornece uma fonte de streaming estruturado chamada cloudFiles.

Aqui estão as etapas de como você pode usar dados do Flink em tabelas dinâmicas delta do Azure Databricks.

Nesta etapa, você pode criar a tabela Kafka e o ADLS Gen2 no Flink SQL. Neste documento, estamos usando um airplanes_state_real_time table. Você pode usar qualquer artigo de sua escolha.

Você precisa atualizar os IPs do broker com seu cluster Kafka no trecho de código.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

Em seguida, você pode criar a tabela ADLSgen2 no Flink SQL.

Atualize o nome do contêiner e o nome da conta de armazenamento no trecho de código com os detalhes do ADLS Gen2.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

Além disso, você pode inserir a tabela Kafka na tabela ADLSgen2 no Flink SQL.

Captura de tela mostra inserir tabela Kafka na tabela ADLSgen2.

Captura de tela mostra a validação do trabalho de streaming no Flink.

Verifique o coletor de dados do Kafka no Armazenamento do Azure no portal do Azure

Captura de tela mostra o coletor de dados de verificação do Kafka no Armazenamento do Azure.

Autenticação do Armazenamento do Azure e do bloco de anotações do Azure Databricks

O ADLS Gen2 fornece o OAuth 2.0 com a entidade de serviço da sua aplicação Microsoft Entra para autenticação a partir de um bloco de anotações do Azure Databricks e, em seguida, monta no sistema de ficheiros do DBFS do Azure Databricks.

Vamos obter o ID da aplicação do principal de serviço, o ID de locatário e a chave secreta.

A captura de ecrã mostra o appid do princípio de serviço, a ID do locatário e a chave secreta.

Princípio de serviço de concessão do Proprietário de Dados de Blob de Armazenamento no portal do Azure

Captura de tela mostra o princípio de serviço do Proprietário de Dados de Blob de Armazenamento no portal do Azure.

Monte o ADLS Gen2 no DBFS, no notebook do Azure Databricks

Captura de tela mostra a montagem do ADLS Gen2 no DBFS, no notebook Azure Databricks.

Preparar notebook

Vamos escrever o seguinte código:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Definir Delta Live Table Pipeline e executar no Azure Databricks

Captura de tela mostra o Delta Live Table Pipeline e é executado no Azure Databricks.

Captura de tela mostra o Delta Live Table Pipeline e é executado no Azure Databricks.

Verifique a Tabela Delta Live no Notebook do Azure Databricks

A captura de ecrã mostra a verificação da tabela Delta Live no Notebook do Azure Databricks.

Referência

  • Apache, Apache Kafka, Kafka, Apache Flink, Flink e nomes de projetos de código aberto associados são marcas comerciais da Apache Software Foundation (ASF).