Włącz Apache Flink® DataStream do tabel Delta Lake na platformie Azure Databricks
W tym przykładzie pokazano, jak przesyłać dane strumieniowe do usługi Azure ADLS Gen2 z klastra Apache Flink uruchomionego na HDInsight na AKS do tabel Delta Lake przy użyciu Auto Loader w Azure Databricks.
Warunki wstępne
- Apache Flink 1.17.0 na HDInsight na AKS
- Apache Kafka 3.2 na HDInsight
- Azure Databricks w tej samej sieci wirtualnej co HDInsight na AKS
- usługi ADLS Gen2 i podmiotu zabezpieczeń
Moduł automatycznego ładowania usługi Azure Databricks
Usługa Databricks Auto Loader ułatwia przesyłanie strumieniowe danych do magazynu obiektów z aplikacji Flink do tabel usługi Delta Lake. Auto Loader udostępnia ustrukturyzowane źródło przesyłania strumieniowego o nazwie cloudFiles.
Oto kroki, jak można używać danych z Flink w tabelach delta live usługi Azure Databricks.
Tworzenie tabeli Apache Kafka® w Apache Flink® SQL
W tym kroku możesz utworzyć tabelę platformy Kafka i usługę ADLS Gen2 w języku Flink SQL. W tym dokumencie używamy airplanes_state_real_time table
. Możesz użyć dowolnego wybranego artykułu.
Należy zaktualizować adresy IP brokera przy użyciu klastra platformy Kafka we fragmencie kodu.
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
Następnie możesz utworzyć tabelę ADLSgen2 w języku Flink SQL.
Zaktualizuj nazwę kontenera i nazwę konta magazynu we fragmencie kodu, podając szczegóły usługi ADLS Gen2.
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
Ponadto, możesz wstawić tabelę Kafka do tabeli ADLSgen2 w Flink SQL.
Zweryfikuj zadanie przesyłania strumieniowego w Flink.
Sprawdzanie ujścia danych z platformy Kafka w usłudze Azure Storage w witrynie Azure Portal
Uwierzytelnianie w usłudze Azure Storage i notesie Azure Databricks
ADLS Gen2 zapewnia protokół OAuth 2.0 z jednostką usługi aplikacji Microsoft Entra do uwierzytelniania z notesu usług Azure Databricks, a następnie podłączenia do systemu plików DBFS w Azure Databricks.
Pobierzmy identyfikator jednostki usługi, identyfikator dzierżawy i klucz tajny.
Przyznanie zasad usługowych właścicielowi danych obiektów Blob w portalu Azure
Podłączanie ADLS Gen2 do DBFS w notesie Azure Databricks
przygotowywanie notesu
Napiszmy następujący kod:
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Definiuj potok Delta Live Table i uruchom w Azure Databricks
Sprawdź Delta Live Table w Notebooku Azure Databricks
Odniesienie
- Nazwy projektów Apache, Apache Kafka, Kafka, Apache Flink, Flink i powiązane nazwy projektów open source są znakami towarowymiApache Software Foundation (ASF).