Delta Live Tables Python 언어 참조
이 문서에서는 Delta Live Tables Python 프로그래밍 인터페이스에 대한 세부 정보를 설명합니다.
SQL API에 대한 자세한 내용은 Delta Live Tables SQL 언어 참조를 참조하세요.
자동 로더 구성과 관련된 자세한 내용은 자동 로더란?을 참조하세요.
시작하기 전에
다음은 Delta Live Tables Python 인터페이스를 사용하여 파이프라인을 구현할 때 고려해야 할 중요한 사항입니다.
- Python의
table()
및view()
함수는 파이프라인 업데이트 중에 여러 번 호출되므로, 부작용이 있을 수 있는 코드(예: 데이터를 수정하거나 이메일을 보내는 코드)는 이러한 함수에 포함하지 마세요. 예기치 않은 동작을 방지하려면 데이터 세트를 정의하는 Python 함수에는 테이블이나 뷰를 정의하는 데 필요한 코드만 포함되어야 합니다. - 특히 데이터 세트를 정의하는 함수에서 이메일을 보내거나 외부 모니터링 서비스와 통합하는 등의 작업을 수행하려면 이벤트 후크를 사용합니다. 데이터 세트를 정의하는 함수에서 이러한 작업을 구현하면 예기치 않은 동작이 발생합니다.
- Python
table
및view
함수는 DataFrame을 반환해야 합니다. DataFrames에서 작동하는 일부 함수는 DataFrames를 반환하지 않으며 사용하지 않아야 합니다. 이러한 작업에는collect()
,count()
,toPandas()
,save()
및saveAsTable()
과 같은 함수가 포함됩니다. DataFrame 변환은 전체 데이터 흐름 그래프가 확인된 후에 실행되므로 이러한 작업을 사용하면 의도하지 않은 부작용이 발생할 수 있습니다.
dlt
Python 모듈 가져오기
Delta Live Tables Python 함수는 dlt
모듈에 정의되어 있습니다. Python API를 사용하여 구현된 파이프라인은 다음 모듈을 가져와야 합니다.
import dlt
Delta Live Tables 구체화된 뷰 또는 스트리밍 테이블 만들기
Python에서 Delta Live Tables는 정의 쿼리를 기반으로 데이터 세트를 구체화된 뷰로 업데이트할지 아니면 스트리밍 테이블로 업데이트할지를 결정합니다. @table
decorator를 사용하여 구체화된 뷰와 스트리밍 테이블을 모두 정의할 수 있습니다.
Python에서 구체화된 뷰를 정의하려면 데이터 원본에 대해 정적 읽기를 수행하는 쿼리에 @table
을 적용합니다. 스트리밍 테이블을 정의하려면 데이터 원본에 대해 스트리밍 읽기를 수행하는 쿼리에 @table
을 적용하거나 create_streaming_table() 함수를 사용합니다. 두 데이터 세트 형식의 구문 사양은 다음과 같습니다.
참고 항목
cluster_by
인수를 사용하여 리퀴드 클러스터링을 사용하도록 설정하려면 미리 보기 채널을 사용하도록 파이프라인을 구성해야 합니다.
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Delta Live Tables 보기 만들기
Python에서 뷰를 정의하려면 @view
decorator를 적용합니다. @table
decorator와 마찬가지로 정적 또는 스트리밍 데이터 세트에 Delta Live Tables의 보기를 사용할 수 있습니다. 다음은 Python을 사용하여 뷰를 정의하는 구문입니다.
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
예: 테이블 및 뷰 정의
Python에서 뷰 또는 테이블을 정의하려면 함수에 @dlt.view
또는 @dlt.table
decorator를 적용합니다. 함수 이름 또는 name
매개 변수를 사용하여 테이블 또는 뷰 이름을 할당할 수 있습니다. 다음 예제는 JSON 파일을 입력 원본으로 사용하는 taxi_raw
라는 뷰와 taxi_raw
뷰를 입력으로 사용하는 filtered_data
라는 테이블의 두 가지 데이터 세트를 정의합니다.
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
예: 동일한 파이프라인에 정의된 데이터 세트에 액세스
참고 항목
dlt.read()
Delta Live Tables Python 인터페이스에서 함수와 dlt.read_stream()
함수를 계속 사용할 수 있고 완벽하게 지원되지만 Databricks는 다음과 같은 이유로 항상 함수 및 spark.readStream.table()
함수를 사용하는 spark.read.table()
것이 좋습니다.
- 이 함수는
spark
외부 스토리지의 데이터 세트 또는 다른 파이프라인에 정의된 데이터 세트를 포함하여 내부 및 외부 데이터 세트를 읽을 수 있습니다. 함수는dlt
내부 데이터 세트 읽기만 지원합니다. - 함수는
spark
읽기 작업과 같은skipChangeCommits
옵션을 지정할 수 있습니다. 옵션 지정은 함수에서dlt
지원되지 않습니다.
동일한 파이프라인에 정의된 데이터 세트에 액세스하려면 키워드를 데이터 세트 이름 앞에 추가하여 LIVE
또는 spark.readStream.table()
함수를 사용합니다spark.read.table()
.
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("LIVE.customers_raw").where(...)
예: 메타스토어에 등록된 테이블에서 읽기
Hive 메타스토어에 등록된 테이블에서 데이터를 읽으려면 함수 인수에서 LIVE
키워드를 생략하고 필요에 따라 테이블 이름을 데이터베이스 이름으로 한정합니다.
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Unity 카탈로그 테이블에서 읽는 예제는 Unity 카탈로그 파이프라인으로 데이터 수집을 참조하세요.
예: 를 사용하여 데이터 세트에 액세스 spark.sql
쿼리 함수에서 spark.sql
식을 사용하여 데이터 세트를 반환할 수도 있습니다. 내부 데이터 세트에서 읽으려면 데이터 세트 이름 앞에 LIVE.
을 추가합니다.
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
스트리밍 작업의 대상으로 사용할 테이블 만들기
create_streaming_table()
함수를 사용하여 apply_changes(), apply_changes_from_snapshot(), and @append_flow 출력 레코드를 비롯한 스트리밍 작업을 통해 레코드 출력에 대한 대상 테이블을 만듭니다.
참고 항목
create_target_table()
함수 및 create_streaming_live_table()
플래그는 더 이상 사용되지 않습니다. Databricks에서는 create_streaming_table()
함수를 사용하도록 기존 코드를 업데이트할 것을 권장합니다.
참고 항목
cluster_by
인수를 사용하여 리퀴드 클러스터링을 사용하도록 설정하려면 미리 보기 채널을 사용하도록 파이프라인을 구성해야 합니다.
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
인수 |
---|
name 유형: str 테이블 이름. 이 매개 변수는 필수입니다. |
comment 유형: str 테이블에 대한 선택적 설명입니다. |
spark_conf 유형: dict 이 쿼리를 실행하기 위한 선택적 Spark 구성 목록입니다. |
table_properties 유형: dict 테이블에 대한 테이블 속성의 선택적 목록입니다. |
partition_cols 유형: array 테이블 분할에 사용할 하나 이상의 열에 대한 선택적 목록입니다. |
cluster_by 유형: array 필요에 따라 테이블에서 리퀴드 클러스터링을 사용하도록 설정하고 클러스터링 키로 사용할 열을 정의합니다. Delta 테이블에 Liquid 클러스터링 사용을 참조하세요. |
path 유형: str 테이블 데이터에 대한 선택적 스토리지 위치입니다. 설정하지 않으면 시스템은 기본적으로 파이프라인 스토리지 위치로 설정됩니다. |
schema 형식: str 또는 StructType 테이블에 대한 선택적 스키마 정의입니다. 스키마는 SQL DDL 문자열로 정의하거나 Python StructType . |
expect_all expect_all_or_drop expect_all_or_fail 유형: dict 테이블에 대한 선택 사항 데이터 품질 제약 조건입니다. 여러 기대치를 참고하세요. |
row_filter (공개 미리 보기)유형: str 테이블에 대한 선택적 행 필터 절입니다. 행 필터 및 열 마스크가 있는 테이블 게시를 참조하세요. |
테이블 구체화 방법 제어
테이블은 또한 구체화에 대한 추가 제어를 제공합니다.
partition_cols
를 사용하여 테이블을 분할하는 방법을 지정합니다. 분할을 사용하여 쿼리 속도를 높일 수 있습니다.- 뷰 또는 테이블을 정의할 때 테이블 속성을 설정할 수 있습니다. Delta Live Tables 테이블 속성을 참조하세요.
path
설정을 사용하여 테이블 데이터의 스토리지 위치를 설정합니다. 기본적으로 테이블 데이터는path
이 설정되지 않은 경우 파이프라인 스토리지 위치에 저장됩니다.- 스키마 정의에서 생성된 열을 사용할 수 있습니다. 예제: 스키마 및 파티션 열 지정을 참고하세요.
참고 항목
크기가 1TB 미만인 테이블의 경우 Databricks는 Delta Live Tables가 데이터 조직을 제어하도록 하는 것이 좋습니다. 테이블이 테라바이트 이상으로 증가할 것으로 예상하지 않는 한 파티션 열을 지정해서는 안 됩니다.
예제: 스키마 및 파티션 열 지정을 참고하세요
선택적으로 Python StructType
또는 SQL DDL 문자열을 사용하여 테이블 스키마를 지정할 수 있습니다. DDL 문자열로 지정하면 정의에 생성된 열이 포함될 수 있습니다.
다음 예제에서는 sales
Python으로 명시적 지정된 스키마를 사용하여 StructType
라는 테이블을 만듭니다.
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
다음 예제에서는 DDL 문자열을 사용하여 테이블에 대한 스키마를 지정하고, 생성된 열을 정의하고, 파티션 열을 정의합니다.
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
기본적으로 Delta Live Tables는 스키마를 지정하지 않으면 table
정의에서 스키마를 유추합니다.
원본 스트리밍 테이블의 변경 내용을 무시하도록 스트리밍 테이블 구성
참고 항목
skipChangeCommits
플래그는spark.readStream
함수를 사용하여option()
과만 작동합니다.dlt.read_stream()
함수에서는 이 플래그를 사용할 수 없습니다.- 원본 스트리밍 테이블이 apply_changes() 함수의 대상으로 정의되면
skipChangeCommits
플래그를 사용할 수 없습니다.
기본적으로 스트리밍 테이블에는 추가 전용 원본이 필요합니다. 스트리밍 테이블이 다른 스트리밍 테이블을 원본으로 사용하고 원본 스트리밍 테이블에 업데이트 또는 삭제가 필요한 경우(예: GDPR "잊혀질 권리" 처리) 이러한 변경 내용을 무시하기 위해 원본 스트리밍 테이블을 읽을 때 skipChangeCommits
플래그를 설정할 수 있습니다. 이 플래그에 대한 자세한 내용은 업데이트 및 삭제 무시를 참조하세요.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
예: 테이블 제약 조건 정의
Important
테이블 제약 조건은 공개 미리 보기로 제공됩니다.
스키마를 지정할 때 기본 키와 외래 키를 정의할 수 있습니다. 이러한 제약 조건은 정보 제공용일 뿐이며 적용되지 않습니다. SQL 언어 참조의 제약 조건 절을 참조하세요.
다음 예제에서는 기본 및 외래 키 제약 조건이 있는 테이블을 정의합니다.
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
예: 행 필터 및 열 마스크 정의
Important
행 필터 및 열 마스크는 공개 미리 보기로 제공됩니다.
행 필터 및 열 마스크를 사용하여 구체화된 뷰 또는 스트리밍 테이블을 만들려면 ROW FILTER 절과 MASK 절을 사용합니다. 다음 예제에서는 행 필터와 열 마스크를 모두 사용하여 구체화된 뷰와 스트리밍 테이블을 정의하는 방법을 보여 줍니다.
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
행 필터 및 열 마스크에 대한 자세한 내용은 행 필터 및 열 마스크가 있는 테이블 게시를 참조하세요.
Python Delta Live Tables 속성
다음 표에서는 Delta Live Tables를 사용하여 테이블 및 뷰를 정의하는 동안 지정할 수 있는 옵션 및 속성에 대해 설명합니다.
참고 항목
cluster_by
인수를 사용하여 리퀴드 클러스터링을 사용하도록 설정하려면 미리 보기 채널을 사용하도록 파이프라인을 구성해야 합니다.
@table 또는 @view |
---|
name 유형: str 테이블 또는 뷰의 선택적 이름입니다. 정의되지 않은 경우 함수 이름이 테이블 또는 뷰 이름으로 사용됩니다. |
comment 유형: str 테이블에 대한 선택적 설명입니다. |
spark_conf 유형: dict 이 쿼리를 실행하기 위한 선택적 Spark 구성 목록입니다. |
table_properties 유형: dict 테이블에 대한 테이블 속성의 선택적 목록입니다. |
path 유형: str 테이블 데이터에 대한 선택적 스토리지 위치입니다. 설정하지 않으면 시스템은 기본적으로 파이프라인 스토리지 위치로 설정됩니다. |
partition_cols 유형: a collection of str 선택 사항 컬렉션(예: 테이블 분할에 사용할 하나 이상의 list 열 중 하나)입니다. |
cluster_by 유형: array 필요에 따라 테이블에서 리퀴드 클러스터링을 사용하도록 설정하고 클러스터링 키로 사용할 열을 정의합니다. Delta 테이블에 Liquid 클러스터링 사용을 참조하세요. |
schema 형식: str 또는 StructType 테이블에 대한 선택적 스키마 정의입니다. 스키마는 SQL DDL 문자열 또는 Python StructType 으로 정의할 수 있습니다 |
temporary 유형: bool 테이블을 만들지만 테이블에 대한 메타데이터는 게시하지 않습니다. temporary 키워드는 Delta Live Tables에 파이프라인에서 사용할 수 있지만 파이프라인 외부에서 액세스해서는 안 되는 테이블을 만들도록 지시합니다. 처리 시간을 줄이기 위해 임시 테이블은 단일 업데이트가 아니라 해당 테이블을 만드는 파이프라인의 수명 동안 유지됩니다.기본값은 ‘False’입니다. |
row_filter (공개 미리 보기)유형: str 테이블에 대한 선택적 행 필터 절입니다. 행 필터 및 열 마스크가 있는 테이블 게시를 참조하세요. |
테이블 또는 뷰 정의 |
---|
def <function-name>() 데이터 세트를 정의하는 Python 함수입니다. name 매개 변수가 설정되지 않은 경우 <function-name> 이 대상 데이터 세트 이름으로 사용됩니다. |
query Spark Dataset 또는 Koalas DataFrame을 반환하는 Spark SQL 문입니다. dlt.read() 또는 spark.read.table() 을 사용하여 동일한 파이프라인에 정의된 데이터 세트에서 전체 읽기를 수행합니다. 외부 데이터 세트를 읽으려면 함수를 spark.read.table() 사용합니다. 외부 데이터 세트를 읽는 데 사용할 dlt.read() 수 없습니다. spark.read.table() 현재 파이프라인 외부에서 정의된 데이터 세트인 내부 데이터 세트를 읽는 데 사용할 수 있으며 데이터 읽기 옵션을 지정할 수 있으므로 Databricks는 함수 대신 dlt.read() 사용하는 것이 좋습니다.함수를 spark.read.table() 사용하여 동일한 파이프라인에 정의된 데이터 세트에서 읽는 경우 함수 인수의 LIVE 데이터 세트 이름 앞에 키워드를 추가합니다. 예를 들어, customers 라는 데이터 세트에서 읽으려면 다음을 수행합니다.spark.read.table("LIVE.customers") 또한 spark.read.table() 함수를 사용하여 LIVE 키워드를 생략하고 필요에 따라 테이블 이름을 데이터베이스 이름으로 한정하여 메타스토어에 등록된 테이블에서 읽을 수 있습니다.spark.read.table("sales.customers") 동일한 파이프라인에 정의된 데이터 세트에서 읽은 스트리밍을 사용 dlt.read_stream() 하거나 spark.readStream.table() 수행합니다. 외부 데이터 세트에서 읽은 스트리밍을 수행하려면 다음을 사용합니다.spark.readStream.table() 기능. spark.readStream.table() 현재 파이프라인 외부에서 정의된 데이터 세트인 내부 데이터 세트를 읽는 데 사용할 수 있으며 데이터 읽기 옵션을 지정할 수 있으므로 Databricks는 함수 대신 dlt.read_stream() 사용하는 것이 좋습니다.SQL 구문을 사용하여 Delta Live Tables table 함수에서 쿼리를 정의하려면 이 함수를 spark.sql 사용합니다. 예제 : spark.sql 사용하여 데이터 세트에 액세스합니다. Python을 사용하여 Delta Live Tables table 함수에서 쿼리를 정의하려면 PySpark 구문을 사용합니다. |
기대치 |
---|
@expect("description", "constraint") 다음으로 식별되는 데이터 품질 제약 조건을 선언합니다 description . 행이 예상을 위반하는 경우 대상 데이터 세트에 행을 포함합니다. |
@expect_or_drop("description", "constraint") 다음으로 식별되는 데이터 품질 제약 조건을 선언합니다 description . 행이 예상을 위반하는 경우 대상 데이터 세트에서 행을 삭제합니다. |
@expect_or_fail("description", "constraint") 다음으로 식별되는 데이터 품질 제약 조건을 선언합니다 description . 행이 예상을 위반하는 경우 즉시 실행을 중지합니다. |
@expect_all(expectations) 하나 이상의 데이터 품질 제약 조건을 선언합니다. expectations 는 키가 예상 설명이고 값이 예상 제약 조건인 Python 사전입니다. 행이 예상을 위반하는 경우 대상 데이터 세트에 행을 포함합니다. |
@expect_all_or_drop(expectations) 하나 이상의 데이터 품질 제약 조건을 선언합니다. expectations 는 키가 예상 설명이고 값이 예상 제약 조건인 Python 사전입니다. 행이 예상을 위반하는 경우 대상 데이터 세트에서 행을 삭제합니다. |
@expect_all_or_fail(expectations) 하나 이상의 데이터 품질 제약 조건을 선언합니다. expectations 는 키가 예상 설명이고 값이 예상 제약 조건인 Python 사전입니다. 행이 예상을 위반하는 경우 즉시 실행을 중지합니다. |
Delta Live Tables에서 Python을 사용하여 변경 피드에서 데이터 캡처 변경
Python API의 apply_changes()
함수를 사용하여 Delta Live Tables CDC(변경 데이터 캡처) 기능을 사용하여 CDF(변경 데이터 피드)에서 원본 데이터를 처리합니다.
Important
변경 내용을 적용하려면 대상 스트리밍 테이블을 선언해야 합니다. 필요에 따라 대상 테이블에 대한 스키마를 지정할 수 있습니다. apply_changes()
대상 테이블의 스키마를 지정할 때 sequence_by
필드와 데이터 형식이 동일한 __START_AT
및 __END_AT
열도 포함해야 합니다.
필요한 대상 테이블을 만들려면 Delta Live Tables Python 인터페이스에서 create_streaming_table() 함수를 사용할 수 있습니다.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
참고 항목
APPLY CHANGES
처리 시, INSERT
및 UPDATE
이벤트의 기본 동작은 원본의 CDC 이벤트를 upsert하는 것입니다. 즉, 대상 테이블에서 지정된 키와 일치하는 모든 행을 업데이트하거나, 대상 테이블에 일치하는 레코드가 없는 경우 새 행을 삽입합니다. DELETE
이벤트에 대한 처리는 APPLY AS DELETE WHEN
조건을 사용하여 지정할 수 있습니다.
변경 피드를 사용한 CDC 처리에 대한 자세한 내용은 APPLY CHANGES API: Delta Live Tables를 사용하여 변경 데이터 캡처 간소화를 참조하세요. 함수를 사용하는 apply_changes()
예제는 예제: CDF 원본 데이터를 사용한 SCD 형식 1 및 SCD 형식 2 처리를 참조하세요.
Important
변경 내용을 적용하려면 대상 스트리밍 테이블을 선언해야 합니다. 필요에 따라 대상 테이블에 대한 스키마를 지정할 수 있습니다. apply_changes
대상 테이블 스키마를 지정할 때 sequence_by
필드와 데이터 형식이 동일한 __START_AT
및 __END_AT
열도 포함해야 합니다.
APPLY CHANGES API: Delta Live Tables을 사용하여 변경 데이터 캡처 간소화를 참조하세요.
인수 |
---|
target 유형: str 업데이트할 테이블의 이름입니다. apply_changes() 함수를 실행하기 전에 create_streaming_table() 함수를 사용하여 대상 테이블을 만들 수 있습니다.이 매개 변수는 필수입니다. |
source 유형: str CDC 레코드를 포함하는 데이터 원본입니다. 이 매개 변수는 필수입니다. |
keys 유형: list 원본 데이터의 행을 고유하게 식별하는 열 또는 열의 조합입니다. 대상 테이블에서 특정 레코드에 적용되는 CDC 이벤트를 식별하는 데 사용됩니다. 다음 중 하나를 지정할 수 있습니다. - 문자열 목록: ["userId", "orderId"] - Spark SQL col() 함수 목록: [col("userId"), col("orderId"] col() 함수의 인수는 한정자를 포함할 수 없습니다. 예를 들어, col(userId) 는 사용할 수 있지만 col(source.userId) 는 사용할 수 없습니다.이 매개 변수는 필수입니다. |
sequence_by 형식: str 또는 col() 원본 데이터에서 CDC 이벤트의 논리적 순서를 지정하는 열 이름입니다. Delta Live Tables는 이 시퀀싱을 사용하여 순서가 맞지 않게 도착한 변경 이벤트를 처리합니다. 다음 중 하나를 지정할 수 있습니다. - 문자열: "sequenceNum" - Spark SQL col() 함수: col("sequenceNum") col() 함수의 인수는 한정자를 포함할 수 없습니다. 예를 들어, col(userId) 는 사용할 수 있지만 col(source.userId) 는 사용할 수 없습니다.지정된 열은 정렬 가능한 데이터 형식이어야 합니다. 이 매개 변수는 필수입니다. |
ignore_null_updates 유형: bool 대상 열의 하위 집합을 포함하는 업데이트를 수집할 수 있습니다. CDC 이벤트가 기존 행과 일치하고 ignore_null_updates 가 True 인 경우, null 을 포함하는 열은 대상에서 기존 값으로 유지됩니다. null 값을 갖는 중첩된 열에도 적용됩니다. ignore_null_updates 가 False 인 경우 기존 값이 null 값으로 덮어써집니다.이 매개 변수는 선택 사항입니다. 기본값은 False 입니다. |
apply_as_deletes 형식: str 또는 expr() CDC 이벤트를 upsert가 아닌 DELETE 로 취급해야 하는 경우를 지정합니다. 순서가 맞지 않는 데이터를 처리하기 위해, 삭제된 행은 기본 Delta 테이블에서 일시적으로 삭제 표식으로 유지되고 메타스토어에서 이러한 삭제 표식이 제외된 뷰가 만들어집니다. 보존 간격은pipelines.cdc.tombstoneGCThresholdInSeconds table 속성입니다.다음 중 하나를 지정할 수 있습니다. - 문자열: "Operation = 'DELETE'" - Spark SQL expr() 함수: expr("Operation = 'DELETE'") 이 매개 변수는 선택 사항입니다. |
apply_as_truncates 형식: str 또는 expr() CDC 이벤트가 전체 테이블 TRUNCATE 로 처리되어야 하는 경우를 지정합니다. 이 절은 대상 테이블의 전체 자르기를 트리거하므로 이 기능이 필요한 특정 사용 사례에만 사용해야 합니다.apply_as_truncates 매개 변수는 SCD 형식 1에 대해서만 지원됩니다. SCD 형식 2는 자르기 작업을 지원하지 않습니다.다음 중 하나를 지정할 수 있습니다. - 문자열: "Operation = 'TRUNCATE'" - Spark SQL expr() 함수: expr("Operation = 'TRUNCATE'") 이 매개 변수는 선택 사항입니다. |
column_list except_column_list 유형: list 대상 테이블에 포함할 열의 하위 집합입니다. 포함할 열의 전체 목록을 지정하려면 column_list 를 사용합니다. 제외할 열을 지정하려면 except_column_list 를 사용합니다. 값은 문자열 목록이나 Spark SQL col() 함수 목록으로 지정할 수 있습니다.- column_list = ["userId", "name", "city"] .- column_list = [col("userId"), col("name"), col("city")] - except_column_list = ["operation", "sequenceNum"] - except_column_list = [col("operation"), col("sequenceNum") col() 함수의 인수는 한정자를 포함할 수 없습니다. 예를 들어, col(userId) 는 사용할 수 있지만 col(source.userId) 는 사용할 수 없습니다.이 매개 변수는 선택 사항입니다. 기본값은 함수에 column_list 또는 except_column_list 인수가 전달되지 않은 경우 대상 테이블의 모든 열을 포함하는 것입니다. |
stored_as_scd_type 형식: str 또는 int 레코드를 SCD 형식 1 또는 SCD 형식 2로 저장할지 여부. SCD 유형 1의 경우 1 , SCD 유형 2의 경우 2 로 설정합니다.이 절은 옵션입니다. 기본값은 SCD 형식 1입니다. |
track_history_column_list track_history_except_column_list 유형: list 대상 테이블의 기록에 대해 추적할 출력 열의 하위 집합입니다. track_history_column_list 를 사용하여 추적할 열의 전체 목록을 지정합니다. 사용할 용어track_history_except_column_list 를 사용하여 추적에서 제외할 열을 지정합니다. 값은 문자열 목록이나 Spark SQL col() 함수 목록으로 지정할 수 있습니다.- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") col() 함수의 인수는 한정자를 포함할 수 없습니다. 예를 들어, col(userId) 는 사용할 수 있지만 col(source.userId) 는 사용할 수 없습니다.이 매개 변수는 선택 사항입니다. 기본값은 함수에 track_history_column_list 또는track_history_except_column_list 인수가 전달되지 않은 경우 대상 테이블의 모든 열을 포함하는 것입니다. |
Delta Live Tables에서 Python을 사용하여 데이터베이스 스냅샷에서 데이터 캡처 변경
Important
APPLY CHANGES FROM SNAPSHOT
API는 공개 미리 보기로 제공됩니다.
Python API의 apply_changes_from_snapshot()
함수를 사용하여 Delta Live Tables CDC(변경 데이터 캡처) 기능을 사용하여 데이터베이스 스냅샷에서 원본 데이터를 처리합니다.
Important
변경 내용을 적용하려면 대상 스트리밍 테이블을 선언해야 합니다. 필요에 따라 대상 테이블에 대한 스키마를 지정할 수 있습니다. apply_changes_from_snapshot()
대상 테이블의 스키마를 지정할 때 sequence_by
필드와 데이터 형식이 동일한 __START_AT
및 __END_AT
열도 포함해야 합니다.
필요한 대상 테이블을 만들려면 Delta Live Tables Python 인터페이스에서 create_streaming_table() 함수를 사용할 수 있습니다.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
참고 항목
APPLY CHANGES FROM SNAPSHOT
처리를 위해 동일한 키를 가진 일치하는 레코드가 대상에 없는 경우 새 행을 삽입하는 것이 기본 동작입니다. 일치하는 레코드가 있는 경우 행의 값이 변경된 경우에만 업데이트됩니다. 대상에 키가 있지만 더 이상 원본에 없는 행이 삭제됩니다.
스냅샷을 사용한 CDC 처리에 대한 자세한 내용은 APPLY CHANGES API: Delta Live Tables를 사용하여 변경 데이터 캡처 간소화를 참조하세요. apply_changes_from_snapshot()
함수를 사용하는 예제는 주기적인 스냅샷 수집 및 기록 스냅샷 수집 예제를 참조하세요.
인수 |
---|
target 유형: str 업데이트할 테이블의 이름입니다. apply_changes() 함수를 실행하기 전에 create_streaming_table() 함수를 사용하여 대상 테이블을 만들 수 있습니다.이 매개 변수는 필수입니다. |
source 형식: str 또는 lambda function 주기적으로 스냅샷할 테이블 또는 뷰의 이름 또는 처리할 스냅샷 DataFrame 및 스냅샷 버전을 반환하는 Python 람다 함수입니다. 원본 인수 구현을 참조하세요. 이 매개 변수는 필수입니다. |
keys 유형: list 원본 데이터의 행을 고유하게 식별하는 열 또는 열의 조합입니다. 대상 테이블에서 특정 레코드에 적용되는 CDC 이벤트를 식별하는 데 사용됩니다. 다음 중 하나를 지정할 수 있습니다. - 문자열 목록: ["userId", "orderId"] - Spark SQL col() 함수 목록: [col("userId"), col("orderId"] col() 함수의 인수는 한정자를 포함할 수 없습니다. 예를 들어, col(userId) 는 사용할 수 있지만 col(source.userId) 는 사용할 수 없습니다.이 매개 변수는 필수입니다. |
stored_as_scd_type 형식: str 또는 int 레코드를 SCD 형식 1 또는 SCD 형식 2로 저장할지 여부. SCD 유형 1의 경우 1 , SCD 유형 2의 경우 2 로 설정합니다.이 절은 옵션입니다. 기본값은 SCD 형식 1입니다. |
track_history_column_list track_history_except_column_list 유형: list 대상 테이블의 기록에 대해 추적할 출력 열의 하위 집합입니다. track_history_column_list 를 사용하여 추적할 열의 전체 목록을 지정합니다. 사용할 용어track_history_except_column_list 를 사용하여 추적에서 제외할 열을 지정합니다. 값은 문자열 목록이나 Spark SQL col() 함수 목록으로 지정할 수 있습니다.- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") col() 함수의 인수는 한정자를 포함할 수 없습니다. 예를 들어, col(userId) 는 사용할 수 있지만 col(source.userId) 는 사용할 수 없습니다.이 매개 변수는 선택 사항입니다. 기본값은 함수에 track_history_column_list 또는track_history_except_column_list 인수가 전달되지 않은 경우 대상 테이블의 모든 열을 포함하는 것입니다. |
source
인수 구현
apply_changes_from_snapshot()
함수에는 source
인수가 포함됩니다. 기록 스냅샷을 처리하기 위해 source
인수는 apply_changes_from_snapshot()
함수에 처리할 스냅샷 데이터가 포함된 Python DataFrame과 스냅샷 버전이라는 두 값을 반환하는 Python 람다 함수여야 합니다.
다음은 람다 함수의 서명입니다.
lambda Any => Optional[(DataFrame, Any)]
- 람다 함수의 인수는 가장 최근에 처리된 스냅샷 버전입니다.
- 람다 함수의 반환 값
None
또는 튜플이며 튜플의 첫 번째 값은 처리할 스냅샷을 포함하는 DataFrame입니다. 튜플의 두 번째 값은 스냅샷의 논리적 순서를 나타내는 스냅샷 버전입니다.
람다 함수를 구현하고 호출하는 예제:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
Delta Live Tables 런타임은 apply_changes_from_snapshot()
함수가 포함된 파이프라인이 트리거될 때마다 다음 단계를 수행합니다.
next_snapshot_and_version
함수를 실행하여 다음 스냅샷 DataFrame 및 해당 스냅샷 버전을 로드합니다.- DataFrame이 반환되지 않으면 실행이 종료되고 파이프라인 업데이트가 완료된 것으로 표시됩니다.
- 새 스냅샷의 변경 내용을 검색하고 점진적으로 대상 테이블에 적용합니다.
- 1단계로 돌아와서 다음 스냅샷 및 해당 버전을 로드합니다.
제한 사항
Delta Live Tables Python 인터페이스에는 다음과 같은 제한 사항이 있습니다.
pivot()
함수는 지원되지 않습니다. Spark에서 pivot
작업을 수행하려면 출력 스키마를 계산하기 위해 입력 데이터를 즉시 로드해야 합니다. 이 기능은 Delta Live Tables에서 지원되지 않습니다.