Delta Live Tables Python 語言參考
本文提供差異即時資料表 Python 程式設計介面的詳細資料。
如需 SQL API 的相關資訊,請參閱差異即時資料表 SQL 語言參考。
如需設定自動載入器的特定詳細資料,請參閱什麼是自動載入器?。
開始之前
當您使用差異即時資料表Python 介面實作管線時,以下是重要的考量:
- 由於 Python
table()
和view()
函式會在管線更新的規劃和執行期間多次叫用,因此請勿在其中一個可能有副作用的函式中包含程式碼 (例如,修改資料或傳送電子郵件的程式碼)。 為了避免非預期的行為,定義資料集的 Python 函式應該只包含定義資料表或檢視所需的程式碼。 - 若要執行像是傳送電子郵件或與外部監視服務整合等作業,特別是在定義資料集的函式中,請使用事件勾點。 在定義資料集的函式中實作這些作業會導致非預期的行為。
- Python
table
和view
函式必須傳回 DataFrame。 某些在 DataFrame 上運作的函式不會傳回 DataFrame,而且不應該使用。 這些作業包括collect()
、count()
、toPandas()
、save()
和saveAsTable()
等函式。 由於 DataFrame 轉換會在解析完整資料流程圖形之後執行,因此使用這類作業可能會有非預期的副作用。
匯入 dlt
Python 模組
差異即時資料表 Python 函式定義於 dlt
模組中。 使用 Python API 實作的管線必須匯入此課程模組:
import dlt
建立差異即時資料表具體化檢視或串流資料表
在 Python 中,差異即時資料表會根據定義查詢來決定將資料集更新為具體化檢視或串流資料表。 @table
裝飾項目可用來定義具體化檢視和串流資料表。
若要在 Python 中定義具體化檢視,請將 @table
套用至對資料來源執行靜態讀取的查詢。 若要定義串流資料表,請將 @table
套用至對資料來源執行串流讀取的查詢,或使用 create_streaming_table () 函式。 這兩種資料集類型都有相同的語法規格,如下所示:
注意
若要使用 cluster_by
參數啟用液體叢集,您的管線必須設定為使用預覽通道
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
建立差異即時資料表檢視
若要在 Python 中定義檢視,請套用 @view
裝飾項目。 @table
如同裝飾項目,您可以在差異即時資料表中針對靜態或串流資料集使用檢視。 以下是使用 Python 定義檢視的語法:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
定義資料表和檢視
若要在 Python 中定義資料表或檢視,請將 @dlt.view
或 @dlt.table
裝飾項目套用至函式。 您可以使用函式名稱或 name
參數來指派資料表或檢視名稱。 下列範例會定義兩個不同的資料集:一個稱為 taxi_raw
的檢視,其接受 JSON 檔案做為輸入來源,而名為 filtered_data
的資料表會接受 taxi_raw
檢視做為輸入:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
範例:存取在相同管線中定義的資料集
注意
dlt.read()
雖然 Delta Live Tables Python 介面仍然提供 和 dlt.read_stream()
函式,但 Databricks 建議一律使用 spark.read.table()
和 spark.readStream.table()
函式,原因如下:
- 函
spark
式支援讀取內部和外部數據集,包括外部記憶體中的數據集,或在其他管線中定義。 函式dlt
僅支援讀取內部數據集。 - 函
spark
式支援指定選項,例如skipChangeCommits
讀取作業。 函式不支援dlt
指定選項。
若要存取在相同管線中定義的數據集,請使用 spark.read.table()
或 spark.readStream.table()
函式,在數據集名稱前面加上 LIVE
關鍵詞:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("LIVE.customers_raw").where(...)
範例:從中繼存放區中註冊的資料表讀取
若要從 Hive 中繼存放區中註冊的資料表讀取資料,請在函式引數中省略 LIVE
關鍵字,並選擇性地使用資料庫名稱限定資料表名稱:
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
如需從 Unity 目錄資料表讀取的範例,請參閱將資料內嵌至 Unity 目錄管線。
範例:使用 存取數據集 spark.sql
您也可以在查詢函式中使用 spark.sql
運算式傳回資料集。 若要從內部資料集讀取,請在資料集名稱前面加上 LIVE.
:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
建立資料表以作為串流作業的目標
使用 create_streaming_table()
函式,透過串流作業建立記錄輸出的目標資料表,包括 apply_changes ()、apply_changes_from_snapshot () 和 @append_flow 輸出記錄。
注意
create_target_table()
和 create_streaming_live_table()
函式已被取代。 Databricks 建議更新現有程式碼以使用 create_streaming_table()
函式。
注意
若要使用 cluster_by
參數啟用液體叢集,您的管線必須設定為使用預覽通道
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
引數 |
---|
name 類型: str (英文)資料表名稱。 此為必要參數。 |
comment 類型: str (英文)資料表的選用描述。 |
spark_conf 類型: dict (英文)執行此查詢的 Spark 組態選用清單。 |
table_properties 類型: dict (英文)資料表之資料表屬性的選用清單。 |
partition_cols 類型: array (英文)用於分割資料表的一個或多個資料行的選用清單。 |
cluster_by 類型: array (英文)選擇性地在資料表上啟用液體群集,並定義要當做叢集索引鍵使用的資料行。 請參閱<針對差異資料表使用液態叢集>。 |
path 類型: str (英文)資料表資料的選用儲存位置。 如果未設定,系統會預設為管線儲存位置。 |
schema 類型: str 或 StructType 資料表的選用結構描述定義。 結構描述可定義為 SQL DDL 字串或使用 Python StructType . |
expect_all expect_all_or_drop expect_all_or_fail 類型: dict (英文)資料表的選用資料品質條件約束。 請參閱多個期望。 |
row_filter (公開預覽)類型: str (英文)資料表的可選資料列篩選子句。 請參閱發佈具有資料列篩選和資料行遮罩的資料表。 |
控制資料表具體化的方式
資料表也提供其具體化的額外控制:
- 指定如何使用
partition_cols
來分割資料表。 您可以使用資料分割來加速查詢。 - 您可以在定義檢視或資料表時設定資料表屬性。 請參閱差異即時資料表資料表屬性。
- 使用
path
設定來設定資料表資料的儲存位置。 如果path
未設定,資料表資料預設會儲存在管線儲存位置。 - 您可以在結構描述定義中使用產生的資料行。 請參閱範例:指定結構描述和資料分割資料行。
注意
對於大小小於 1 TB 的資料表,Databricks 建議讓差異即時資料表控制資料組織。 除非您預期資料表會成長到超過 1 TB,否則您不應該指定資料分割資料行。
範例:指定結構描述和資料分割資料行
您可以選擇性地使用 Python StructType
或 SQL DDL 字串來指定資料表結構描述。 使用 DDL 字串指定時,定義可以包含產生的資料行。
下列範例會使用 Python StructType
所指定的結構描述建立名為 sales
的資料表:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
下列範例會使用 DDL 字串指定資料表的結構描述、定義產生的資料行,以及定義資料分割資料行:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
根據預設,如果您未指定結構描述,差異即時資料表會從 table
定義推斷結構描述。
設定串流資料表以忽略來源串流資料表中的變更
注意
skipChangeCommits
旗標只適用於使用option()
函式的spark.readStream
。 您無法在dlt.read_stream()
函式中使用此旗標。- 當來源串流資料表定義為 apply_changes() 函式的目標時,您就無法使用
skipChangeCommits
旗標。
根據預設,串流資料表需要僅限附加的來源。 當串流資料表使用另一個串流資料表做為來源,而來源串流資料表需要更新或刪除時,例如 GDPR「被遺忘的權利」處理時,skipChangeCommits
可以在讀取來源串流資料表時設定旗標來忽略這些變更。 如需此旗標的詳細資訊,請參閱忽略更新和刪除。
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
範例:定義資料表條件約束
重要
資料表條件約束處於公開預覽狀態。
指定結構描述時,可以定義主索引鍵和外部索引鍵。 條件約束僅供參考,不會強制執行。 請參閱 SQL 語言參考中的 CONSTRAINT 子句。
下列範例會定義具有主索引鍵和外部索引鍵條件約束的資料表:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
範例:定義資料列篩選和資料行遮罩
重要
資料列篩選和資料行遮罩處於公開預覽狀態。
若要使用數據列篩選和數據行遮罩建立具體化檢視表或串流數據表,請使用 ROW FILTER 子句 和 MASK 子句。 下列範例示範如何使用資料列篩選和資料行遮罩來定義具體化檢視和串流數據表:
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
如需有關資料列篩選和資料行遮罩的詳細資訊,請參閱發佈具有資料列篩選和資料行遮罩的資料表。
Python 差異即時資料表屬性
下表描述使用差異即時資料表定義資料表和檢視時,您可以指定的選項和屬性:
注意
若要使用 cluster_by
參數啟用液體叢集,您的管線必須設定為使用預覽通道
@table 或 @view |
---|
name 類型: str (英文)資料表或檢視表的選用名稱。 如果未定義,則會使用函式名稱做為資料表或檢視名稱。 |
comment 類型: str (英文)資料表的選用描述。 |
spark_conf 類型: dict (英文)執行此查詢的 Spark 組態選用清單。 |
table_properties 類型: dict (英文)資料表之資料表屬性的選用清單。 |
path 類型: str (英文)資料表資料的選用儲存位置。 如果未設定,系統會預設為管線儲存位置。 |
partition_cols 類型: a collection of str (英文)選用集合,例如要用於分割資料表之一個或多個資料行的 list 。 |
cluster_by 類型: array (英文)選擇性地在資料表上啟用液體群集,並定義要當做叢集索引鍵使用的資料行。 請參閱<針對差異資料表使用液態叢集>。 |
schema 類型: str 或 StructType 資料表的選用結構描述定義。 結構描述可定義為 SQL DDL 字串或使用 Python StructType 進行定義。 |
temporary 類型: bool (英文)建立資料表,但不會發佈資料表的中繼資料。 temporary 關鍵字會指示差異即時資料表建立可供管線使用的資料表,但不應該在管線外部存取。 為了縮短處理時間,臨時資料表會保存管線的存留期,而不只是單一更新。預設值為 False。 |
row_filter (公開預覽)類型: str (英文)資料表的可選資料列篩選子句。 請參閱發佈具有資料列篩選和資料行遮罩的資料表。 |
資料表或檢視定義 |
---|
def <function-name>() 定義資料集的 Python 函式。 如果未設定 name 參數,則會將 <function-name> 當做目標資料集名稱使用。 |
query 會傳回 Spark 資料集或 Koalas DataFrame 的 Spark SQL 陳述式。 使用 dlt.read() 或 spark.read.table() 從相同管線中定義的資料集執行完整讀取。 若要讀取外部數據集,請使用 函式 spark.read.table() 。 您無法用來 dlt.read() 讀取外部資料集。 因為 spark.read.table() 可以用來讀取內部數據集、定義在目前管線外部的數據集,而且可讓您指定讀取數據的選項,所以 Databricks 建議使用它,而不是 dlt.read() 函式。當您使用函 spark.read.table() 式從相同管線中定義的數據集讀取時,請將 關鍵詞前面加上 LIVE 函式自變數中的數據集名稱。 例如,若要從名為 customers 的資料集讀取:spark.read.table("LIVE.customers") 您也可以使用 spark.read.table() 函式,藉由省略 LIVE 關鍵字,並選擇性地使用資料庫名稱限定資料表名稱,以從中繼存放區中註冊的資料表讀取:spark.read.table("sales.customers") 使用 dlt.read_stream() 或 spark.readStream.table() 從相同管線中定義的數據集執行串流讀取。 若要從外部數據集執行串流讀取,請使用spark.readStream.table() 功能。 因為 spark.readStream.table() 可以用來讀取內部數據集、定義在目前管線外部的數據集,而且可讓您指定讀取數據的選項,所以 Databricks 建議使用它,而不是 dlt.read_stream() 函式。若要使用 SQL 語法在 Delta Live Tables table 函式中定義查詢,請使用 函式 spark.sql 。 請參閱 範例:使用spark.sql存取數據集。 若要使用 Python 在 Delta Live Tables table 函式中定義查詢,請使用 PySpark 語法。 |
預期結果 |
---|
@expect("description", "constraint") 宣告所識別的資料品質條件約束 description . 如果資料列違反預期,請在目標資料集中包含資料列。 |
@expect_or_drop("description", "constraint") 宣告所識別的資料品質條件約束 description . 如果資料列違反預期,請從目標資料集捨棄該資料列。 |
@expect_or_fail("description", "constraint") 宣告所識別的資料品質條件約束 description . 如果資料列違反預期,請立即停止執行。 |
@expect_all(expectations) 宣告一或多個資料品質條件約束。 expectations 是 Python 字典,其中索引鍵是預期描述,而值是預期條件約束。 如果資料列違反任何預期,請在目標資料集中包含該資料列。 |
@expect_all_or_drop(expectations) 宣告一或多個資料品質條件約束。 expectations 是 Python 字典,其中索引鍵是預期描述,而值是預期條件約束。 如果資料列違反任何預期,請從目標資料集捨棄該資料列。 |
@expect_all_or_fail(expectations) 宣告一或多個資料品質條件約束。 expectations 是 Python 字典,其中索引鍵是預期描述,而值是預期條件約束。 如果資料列違反任何預期,請立即停止執行。 |
在差異即時資料表中使用 Python 從變更摘要擷取異動資料
使用 Python API 中的 apply_changes()
函式,以使用差異即時資料表異動資料擷取 (CDC) 功能來處理異動資料摘要 (CDF) 中的來源資料。
重要
您必須宣告目標串流資料表,才能套用變更。 您可以選擇性地指定目標資料表的結構描述。 指定 apply_changes()
目標資料表的結構描述時,您必須包含 __START_AT
與具有與 sequence_by
欄位相同資料類型的 __END_AT
資料行。
若要建立必要的目標資料表,您可以在差異即時資料表 Python 介面中使用 create_streaming_table () 函式。
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
注意
為了 APPLY CHANGES
處理,INSERT
和 UPDATE
事件的預設行為是從來源向上插入 CDC 事件:更新目標資料表中符合指定索引鍵的任何資料列,或在目標資料表中不存在比對記錄時插入新資料列。 您可以使用 APPLY AS DELETE WHEN
條件來指定 DELETE
事件的處理。
若要深入瞭解使用變更摘要處理 CDC,請參閱套用變更 API:使用差異即時資料表簡化異動資料擷取。 如需使用 apply_changes()
函式的範例,請參閱範例:使用 CDF 來源資料處理的 SCD 類型 1 和 SCD 類型 2。
重要
您必須宣告目標串流資料表,才能套用變更。 您可以選擇性地指定目標資料表的結構描述。 指定 apply_changes
目標資料表結構描述時,您必須包含 __START_AT
,以及與 sequence_by
欄位具有相同資料類型的 __END_AT
資料行。
請參閱套用變更 API:使用差異即時資料表簡化異動資料擷取。
引數 |
---|
target 類型: str (英文)要更新的資料表名稱。 在執行 apply_changes() 函式之前,您可以使用 create_streaming_table() 函式來建立目標資料表。此為必要參數。 |
source 類型: str (英文)包含 CDC 記錄的資料來源。 此為必要參數。 |
keys 類型: list (英文)唯一識別來源資料中某資料列的資料行或資料行組合。 這可用來識別哪些 CDC 事件會套用至目標資料表中的特定記錄。 您可以指定下列其中一項: - 字串清單: ["userId", "orderId"] - Spark SQL col() 函式的清單:[col("userId"), col("orderId"] col() 函式的引數不能包含限定詞。 例如,您可以使用 col(userId) ,但無法使用 col(source.userId) 。此為必要參數。 |
sequence_by 類型: str 或 col() 指定來源資料中 CDC 事件邏輯順序的資料行名稱。 差異即時資料表會使用此排序來處理依序抵達的變更事件。 您可以指定下列其中一項: - 字串: "sequenceNum" - Spark SQL col() 函式:col("sequenceNum") col() 函式的引數不能包含限定詞。 例如,您可以使用 col(userId) ,但無法使用 col(source.userId) 。指定的數據行必須是可排序的數據類型。 此為必要參數。 |
ignore_null_updates 類型: bool (英文)允許內嵌包含目標資料行子集的更新。 當 CDC 事件符合現有的資料列且 ignore_null_updates 為 True 時,具有 null 的資料行會保留其目標中的現有值。 這也適用於值為 null 的巢狀資料行。 當 ignore_null_updates 為 False 時,會以 null 值覆寫現有的值。這是選用參數。 預設值為 False 。 |
apply_as_deletes 類型: str 或 expr() 指定應將 CDC 事件視為 DELETE 而非 upsert 的時機。 為了處理順序錯誤的資料,已刪除的資料列會暫時保留為基礎差異資料表中的標記,而檢視會在中繼存放區中建立,以篩選掉這些標記。 您可以使用pipelines.cdc.tombstoneGCThresholdInSeconds table 屬性。您可以指定下列其中一項: - 字串: "Operation = 'DELETE'" - Spark SQL expr() 函式:expr("Operation = 'DELETE'") 這是選用參數。 |
apply_as_truncates 類型: str 或 expr() 指定應將 CDC 事件視為完整資料表 TRUNCATE 的時機。 因為這個子句會觸發目標資料表的完整截斷,所以應該只用於需要這項功能的特定使用案例。只有 SCD 類型 1 才支援 apply_as_truncates 參數。 SCD 類型 2 不支援截斷作業。您可以指定下列其中一項: - 字串: "Operation = 'TRUNCATE'" - Spark SQL expr() 函式:expr("Operation = 'TRUNCATE'") 這是選用參數。 |
column_list except_column_list 類型: list (英文)要包含在目標資料表中的資料行子集。 使用 column_list 指定要包含之資料行的完整清單。 使用 except_column_list 指定要排除的資料行。 您可以將值宣告為字串清單或 Spark SQL col() 函式:- column_list = ["userId", "name", "city"] .- column_list = [col("userId"), col("name"), col("city")] - except_column_list = ["operation", "sequenceNum"] - except_column_list = [col("operation"), col("sequenceNum") col() 函式的引數不能包含限定詞。 例如,您可以使用 col(userId) ,但無法使用 col(source.userId) 。這是選用參數。 當目標資料表中沒有 column_list 或 except_column_list 引數傳遞至函式時,預設值為包含所有資料行。 |
stored_as_scd_type 類型: str 或 int 是否要將記錄儲存為 SCD 類型 1 或 SCD 類型 2。 針對 SCD 類型 1 設定為 1 ,或針對 SCD 類型 2 設定為 2 。這個子句是選用的。 預設類型為 SCD 類型 1。 |
track_history_column_list track_history_except_column_list 類型: list (英文)要追蹤目標資料表中記錄的輸出資料行子集。 使用 track_history_column_list 指定要追蹤之資料行的完整清單。 使用track_history_except_column_list 表示指定要從追蹤中排除的資料行。 您可以將值宣告為字串清單或 Spark SQL col() 函式:- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") col() 函式的引數不能包含限定詞。 例如,您可以使用 col(userId) ,但無法使用 col(source.userId) 。這是選用參數。 預設值是當目標資料表中沒有 track_history_column_list 或track_history_except_column_list 引數時,會傳遞至函式。 |
使用差異即時資料表中的 Python 從資料庫快照集變更資料擷取
重要
APPLY CHANGES FROM SNAPSHOT
API 正處於公開預覽。
使用 Python API 中的 apply_changes_from_snapshot()
函式,以使用差異即時資料表異動資料擷取 (CDC) 功能來處理來自資料庫快照集的來源資料。
重要
您必須宣告目標串流資料表,才能套用變更。 您可以選擇性地指定目標資料表的結構描述。 指定 apply_changes_from_snapshot()
目標資料表的結構描述時,您必須包含 __START_AT
與具有與 sequence_by
欄位相同資料類型的 __END_AT
資料行。
若要建立必要的目標資料表,您可以在差異即時資料表 Python 介面中使用 create_streaming_table () 函式。
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
注意
為了 APPLY CHANGES FROM SNAPSHOT
處理,預設行為是在目標中不存在具有相同索引鍵的相符記錄時插入新的資料列。 如果相符的記錄確實存在,只有當資料列中的任何值都已變更時,才會加以更新。 目標中存在索引鍵但不再存在於來源中的資料列會遭到刪除。
若要深入瞭解使用快照處理 CDC,請參閱套用變更 API:使用差異即時資料表簡化異動資料擷取。 如需使用 apply_changes_from_snapshot()
函式的範例,請參閱定期快照擷取和歷程記錄快照擷取範例。
引數 |
---|
target 類型: str (英文)要更新的資料表名稱。 在執行 apply_changes() 函式之前,您可以使用 create_streaming_table() 函式來建立目標資料表。此為必要參數。 |
source 類型: str 或 lambda function 要定期快照的資料表或檢視名稱,或是傳回要處理的快照 DataFrame 和快照版本的 Python Lambda 函式。 請參閱實作來源引數。 此為必要參數。 |
keys 類型: list (英文)唯一識別來源資料中某資料列的資料行或資料行組合。 這可用來識別哪些 CDC 事件會套用至目標資料表中的特定記錄。 您可以指定下列其中一項: - 字串清單: ["userId", "orderId"] - Spark SQL col() 函式的清單:[col("userId"), col("orderId"] col() 函式的引數不能包含限定詞。 例如,您可以使用 col(userId) ,但無法使用 col(source.userId) 。此為必要參數。 |
stored_as_scd_type 類型: str 或 int 是否要將記錄儲存為 SCD 類型 1 或 SCD 類型 2。 針對 SCD 類型 1 設定為 1 ,或針對 SCD 類型 2 設定為 2 。這個子句是選用的。 預設類型為 SCD 類型 1。 |
track_history_column_list track_history_except_column_list 類型: list (英文)要追蹤目標資料表中記錄的輸出資料行子集。 使用 track_history_column_list 指定要追蹤之資料行的完整清單。 使用track_history_except_column_list 表示指定要從追蹤中排除的資料行。 您可以將值宣告為字串清單或 Spark SQL col() 函式:- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") col() 函式的引數不能包含限定詞。 例如,您可以使用 col(userId) ,但無法使用 col(source.userId) 。這是選用參數。 預設值是當目標資料表中沒有 track_history_column_list 或track_history_except_column_list 引數時,會傳遞至函式。 |
實作 source
引數
apply_changes_from_snapshot()
函式包含 source
引數。 若要處理歷程記錄快照,source
引數應該是 Python Lambda 函式,其會將兩個值傳回給 apply_changes_from_snapshot()
函式:包含要處理的快照資料和快照版本的 Python DataFrame。
以下是 Lambda 函式的簽名:
lambda Any => Optional[(DataFrame, Any)]
- Lambda 函式的引數是最近處理的快照版本。
- Lambda 函式的傳回值為
None
或兩個值的元組:第一個值是包含要處理之快照的 DataFrame。 元組的第二個值是代表快照邏輯順序的快照版本。
實作和呼叫 Lambda 函式的範例:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
每次觸發包含 apply_changes_from_snapshot()
函式的管線時,差異即時資料表執行階段都會執行下列步驟:
- 執行
next_snapshot_and_version
函式以載入下一個快照 DataFrame 和對應的快照版本。 - 如果沒有 DataFrame 傳回,則會終止執行,且管線更新會標示為完成。
- 偵測新快照中的變更,並以累加方式將其套用至目標資料表。
- 返回步驟 #1 以載入下一個快照及其版本。
限制
差異即時資料表 Python 介面有下列限制:
不支援 pivot()
函式。 Spark 中的 pivot
操作需要積極式載入輸入資料,才能計算輸出結構描述。 差異即時資料表不支援此功能。