Sdílet prostřednictvím


Začlenění Apache Flink® DataStreamu do tabulek Azure Databricks Delta Lake

Tento příklad ukazuje, jak streamovat data v Azure ADLS Gen2 z clusteru Apache Flink ve službě HDInsight v AKS do tabulek Delta Lake pomocí automatického zavaděče Azure Databricks.

Požadavky

Automatický zavaděč Azure Databricks

Databricks Auto Loader usnadňuje streamování dat do úložiště objektů a jejich ukládání do tabulek Delta Lake z Flink aplikací. Automatický zavaděč poskytuje strukturovaný streamovací zdroj označovaný jako cloudFiles.

Tady je postup použití dat z Flinku v živých tabulkách Azure Databricks Delta.

V tomto kroku můžete vytvořit tabulku Kafka a ADLS Gen2 v Flink SQL. V tomto dokumentu používáme airplanes_state_real_time table. Můžete použít libovolný článek podle svého výběru.

V fragmentu kódu je potřeba aktualizovat IP adresy zprostředkovatele pomocí clusteru Kafka.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

Dále můžete vytvořit tabulku ADLSgen2 v Flink SQL.

Aktualizujte název kontejneru a název účtu úložiště v fragmentu kódu podrobnostmi ADLS Gen2.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

Tabulku Kafka můžete dále vložit do tabulky ADLSgen2 v Flink SQL.

snímek obrazovky ukazuje vložení tabulky Kafka do tabulky ADLSgen2.

Snímek obrazovky ukazuje, jak se ověřuje úloha streamování ve Flinku.

Kontrola jímky dat ze systému Kafka ve službě Azure Storage na webu Azure Portal

Snímek obrazovky ukazuje jímku dat ze systému Kafka ve službě Azure Storage.

Ověření Azure Storage a poznámkového bloku Azure Databricks

ADLS Gen2 poskytuje OAuth 2.0 s instančním objektem aplikace Microsoft Entra pro ověřování z poznámkového bloku Azure Databricks a pak se připojí k Azure Databricks DBFS.

Získejme ID aplikace, ID tenanta a tajný klíč aplikace služeb.

Snímek obrazovky znázorňuje appid hlavního objektu služby, ID tenanta a tajný klíč.

Udělte oprávnění Služebnímu Principálu jako Vlastník Datových Objektů Blob Úložiště na portálu Azure

snímek obrazovky ukazuje princip vlastníka dat úložiště Blob v Azure portálu.

Připojit ADLS Gen2 k DBFS v poznámkovém bloku Azure Databricks

snímek obrazovky ukazuje připojení ADLS Gen2 k DBFS v poznámkovém bloku Azure Databricks.

Připravte poznámkový blok

Pojďme napsat následující kód:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Definování kanálu Delta Live Table a spuštění v Azure Databricks

Snímek obrazovky zobrazuje pipeline Delta Live Table a její spuštění na Azure Databricks.

Snímek obrazovky ukazuje kanál tabulky Delta Live, který je spuštěný na Azure Databricks.

Kontrola Delta Live Table v poznámkovém bloku Azure Databricks

Snímek obrazovky zobrazuje kontrolu Delta Live Table v poznámkovém bloku Azure Databricks.

Odkaz