Incorporar o Apache Flink® DataStream nas tabelas Delta Lake do Azure Databricks
Este exemplo mostra como coletar dados de fluxo no Azure ADLS Gen2 do cluster Apache Flink no HDInsight no AKS em tabelas Delta Lake usando o Azure Databricks Auto Loader.
Pré-requisitos
- Apache Flink 1.17.0 no HDInsight no AKS
- Apache Kafka 3.2 no HDInsight
- Azure Databricks na mesma rede virtual que o HDInsight no AKS
- ADLS Gen2 e Entidade de Serviço
Azure Databricks Auto Loader
O Databricks Auto Loader facilita o fluxo de dados para o armazenamento de objetos de aplicativos Flink para tabelas Delta Lake. Auto Loader fornece uma fonte de streaming estruturado chamada cloudFiles.
Aqui estão as etapas de como você pode usar dados do Flink em tabelas dinâmicas delta do Azure Databricks.
Criar tabela Apache Kafka® no Apache Flink® SQL
Nesta etapa, você pode criar a tabela Kafka e o ADLS Gen2 no Flink SQL. Neste documento, estamos usando um airplanes_state_real_time table
arquivo . Você pode usar qualquer artigo de sua escolha.
Você precisa atualizar os IPs do broker com seu cluster Kafka no trecho de código.
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
Em seguida, você pode criar a tabela ADLSgen2 no Flink SQL.
Atualize o nome do contêiner e o nome da conta de armazenamento no trecho de código com os detalhes do ADLS Gen2.
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
Além disso, você pode inserir a tabela Kafka na tabela ADLSgen2 no Flink SQL.
Validar o trabalho de streaming no Flink
Verifique o coletor de dados do Kafka no Armazenamento do Azure no portal do Azure
Autenticação do Armazenamento do Azure e do bloco de anotações do Azure Databricks
O ADLS Gen2 fornece o OAuth 2.0 com sua entidade de serviço de aplicativo Microsoft Entra para autenticação de um bloco de anotações do Azure Databricks e, em seguida, monta no DBFS do Azure Databricks.
Vamos obter o princípio de serviço appid, ID do locatário e chave secreta.
Princípio de concessão de serviço ao Proprietário de Dados de Blob de Armazenamento no portal do Azure
Monte o ADLS Gen2 no DBFS, no notebook do Azure Databricks
Preparar bloco de notas
Vamos escrever o seguinte código:
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Definir Delta Live Table Pipeline e executar no Azure Databricks
Verifique Delta Live Table no Azure Databricks Notebook
Referência
- Apache, Apache Kafka, Kafka, Apache Flink, Flink e nomes de projetos de código aberto associados são marcas comerciais da Apache Software Foundation (ASF).