Dela via


Införliva Apache Flink® DataStream i Azure Databricks Delta Lake-tabeller

Det här exemplet visar hur du kan överföra dataström i Azure ADLS Gen2 från ett Apache Flink-kluster i HDInsight på AKS till Delta Lake-tabeller med hjälp av Azure Databricks Auto Loader.

Förutsättningar

Azure Databricks Auto Loader

Databricks Auto Loader gör det enkelt att strömma data till objektlagring från Flink-program till Delta Lake-tabeller. Auto Loader tillhandahåller en strukturerad strömningskälla med namnet cloudFiles.

Här följer stegen för hur du kan använda data från Flink i Azure Databricks delta live-tabeller.

I det här steget kan du skapa Kafka-tabellen och ADLS Gen2 på Flink SQL. I det här dokumentet använder vi en airplanes_state_real_time table. Du kan använda valfri artikel.

Du måste uppdatera koordinator-IP-adresserna med ditt Kafka-kluster i kodfragmentet.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

Sedan kan du skapa en ADLSgen2-tabell i Flink SQL.

Uppdatera containernamnet och lagringskontonamnet i kodfragmentet med din ADLS Gen2-information.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

Dessutom kan du infoga Kafka-tabellen i ADLSgen2-tabellen i Flink SQL.

Skärmbild som visar hur du infogar Kafka-tabellen i ADLSgen2-tabellen.

Skärmbild som visar att strömningsjobbet valideras på Flink.

Kontrollera datalagringsenhet från Kafka i Azure Storage via Azure-portalen

Skärmbild som visar kontrollera datakälla från Kafka på Azure Storage.

Autentisering av Azure Storage och Azure Databricks Notebook

ADLS Gen2 tillhandahåller OAuth 2.0 med ditt tjänsthuvudnamn för Microsoft Entra-programtjänsten för autentisering från en notebook i Azure Databricks och monterar sedan i Azure Databricks DBFS.

Vi hämtar AppID för tjänstens princip, klientorganisationens-ID och nyckel.

Skärmbild som visar få tjänstprincipens app-ID, klientorganisations-ID och hemlig nyckel.

Bevilja tjänstprincipen Lagringsblobdataägare på Azure-portalen

Skärmbild som visar tjänstprincipen Lagringsblobdataägare på Azure-portalen.

Montera ADLS Gen2 i DBFS i Azure Databricks notebook

Skärmbild som visar hur du monterar ADLS Gen2 i DBFS i Azure Databricks Notebook.

Förbered anteckningsbok

Nu ska vi skriva följande kod:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Definiera Delta Live Table Pipeline och kör på Azure Databricks

Skärmbild som visar Delta Live Table Pipeline och körs på Azure Databricks.

Skärmbild som visar Delta Live Table Pipeline och körs på Azure Databricks.

Kontrollera Delta Live Table i Azure Databricks Notebook

Skärmbild som visar en kontroll av Delta Live Table på en Azure Databricks Notebook.

Hänvisning