Azure Databricks Delta Lake 테이블에 Apache Flink® DataStream 통합
이 예제에서는 Azure Databricks 자동 로더를 사용하여 AKS의 HDInsight에 있는 Apache Flink 클러스터에서 Delta Lake 테이블로 Azure ADLS Gen2의 스트림 데이터를 싱크하는 방법을 보여 줍니다.
필수 구성 요소
- AKS의 HDInsight에서 Apache Flink 1.17.0
- HDInsight에서 Apache Kafka 3.2
- AKS의 HDInsight와 동일한 가상 네트워크에 있는 Azure Databricks
- ADLS Gen2 및 서비스 주체
Azure Databricks 자동 로더
Databricks 자동 로더를 사용하면 Flink 애플리케이션에서 개체 스토리지에 존재하는 데이터를 Delta Lake 테이블로 쉽게 스트리밍할 수 있습니다. 자동 로더 cloudFiles라는 구조적 스트리밍 원본을 제공합니다.
다음은 Azure Databricks 델타 라이브 테이블의 Flink에서 데이터를 사용하는 단계입니다.
Apache Flink® SQL에서 Apache Kafka® 테이블 만들기
이 단계에서는 Flink SQL에서 Kafka 테이블 및 ADLS Gen2를 만들 수 있습니다. 이 문서에서는 airplanes_state_real_time table
사용합니다. 원하는 모든 문서를 사용할 수 있습니다.
Kafka 클러스터의 브로커 IP를 코드 스니펫에서 업데이트해야 합니다.
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
다음으로 Flink SQL에서 ADLSgen2 테이블을 만들 수 있습니다.
코드 조각의 컨테이너 이름 및 저장소 계정 이름을 귀하의 ADLS Gen2 세부 정보로 업데이트하십시오.
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
또한 Flink SQL의 ADLSgen2 테이블에 Kafka 테이블을 삽입할 수 있습니다.
Flink에서 스트리밍 작업의 유효성 검사
Azure 포털의 Azure Storage에서 Kafka의 데이터 수신 확인
Azure Storage 및 Azure Databricks Notebook의 인증
ADLS Gen2는 Azure Databricks Notebook에서 인증을 위해 Microsoft Entra 애플리케이션 서비스 주체와 함께 OAuth 2.0을 제공하고 Azure Databricks DBFS에 탑재합니다.
서비스 원칙 appid, 테넌트 ID 및 비밀 키를 가져오겠습니다.
서비스 원칙에 Azure Portal의 Storage Blob 데이터 소유자 부여
Azure Databricks Notebook에서 ADLS Gen2를 DBFS에 탑재하기
노트북을 준비
다음 코드를 작성해 보겠습니다.
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Delta Live Table Pipeline 정의 및 Azure Databricks에서 실행
Azure Databricks Notebook에서 델타 라이브 테이블 확인
참조
- Apache, Apache Kafka, Kafka, Apache Flink, Flink 및 관련 오픈 소스 프로젝트 명칭은 Apache Software Foundation (ASF)의 상표입니다.