Delta Live Tables 싱크를 사용하여 외부 서비스로 레코드 스트리밍
이 문서에서는 델타 라이브 테이블 sink
API와 DLT 흐름 사용하여 파이프라인으로 변환된 레코드를 Unity 카탈로그 관리 테이블 및 외부 테이블, Hive 메타스토어 테이블, Apache Kafka 또는 Azure Event Hubs와 같은 이벤트 스트리밍 서비스와 같은 외부 데이터 싱크에 쓰는 방법을 설명합니다.
Delta Live Tables의 싱크란 무엇인가요?
Delta Live Tables 싱크를 사용하면 Apache Kafka 또는 Azure Event Hubs와 같은 이벤트 스트리밍 서비스, Unity 카탈로그 또는 Hive 메타스토어에서 관리하는 외부 테이블과 같은 대상에 변환된 데이터를 쓸 수 있습니다. 이전에는 Delta Live Tables 파이프라인에서 만든 스트리밍 테이블 및 구체화된 뷰를 Azure Databricks 관리형 델타 테이블에만 유지할 수 있었습니다. 싱크를 사용하면 이제 Delta Live Tables 파이프라인의 출력을 유지하기 위한 더 많은 옵션이 제공됩니다.
Delta Live Tables 싱크는 언제 사용해야 하나요?
Databricks는 다음을 수행해야 하는 경우 델타 라이브 테이블 싱크를 사용하는 것이 좋습니다.
- 사기 감지, 실시간 분석 및 고객 권장 사항과 같은 운영 사용 사례를 구축합니다. 운영 사용 사례는 일반적으로 Apache Kafka 토픽과 같은 메시지 버스에서 데이터를 읽은 다음 대기 시간이 짧은 데이터를 처리하고 처리된 레코드를 메시지 버스에 다시 씁니다. 이 방법을 사용하면 클라우드 스토리지에서 작성하거나 읽지 않음으로써 대기 시간을 줄일 수 있습니다.
- Delta Live Tables 흐름에서 변환된 데이터를 Unity 카탈로그 관리 테이블, 외부 테이블, Hive 메타스토어 테이블을 포함하는 외부 델타 인스턴스가 관리하는 테이블에 씁니다.
- Apache Kafka 토픽과 같이 Databricks 외부의 싱크로 역방향 ETL(추출-변환-로드)을 수행합니다. 이 방법을 사용하면 Unity 카탈로그 테이블 또는 기타 Databricks 관리 스토리지 외부에서 데이터를 읽거나 사용해야 하는 사용 사례를 효과적으로 지원할 수 있습니다.
Delta Live Tables 싱크를 사용하려면 어떻게 해야 하나요?
메모
-
spark.readStream
및dlt.read_stream
사용하는 스트리밍 쿼리만 지원됩니다. 일괄 처리 쿼리는 지원되지 않습니다. - 싱크에 기록하는 데는
append_flow
만 사용할 수 있습니다.apply_changes
같은 다른 흐름은 지원되지 않습니다. - 전체 새로 고침 업데이트 실행해도 싱크에서 이전에 계산된 결과 데이터가 정리되지 않습니다. 즉, 다시 처리된 모든 데이터가 싱크에 추가되고 기존 데이터는 변경되지 않습니다.
이벤트 데이터가 스트리밍 원본에서 Delta Live Tables 파이프라인으로 수집되면 Delta Live Tables 기능을 사용하여 이 데이터를 처리 및 구체화한 다음 추가 흐름 처리를 사용하여 변환된 데이터 레코드를 Delta Live Tables 싱크로 스트리밍합니다.
create_sink()
함수를 사용하여 이 싱크를 만듭니다.
create_sink
함수 사용에 대한 자세한 내용은 싱크 API 참조참조하세요.
Delta Live Tables 싱크를 구현하려면 다음 단계를 사용합니다.
- Delta Live Tables 파이프라인을 설정하여 스트리밍 이벤트 데이터를 처리하고 Delta Live Tables 싱크에 쓸 데이터 레코드를 준비합니다.
- 기본 설정 대상 싱크 형식을 사용하도록 Delta Live Tables 싱크를 구성하고 만듭니다.
- 추가 흐름을 사용하여 준비된 레코드를 싱크에 기록합니다.
이러한 단계는 나머지 항목에서 다룹니다.
싱크에 쓰기 위한 레코드를 준비하도록 Delta Live Tables 파이프라인 설정
첫 번째 단계는 원시 이벤트 스트림 데이터를 싱크에 쓸 준비된 데이터로 변환하도록 Delta Live Tables 파이프라인을 설정하는 것입니다.
이 프로세스를 더 잘 이해하려면 Databricks의 wikipedia-datasets
샘플 데이터에서 clickstream 이벤트 데이터를 처리하는 Delta Live Tables 파이프라인의 예제를 따를 수 있습니다. 이 파이프라인은 원시 데이터 세트를 구문 분석하여 Apache Spark 설명서 페이지에 연결되는 Wikipedia 페이지를 식별하고 참조 링크가 포함된 테이블 행으로 데이터를 점진적으로 구체화합니다Apache_Spark.
이 예제에서 Delta Live Tables 파이프라인은 품질 및 처리 효율성을 향상시키기 위해 데이터를 여러 계층으로 구성하는 medallion 아키텍처사용하여 구조화됩니다.
시작하려면 자동 로더사용하여 데이터 세트의 원시 JSON 레코드를 브론즈 계층으로 로드합니다. 이 Python 코드는 원본에서 처리되지 않은 원시 데이터를 포함하는 clickstream_raw
스트리밍 테이블을 만드는 방법을 보여 줍니다.
import dlt
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"
@dlt.table(
comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
table_properties={
"quality": "bronze"
}
)
def clickstream_raw():
return (
spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
)
이 코드를 실행한 후 데이터는 Medallion 아키텍처의 "bronze" (또는 "원시 데이터") 수준에 있으므로 정리해야 합니다. 다음 단계에서는 데이터 형식 및 열 이름을 정리하고 Delta Live Tables 기대치를 사용하여 데이터 무결성을 보장하는 "실버" 수준으로 데이터를 구체화합니다.
다음 코드는 브론즈 계층 데이터를 clickstream_clean
실버 테이블로 정리하고 유효성을 검사하여 이 작업을 수행하는 방법을 보여 줍니다.
@dlt.table(
comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
table_properties={
"quality": "silver"
}
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
return (
spark.readStream.table("clickstream_raw")
.withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
)
파이프라인 구조의 "골드" 계층을 개발하려면 정리된 클릭스트림 데이터를 필터링하여 참조 페이지가 Apache_Spark
항목을 격리합니다. 이 마지막 코드 예제에서는 대상 싱크 테이블에 쓰는 데 필요한 열만 선택합니다.
다음 코드에서는 금색 계층을 나타내는 spark_referrers
테이블을 만드는 방법을 보여 줍니다.
@dlt.table(
comment="A table of the most common pages that link to the Apache Spark page.",
table_properties={
"quality": "gold"
}
)
def spark_referrers():
return (
spark.readStream.table("clickstream_clean")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.select("referrer", "current_page_id", "current_page_title", "click_count")
)
이 데이터 준비 프로세스가 완료되면 정리된 레코드가 기록될 대상 싱크를 구성해야 합니다.
Delta Live Tables의 데이터 싱크 구성
Databricks는 스트림 데이터에서 처리된 레코드를 저장할 수 있는 세 가지 유형의 대상 저장소를 지원합니다.
- 델타 테이블 싱크
- Apache Kafka 데이터 수집지점
- Azure Event Hubs 데이터 수신처
다음은 Delta, Kafka 및 Azure Event Hubs 싱크에 대한 구성의 예입니다.
델타 싱크
파일 경로별로 델타 싱크를 만들려면 다음을 수행합니다.
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
정규화된 카탈로그 및 스키마 경로를 사용하여 테이블 이름으로 델타 싱크를 만들려면 다음을 수행합니다.
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "my_catalog.my_schema.my_table" }
)
Kafka 및 Azure Event Hubs 싱크
이 코드는 Apache Kafka 및 Azure Event Hubs 싱크 모두에서 작동합니다.
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
싱크가 구성되고 Delta Live Tables 파이프라인이 준비되었으므로 처리된 레코드를 싱크로 스트리밍하기 시작할 수 있습니다.
추가 흐름을 사용하여 Delta Live Tables 싱크에 기록하기
싱크를 구성한 다음 단계는 추가 흐름에서 레코드 출력의 대상으로 지정하여 처리된 레코드를 기록합니다.
append_flow
데코레이터에서 싱크 값을 target
으로 지정하여 이 작업을 수행합니다.
- Unity 카탈로그 관리 테이블 및 외부 테이블의 경우
delta
형식을 사용하고 옵션에서 경로 또는 테이블 이름을 지정합니다. Unity 카탈로그를 사용하도록 Delta Live Tables 파이프라인을 구성해야 합니다. - Apache Kafka 토픽의 경우
kafka
형식을 사용하고 옵션에서 토픽 이름, 연결 정보 및 인증 정보를 지정합니다. 이러한 옵션은 Spark 구조적 스트리밍 Kafka 싱크에서 지원하는 것과 동일한 옵션입니다. Kafka 구조적 스트리밍 기록기 구성을 참조하세요. - Azure Event Hubs의 경우
kafka
형식을 사용하고 옵션에서 Event Hubs 이름, 연결 정보 및 인증 정보를 지정합니다. 이러한 옵션은 Kafka 인터페이스를 사용하는 Spark 구조적 스트리밍 Event Hubs 싱크에서 지원되는 동일한 옵션입니다. 서비스 주체 인증: Microsoft Entra ID 및 Azure Event Hubs을 참조하세요. - Hive 메타스토어 테이블의 경우
delta
형식을 사용하고 옵션에서 경로 또는 테이블 이름을 지정합니다. Hive 메타스토어를 사용하도록 Delta Live Tables 파이프라인을 구성해야 합니다.
다음은 Delta Live Tables 파이프라인에서 처리된 레코드를 사용하여 Delta, Kafka 및 Azure Event Hubs 싱크에 쓸 흐름을 설정하는 방법의 예입니다.
델타 싱크
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
Kafka 및 Azure Event Hubs 싱크
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
value
매개 변수는 Azure Event Hubs 싱크에 필수입니다.
key
, partition
, headers
및 topic
같은 추가 매개 변수는 선택 사항입니다.
append_flow
데코레이터에 대한 자세한 내용은 추가 흐름을 사용하여여러 원본 스트림의 스트리밍 테이블에 쓰는 방법을 참조하세요.
제한 사항
Python API만 지원됩니다. SQL은 지원되지 않습니다.
spark.readStream
및dlt.read_stream
사용하는 스트리밍 쿼리만 지원됩니다. 일괄 처리 쿼리는 지원되지 않습니다.싱크에 데이터를 쓰는 데는
append_flow
만 사용할 수 있습니다.apply_changes
같은 다른 흐름은 지원되지 않으며 Delta Live Tables 데이터 세트 정의에서 싱크를 사용할 수 없습니다. 예를 들어 다음이 지원되지 않습니다.@table("from_sink_table") def fromSink(): return read_stream("my_sink")
델타 싱크의 경우 테이블 이름은 완전하게 명시되어야 합니다. 특히 Unity 카탈로그 관리형 외부 테이블의 경우 테이블 이름은
<catalog>.<schema>.<table>
형식이어야 합니다. Hive 메타스토어의 경우<schema>.<table>
형식이어야 합니다.FullRefresh
실행해도 싱크에서 이전에 계산된 결과 데이터가 정리되지 않습니다. 즉, 다시 처리된 모든 데이터가 싱크에 추가되고 기존 데이터는 변경되지 않습니다.Delta Live Tables 기대치는 지원되지 않습니다.
리소스
- 델타 라이브 테이블 파이프라인 개발
- 델타 라이브 테이블 플로우를 사용하여 데이터를 증분 방식으로 로드 및 처리하기
- Python 싱크 API 참조