Udostępnij za pośrednictwem


Dołączanie strumienia danych apache Flink® do tabel usługi Delta Lake usługi Azure Databricks

W tym przykładzie pokazano, jak ujść dane strumienia w usłudze Azure ADLS Gen2 z klastra Apache Flink w usłudze HDInsight w usłudze AKS do tabel usługi Delta Lake przy użyciu usługi Azure Databricks Auto Loader.

Wymagania wstępne

Moduł automatycznego ładowania usługi Azure Databricks

Usługa Databricks Auto Loader ułatwia przesyłanie strumieniowe danych do magazynu obiektów z aplikacji Flink do tabel usługi Delta Lake. Moduł automatycznego ładowania udostępnia źródło przesyłania strumieniowego ze strukturą o nazwie cloudFiles.

Poniżej przedstawiono procedurę używania danych z funkcji Flink w tabelach na żywo różnicowych usługi Azure Databricks.

W tym kroku możesz utworzyć tabelę platformy Kafka i usługę ADLS Gen2 w języku Flink SQL. W tym dokumencie używamy elementu airplanes_state_real_time table. Możesz użyć dowolnego wybranego artykułu.

Należy zaktualizować adresy IP brokera przy użyciu klastra platformy Kafka we fragmencie kodu.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

Następnie możesz utworzyć tabelę ADLSgen2 w języku Flink SQL.

Zaktualizuj nazwę kontenera i nazwę konta magazynu we fragmencie kodu, podając szczegóły usługi ADLS Gen2.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

Ponadto możesz wstawić tabelę Platformy Kafka do tabeli ADLSgen2 w języku Flink SQL.

Zrzut ekranu przedstawiający wstawianie tabeli platformy Kafka do tabeli ADLSgen2.

Zrzut ekranu przedstawiający weryfikowanie zadania przesyłania strumieniowego na Flink.

Sprawdzanie ujścia danych z platformy Kafka w usłudze Azure Storage w witrynie Azure Portal

Zrzut ekranu przedstawiający sprawdzanie ujścia danych z platformy Kafka w usłudze Azure Storage.

Uwierzytelnianie notesu usługi Azure Storage i usługi Azure Databricks

Usługa ADLS Gen2 udostępnia protokół OAuth 2.0 z jednostką usługi aplikacji Microsoft Entra na potrzeby uwierzytelniania z notesu usługi Azure Databricks, a następnie zainstaluj go w systemie plików DBFS usługi Azure Databricks.

Pobierzmy identyfikator jednostki usługi, identyfikator dzierżawy i klucz tajny.

Zrzut ekranu przedstawiający pobieranie identyfikatora jednostki usługi, identyfikatora dzierżawy i klucza tajnego.

Udzielanie jednostki usługi właścicielowi danych obiektu blob usługi Storage w witrynie Azure Portal

Zrzut ekranu przedstawiający zasadę usługi właściciel danych obiektu blob usługi Storage w witrynie Azure Portal.

Instalowanie usługi ADLS Gen2 w systemie plików DBFS w notesie usługi Azure Databricks

Zrzut ekranu przedstawiający instalowanie usługi ADLS Gen2 w systemie plików DBFS w notesie usługi Azure Databricks.

Przygotowywanie notesu

Napiszmy następujący kod:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Definiowanie potoku usługi Delta Live Table i uruchamianie w usłudze Azure Databricks

Zrzut ekranu przedstawiający potok usługi Delta Live Table i uruchamiany w usłudze Azure Databricks.

Zrzut ekranu przedstawiający potok usługi Delta Live Table i uruchamiany w usłudze Azure Databricks.

Sprawdzanie tabeli delta live w notesie usługi Azure Databricks

Zrzut ekranu przedstawia sprawdzanie tabeli delta live w notesie usługi Azure Databricks.

Odwołanie