Införliva Apache Flink® DataStream i Azure Databricks Delta Lake-tabeller
Det här exemplet visar hur du kan överföra dataström i Azure ADLS Gen2 från ett Apache Flink-kluster i HDInsight på AKS till Delta Lake-tabeller med hjälp av Azure Databricks Auto Loader.
Förutsättningar
- Apache Flink 1.17.0 på HDInsight på AKS
- Apache Kafka 3.2 på HDInsight
- Azure Databricks i samma virtuella nätverk som HDInsight på AKS
- ADLS Gen2 och tjänstehuvud
Azure Databricks Auto Loader
Databricks Auto Loader gör det enkelt att strömma data till objektlagring från Flink-program till Delta Lake-tabeller. Auto Loader tillhandahåller en strukturerad strömningskälla med namnet cloudFiles.
Här följer stegen för hur du kan använda data från Flink i Azure Databricks delta live-tabeller.
Skapa Apache Kafka-tabell® i Apache Flink® SQL
I det här steget kan du skapa Kafka-tabellen och ADLS Gen2 på Flink SQL. I det här dokumentet använder vi en airplanes_state_real_time table
. Du kan använda valfri artikel.
Du måste uppdatera koordinator-IP-adresserna med ditt Kafka-kluster i kodfragmentet.
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
Sedan kan du skapa en ADLSgen2-tabell i Flink SQL.
Uppdatera containernamnet och lagringskontonamnet i kodfragmentet med din ADLS Gen2-information.
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
Dessutom kan du infoga Kafka-tabellen i ADLSgen2-tabellen i Flink SQL.
Verifiera direktuppspelningsjobbet på Flink
Kontrollera datalagringsenhet från Kafka i Azure Storage via Azure-portalen
Autentisering av Azure Storage och Azure Databricks Notebook
ADLS Gen2 tillhandahåller OAuth 2.0 med ditt tjänsthuvudnamn för Microsoft Entra-programtjänsten för autentisering från en notebook i Azure Databricks och monterar sedan i Azure Databricks DBFS.
Vi hämtar AppID för tjänstens princip, klientorganisationens-ID och nyckel.
Bevilja tjänstprincipen Lagringsblobdataägare på Azure-portalen
Montera ADLS Gen2 i DBFS i Azure Databricks notebook
Förbered anteckningsbok
Nu ska vi skriva följande kod:
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Definiera Delta Live Table Pipeline och kör på Azure Databricks
Kontrollera Delta Live Table i Azure Databricks Notebook
Hänvisning
- Apache, Apache Kafka, Kafka, Apache Flink, Flink och associerade projektnamn med öppen källkod är varumärken som tillhör Apache Software Foundation (ASF).