다음을 통해 공유


Azure Databricks Delta Lake 테이블에 Apache Flink® DataStream 통합

이 예제에서는 Azure Databricks 자동 로더를 사용하여 AKS의 HDInsight에 있는 Apache Flink 클러스터에서 Delta Lake 테이블로 Azure ADLS Gen2의 스트림 데이터를 싱크하는 방법을 보여 줍니다.

필수 구성 요소

Azure Databricks 자동 로더

Databricks 자동 로더를 사용하면 Flink 애플리케이션에서 개체 스토리지에 존재하는 데이터를 Delta Lake 테이블로 쉽게 스트리밍할 수 있습니다. 자동 로더 cloudFiles라는 구조적 스트리밍 원본을 제공합니다.

다음은 Azure Databricks 델타 라이브 테이블의 Flink에서 데이터를 사용하는 단계입니다.

이 단계에서는 Flink SQL에서 Kafka 테이블 및 ADLS Gen2를 만들 수 있습니다. 이 문서에서는 airplanes_state_real_time table사용합니다. 원하는 모든 문서를 사용할 수 있습니다.

Kafka 클러스터의 브로커 IP를 코드 스니펫에서 업데이트해야 합니다.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

다음으로 Flink SQL에서 ADLSgen2 테이블을 만들 수 있습니다.

코드 조각의 컨테이너 이름 및 저장소 계정 이름을 귀하의 ADLS Gen2 세부 정보로 업데이트하십시오.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

또한 Flink SQL의 ADLSgen2 테이블에 Kafka 테이블을 삽입할 수 있습니다.

스크린샷은 ADLSgen2 테이블에 Kafka 테이블 삽입을 보여 줍니다.

스크린샷은 Flink에서 스트리밍 작업의 유효성을 검사하는 것을 보여줍니다.

Azure 포털의 Azure Storage에서 Kafka의 데이터 수신 확인

스크린샷은 Azure Storage의 Kafka에서 데이터 싱크를 확인하는 방법을 보여 줍니다.

Azure Storage 및 Azure Databricks Notebook의 인증

ADLS Gen2는 Azure Databricks Notebook에서 인증을 위해 Microsoft Entra 애플리케이션 서비스 주체와 함께 OAuth 2.0을 제공하고 Azure Databricks DBFS에 탑재합니다.

서비스 원칙 appid, 테넌트 ID 및 비밀 키를 가져오겠습니다.

스크린샷은 서비스 주체 App ID, 테넌트 ID 및 비밀 키를 보여줍니다.

서비스 원칙에 Azure Portal의 Storage Blob 데이터 소유자 부여

스크린샷은 Azure Portal의 Storage Blob 데이터 소유자 서비스 원칙을 보여 줍니다.

Azure Databricks Notebook에서 ADLS Gen2를 DBFS에 탑재하기

스크린샷은 Azure Databricks Notebook에서 ADLS Gen2를 DBFS에 탑재하는 것을 보여 줍니다.

노트북을 준비

다음 코드를 작성해 보겠습니다.

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Delta Live Table Pipeline 정의 및 Azure Databricks에서 실행

스크린샷은 델타 라이브 테이블 파이프라인을 보여 줍니다. Azure Databricks에서 실행됩니다.

스크린샷은 델타 라이브 테이블 파이프라인을 보여 줍니다. Azure Databricks에서 실행됩니다.

Azure Databricks Notebook에서 델타 라이브 테이블 확인

스크린샷은 Azure Databricks Notebook에서 델타 라이브 테이블을 확인하는 것을 보여 줍니다.

참조

  • Apache, Apache Kafka, Kafka, Apache Flink, Flink 및 관련 오픈 소스 프로젝트 명칭은 Apache Software Foundation (ASF)의 상표입니다.