Udostępnij za pośrednictwem


Włącz Apache Flink® DataStream do tabel Delta Lake na platformie Azure Databricks

W tym przykładzie pokazano, jak przesyłać dane strumieniowe do usługi Azure ADLS Gen2 z klastra Apache Flink uruchomionego na HDInsight na AKS do tabel Delta Lake przy użyciu Auto Loader w Azure Databricks.

Warunki wstępne

Moduł automatycznego ładowania usługi Azure Databricks

Usługa Databricks Auto Loader ułatwia przesyłanie strumieniowe danych do magazynu obiektów z aplikacji Flink do tabel usługi Delta Lake. Auto Loader udostępnia ustrukturyzowane źródło przesyłania strumieniowego o nazwie cloudFiles.

Oto kroki, jak można używać danych z Flink w tabelach delta live usługi Azure Databricks.

W tym kroku możesz utworzyć tabelę platformy Kafka i usługę ADLS Gen2 w języku Flink SQL. W tym dokumencie używamy airplanes_state_real_time table. Możesz użyć dowolnego wybranego artykułu.

Należy zaktualizować adresy IP brokera przy użyciu klastra platformy Kafka we fragmencie kodu.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

Następnie możesz utworzyć tabelę ADLSgen2 w języku Flink SQL.

Zaktualizuj nazwę kontenera i nazwę konta magazynu we fragmencie kodu, podając szczegóły usługi ADLS Gen2.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

Ponadto, możesz wstawić tabelę Kafka do tabeli ADLSgen2 w Flink SQL.

Zrzut ekranu przedstawia wstawianie tabeli Kafka do tabeli ADLSgen2.

Zrzut ekranu pokazuje weryfikację zadania przesyłania strumieniowego w Flink.

Sprawdzanie ujścia danych z platformy Kafka w usłudze Azure Storage w witrynie Azure Portal

Zrzut ekranu przedstawia sprawdzanie ujścia danych z platformy Kafka w usłudze Azure Storage.

Uwierzytelnianie w usłudze Azure Storage i notesie Azure Databricks

ADLS Gen2 zapewnia protokół OAuth 2.0 z jednostką usługi aplikacji Microsoft Entra do uwierzytelniania z notesu usług Azure Databricks, a następnie podłączenia do systemu plików DBFS w Azure Databricks.

Pobierzmy identyfikator jednostki usługi, identyfikator dzierżawy i klucz tajny.

Zrzut ekranu przedstawiający pobieranie identyfikatora jednostki usługi, identyfikatora dzierżawy i klucza tajnego.

Przyznanie zasad usługowych właścicielowi danych obiektów Blob w portalu Azure

Zrzut ekranu przedstawia zasadę usługi Właściciel danych Blob Storage w Azure Portal.

Podłączanie ADLS Gen2 do DBFS w notesie Azure Databricks

Zrzut ekranu przedstawia zamontowanie usługi ADLS Gen2 w systemie plików DBFS na notatniku usługi Azure Databricks.

przygotowywanie notesu

Napiszmy następujący kod:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Definiuj potok Delta Live Table i uruchom w Azure Databricks

Zrzut ekranu przedstawia potok usługi Delta Live Table i działa w usłudze Azure Databricks.

Zrzut ekranu przedstawia potok usługi Delta Live Table i działa w usłudze Azure Databricks.

Sprawdź Delta Live Table w Notebooku Azure Databricks

Zrzut ekranu pokazuje sprawdzanie tabeli Delta Live w notesie Azure Databricks.

Odniesienie