Začlenění Apache Flink® DataStreamu do tabulek Azure Databricks Delta Lake
Tento příklad ukazuje, jak streamovat data v Azure ADLS Gen2 z clusteru Apache Flink ve službě HDInsight v AKS do tabulek Delta Lake pomocí automatického zavaděče Azure Databricks.
Požadavky
- Apache Flink 1.17.0 ve službě HDInsight ve službě AKS
- Apache Kafka 3.2 ve službě HDInsight
- Azure Databricks ve stejné virtuální síti jako HDInsight v AKS
- ADLS Gen2 a instanční objekt
Automatický zavaděč Azure Databricks
Databricks Auto Loader usnadňuje streamování dat do úložiště objektů a jejich ukládání do tabulek Delta Lake z Flink aplikací. Automatický zavaděč poskytuje strukturovaný streamovací zdroj označovaný jako cloudFiles.
Tady je postup použití dat z Flinku v živých tabulkách Azure Databricks Delta.
Vytvoření tabulky Apache Kafka® v Apache Flink® SQL
V tomto kroku můžete vytvořit tabulku Kafka a ADLS Gen2 v Flink SQL. V tomto dokumentu používáme airplanes_state_real_time table
. Můžete použít libovolný článek podle svého výběru.
V fragmentu kódu je potřeba aktualizovat IP adresy zprostředkovatele pomocí clusteru Kafka.
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
Dále můžete vytvořit tabulku ADLSgen2 v Flink SQL.
Aktualizujte název kontejneru a název účtu úložiště v fragmentu kódu podrobnostmi ADLS Gen2.
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
Tabulku Kafka můžete dále vložit do tabulky ADLSgen2 v Flink SQL.
Ověření úlohy streamování na Flinku
Kontrola jímky dat ze systému Kafka ve službě Azure Storage na webu Azure Portal
Ověření Azure Storage a poznámkového bloku Azure Databricks
ADLS Gen2 poskytuje OAuth 2.0 s instančním objektem aplikace Microsoft Entra pro ověřování z poznámkového bloku Azure Databricks a pak se připojí k Azure Databricks DBFS.
Získejme ID aplikace, ID tenanta a tajný klíč aplikace služeb.
Udělte oprávnění Služebnímu Principálu jako Vlastník Datových Objektů Blob Úložiště na portálu Azure
Připojit ADLS Gen2 k DBFS v poznámkovém bloku Azure Databricks
Připravte poznámkový blok
Pojďme napsat následující kód:
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Definování kanálu Delta Live Table a spuštění v Azure Databricks
Kontrola Delta Live Table v poznámkovém bloku Azure Databricks
Odkaz
- Názvy projektů Apache, Apache Kafka, Apache Flink, Flink a přidružených opensourcových projektů jsou ochranné známkyApache Software Foundation (ASF).