Freigeben über


Integrieren von Apache Flink® DataStream in Azure Databricks Delta Lake Tables

In diesem Beispiel wird gezeigt, wie Sie Daten in Azure ADLS Gen2 mithilfe von Azure Databricks Auto Loader aus dem Apache Flink-Cluster der HDInsight on AKS an Delta Lake-Tabellen streamen.

Voraussetzungen

Azure Databricks Auto Loader

Databricks Auto Loader erleichtert das Streamen von Daten im Objektspeicher von Flink-Anwendungen in Delta Lake-Tabellen. Auto Loader stellt eine strukturierte Streamingquelle namens „cloudFiles“ bereit.

Die folgenden Schritte veranschaulichen, wie Sie Daten aus Flink in Azure Databricks Delta Live Tables verwenden können.

In diesem Schritt können Sie eine Kafka-Tabelle und eine ADLS Gen2-Instanz in Flink SQL erstellen. In diesem Dokument verwenden wir eine airplanes_state_real_time table. Sie können einen beliebigen Artikel Ihrer Wahl verwenden.

Sie müssen die Broker-IPs mit Ihrem Kafka-Cluster im Codeschnipsel aktualisieren.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

Als Nächstes können Sie eine ADLS Gen2-Tabelle in Flink SQL erstellen.

Aktualisieren Sie den Namen des Containers und des Speicherkontos im Codeschnipsel mit Ihren ADLS Gen2-Daten.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

Darüber hinaus können Sie eine Kafka-Tabelle in eine ADLS Gen2-Tabelle in Flink SQL einfügen.

Der Screenshot zeigt das Einfügen einer Kafka-Tabelle in eine ADLSgen2-Tabelle.

Screenshot zeigt die Überprüfung des Streamingauftrags auf Flink.

Überprüfen der Datensenke aus Kafka in Azure Storage im Azure-Portal

Der Screenshot zeigt Datensenker aus Kafka in Azure Storage prüfen.

Authentifizierung von Azure Storage und des Azure Databricks-Notebooks

ADLS Gen2 stellt OAuth 2.0 mit Ihrem Microsoft Entra-Anwendungsdienstprinzipal bereit. Das ermöglicht Ihnen die Authentifizierung aus einem Azure Databricks-Notebook und die Einbindung in Azure Databricks DBFS.

Abrufen der Dienstprinzipal-App-ID, der Mandanten-ID und des geheimen Schlüssels.

Der Screenshot zeigt Abrufen der Dienstprinzipal-App-ID, der Mandanten-ID und des geheimen Schlüssels an.

Zuweisen der Rolle „Besitzer von Speicherblobdaten“ an den Dienstprinzipal im Azure-Portal

Der Screenshot zeigt das Dienstprinzip des Speicher-Blob-Besitzers im Azure-Portal an.

Einbinden von ADLS Gen2 in DBFS in einem Azure Databricks-Notebook

Der Screenshot zeigt ADLS Gen2 in DBFS in einem Azure Databricks-Notebook einlegen an.

Vorbereiten des Notebooks

Schreiben Sie den folgenden Code:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Definieren einer Delta Live Tables-Pipeline und Ausführen dieser Pipeline auf Azure Databricks

Der Screenshot zeigt die Delta Live Tables-Pipeline und auf Azure Databricks ausführen an.

Der Screenshot zeigt die Delta Live Table Pipeline und Ausführung auf den Azure Databricks an.

Überprüfen von Delta Live Tables in einem Azure Databricks-Notebook

Der Screenshot zeigt Überprüfen von Delta Live Tables in einem Azure Databricks-Notebook an.

Verweis