Integrieren von Apache Flink® DataStream in Azure Databricks Delta Lake Tables
In diesem Beispiel wird gezeigt, wie Sie Daten in Azure ADLS Gen2 mithilfe von Azure Databricks Auto Loader aus dem Apache Flink-Cluster der HDInsight on AKS an Delta Lake-Tabellen streamen.
Voraussetzungen
- Apache Flink 1.17.0 in HDInsight on AKS
- Apache Kafka 3.2 in HDInsight
- Azure Databricks im selben virtuellen Netzwerk wie HDInsight auf AKS
- ADLS Gen2 und ein Dienstprinzipal
Azure Databricks Auto Loader
Databricks Auto Loader erleichtert das Streamen von Daten im Objektspeicher von Flink-Anwendungen in Delta Lake-Tabellen. Auto Loader stellt eine strukturierte Streamingquelle namens „cloudFiles“ bereit.
Die folgenden Schritte veranschaulichen, wie Sie Daten aus Flink in Azure Databricks Delta Live Tables verwenden können.
Erstellen einer Apache Kafka®-Tabelle auf Apache Flink® SQL
In diesem Schritt können Sie eine Kafka-Tabelle und eine ADLS Gen2-Instanz in Flink SQL erstellen. In diesem Dokument verwenden wir eine airplanes_state_real_time table
. Sie können einen beliebigen Artikel Ihrer Wahl verwenden.
Sie müssen die Broker-IPs mit Ihrem Kafka-Cluster im Codeschnipsel aktualisieren.
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
Als Nächstes können Sie eine ADLS Gen2-Tabelle in Flink SQL erstellen.
Aktualisieren Sie den Namen des Containers und des Speicherkontos im Codeschnipsel mit Ihren ADLS Gen2-Daten.
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
Darüber hinaus können Sie eine Kafka-Tabelle in eine ADLS Gen2-Tabelle in Flink SQL einfügen.
Überprüfen des Streamingauftrags in Flink
Überprüfen der Datensenke aus Kafka in Azure Storage im Azure-Portal
Authentifizierung von Azure Storage und des Azure Databricks-Notebooks
ADLS Gen2 stellt OAuth 2.0 mit Ihrem Microsoft Entra-Anwendungsdienstprinzipal bereit. Das ermöglicht Ihnen die Authentifizierung aus einem Azure Databricks-Notebook und die Einbindung in Azure Databricks DBFS.
Abrufen der Dienstprinzipal-App-ID, der Mandanten-ID und des geheimen Schlüssels.
Zuweisen der Rolle „Besitzer von Speicherblobdaten“ an den Dienstprinzipal im Azure-Portal
Einbinden von ADLS Gen2 in DBFS in einem Azure Databricks-Notebook
Vorbereiten des Notebooks
Schreiben Sie den folgenden Code:
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Definieren einer Delta Live Tables-Pipeline und Ausführen dieser Pipeline auf Azure Databricks
Überprüfen von Delta Live Tables in einem Azure Databricks-Notebook
Verweis
- Apache, Apache Kafka, Kafka, Apache Flink, Flink und zugehörige Open Source-Projektnamen sind Marken der Apache Software Foundation (ASF).