Dołączanie strumienia danych apache Flink® do tabel usługi Delta Lake usługi Azure Databricks
W tym przykładzie pokazano, jak ujść dane strumienia w usłudze Azure ADLS Gen2 z klastra Apache Flink w usłudze HDInsight w usłudze AKS do tabel usługi Delta Lake przy użyciu usługi Azure Databricks Auto Loader.
Wymagania wstępne
- Apache Flink 1.17.0 w usłudze HDInsight w usłudze AKS
- Platforma Apache Kafka 3.2 w usłudze HDInsight
- Usługa Azure Databricks w tej samej sieci wirtualnej co usługa HDInsight w usłudze AKS
- USŁUGA ADLS Gen2 i jednostka usługi
Moduł automatycznego ładowania usługi Azure Databricks
Usługa Databricks Auto Loader ułatwia przesyłanie strumieniowe danych do magazynu obiektów z aplikacji Flink do tabel usługi Delta Lake. Moduł automatycznego ładowania udostępnia źródło przesyłania strumieniowego ze strukturą o nazwie cloudFiles.
Poniżej przedstawiono procedurę używania danych z funkcji Flink w tabelach na żywo różnicowych usługi Azure Databricks.
Tworzenie tabeli platformy Apache Kafka® w języku Apache Flink® SQL
W tym kroku możesz utworzyć tabelę platformy Kafka i usługę ADLS Gen2 w języku Flink SQL. W tym dokumencie używamy elementu airplanes_state_real_time table
. Możesz użyć dowolnego wybranego artykułu.
Należy zaktualizować adresy IP brokera przy użyciu klastra platformy Kafka we fragmencie kodu.
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
Następnie możesz utworzyć tabelę ADLSgen2 w języku Flink SQL.
Zaktualizuj nazwę kontenera i nazwę konta magazynu we fragmencie kodu, podając szczegóły usługi ADLS Gen2.
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
Ponadto możesz wstawić tabelę Platformy Kafka do tabeli ADLSgen2 w języku Flink SQL.
Weryfikowanie zadania przesyłania strumieniowego na Flink
Sprawdzanie ujścia danych z platformy Kafka w usłudze Azure Storage w witrynie Azure Portal
Uwierzytelnianie notesu usługi Azure Storage i usługi Azure Databricks
Usługa ADLS Gen2 udostępnia protokół OAuth 2.0 z jednostką usługi aplikacji Microsoft Entra na potrzeby uwierzytelniania z notesu usługi Azure Databricks, a następnie zainstaluj go w systemie plików DBFS usługi Azure Databricks.
Pobierzmy identyfikator jednostki usługi, identyfikator dzierżawy i klucz tajny.
Udzielanie jednostki usługi właścicielowi danych obiektu blob usługi Storage w witrynie Azure Portal
Instalowanie usługi ADLS Gen2 w systemie plików DBFS w notesie usługi Azure Databricks
Przygotowywanie notesu
Napiszmy następujący kod:
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Definiowanie potoku usługi Delta Live Table i uruchamianie w usłudze Azure Databricks
Sprawdzanie tabeli delta live w notesie usługi Azure Databricks
Odwołanie
- Apache, Apache Kafka, Kafka, Apache Flink, Flink i skojarzone nazwy projektów typu open source są znakami towarowymi platformy Apache Software Foundation (ASF).