Apache Flink® DataStream integreren in Azure Databricks Delta Lake tabellen
In dit voorbeeld ziet u hoe u streamgegevens in Azure ADLS Gen2 kunt opslaan vanuit een Apache Flink-cluster op HDInsight op AKS in Delta Lake-tabellen met behulp van Azure Databricks Auto Loader.
Voorwaarden
- Apache Flink 1.17.0 in HDInsight op AKS
- Apache Kafka 3.2 in HDInsight
- Azure Databricks- in hetzelfde virtuele netwerk als HDInsight in AKS
- ADLS Gen2- en service-principal
Azure Databricks Auto Loader
Databricks Auto Loader maakt het eenvoudig om gegevens vanuit Flink-toepassingen naar objectopslag te streamen en ze in Delta Lake-tabellen op te slaan. Auto Loader biedt een Structured Streaming-bron met de naam cloudFiles.
Hier volgen de stappen voor het gebruik van gegevens uit Flink in Live-tabellen van Azure Databricks Delta.
Een Apache Kafka-tabel® maken in Apache Flink® SQL
In deze stap kunt u kafka-tabel en ADLS Gen2 maken op Flink SQL. In dit document gebruiken we een airplanes_state_real_time table
. U kunt elk artikel van uw keuze gebruiken.
U moet de broker-IP's bijwerken met uw Kafka-cluster in het codefragment.
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
Vervolgens kunt u een ADLSgen2-tabel maken op Flink SQL.
Werk de containernaam en de naam van het opslagaccount in het codefragment bij met uw ADLS Gen2-gegevens.
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
Verder kunt u de Kafka-tabel invoegen in de ADLSgen2-tabel op Flink SQL.
De streamingtaak valideren op Flink
Gegevenssink controleren vanuit Kafka in Azure Storage in Azure Portal
Verificatie van Azure Storage- en Azure Databricks-notebook
ADLS Gen2 biedt OAuth 2.0 met de service-principal van uw Microsoft Entra-toepassing voor verificatie vanuit een Azure Databricks-notebook en koppel deze vervolgens aan Azure Databricks DBFS.
Laten we service-principal-appid, tenant-id en geheime sleutel ophalen.
Verleen de dienst-principal de rol van Eigenaar van Storage Blob-gegevens in Azure Portal
ADLS Gen2 koppelen aan DBFS in Azure Databricks Notebook
Bereid notitieblok voor
Laten we de volgende code schrijven:
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Delta Live Table Pipeline definiëren en uitvoeren in Azure Databricks
Delta Live Table controleren in Azure Databricks Notebook
Referentie
- Apache, Apache Kafka, Kafka, Apache Flink, Flink en bijbehorende opensource-projectnamen zijn handelsmerken van de Apache Software Foundation (ASF).