Delen via


Apache Flink® DataStream integreren in Azure Databricks Delta Lake tabellen

In dit voorbeeld ziet u hoe u streamgegevens in Azure ADLS Gen2 kunt opslaan vanuit een Apache Flink-cluster op HDInsight op AKS in Delta Lake-tabellen met behulp van Azure Databricks Auto Loader.

Voorwaarden

Azure Databricks Auto Loader

Databricks Auto Loader maakt het eenvoudig om gegevens vanuit Flink-toepassingen naar objectopslag te streamen en ze in Delta Lake-tabellen op te slaan. Auto Loader biedt een Structured Streaming-bron met de naam cloudFiles.

Hier volgen de stappen voor het gebruik van gegevens uit Flink in Live-tabellen van Azure Databricks Delta.

In deze stap kunt u kafka-tabel en ADLS Gen2 maken op Flink SQL. In dit document gebruiken we een airplanes_state_real_time table. U kunt elk artikel van uw keuze gebruiken.

U moet de broker-IP's bijwerken met uw Kafka-cluster in het codefragment.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

Vervolgens kunt u een ADLSgen2-tabel maken op Flink SQL.

Werk de containernaam en de naam van het opslagaccount in het codefragment bij met uw ADLS Gen2-gegevens.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

Verder kunt u de Kafka-tabel invoegen in de ADLSgen2-tabel op Flink SQL.

Schermopname toont het invoegen van de Kafka-tabel in de ADLSgen2-tabel.

Schermopname toont het valideren van de streaming job op Flink.

Gegevenssink controleren vanuit Kafka in Azure Storage in Azure Portal

Schermopname toont controle van het dataverzamelpunt van Kafka in Azure Storage.

Verificatie van Azure Storage- en Azure Databricks-notebook

ADLS Gen2 biedt OAuth 2.0 met de service-principal van uw Microsoft Entra-toepassing voor verificatie vanuit een Azure Databricks-notebook en koppel deze vervolgens aan Azure Databricks DBFS.

Laten we service-principal-appid, tenant-id en geheime sleutel ophalen.

Screenshot toont de app-id, tenant-id en geheime sleutel van de service-principal.

Verleen de dienst-principal de rol van Eigenaar van Storage Blob-gegevens in Azure Portal

Schermopname toont het serviceprincipe de eigenaar van de opslagblobgegevens in Azure Portal.

ADLS Gen2 koppelen aan DBFS in Azure Databricks Notebook

Schermopname toont het koppelen van ADLS Gen2 aan DBFS, in Azure Databricks-notebook.

Bereid notitieblok voor

Laten we de volgende code schrijven:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Delta Live Table Pipeline definiëren en uitvoeren in Azure Databricks

Schermopname toont Delta Live Table Pipeline en wordt uitgevoerd in Azure Databricks.

Schermopname toont Delta Live Table Pipeline en wordt uitgevoerd op Azure Databricks.

Delta Live Table controleren in Azure Databricks Notebook

Schermopname toont het controleren van Delta Live Table in Azure Databricks Notebook.

Referentie