共用方式為


DLT Python 語言參考手冊

本文提供 DLT Python 程式設計介面的詳細數據。

如需 SQL API 的資訊,請參閱 DLT SQL 語言參考。

如需設定自動載入器的特定詳細數據,請參閱 什麼是自動載入器?

開始之前

當您使用 DLT Python 介面實作管線時,以下是重要的考慮:

  • 由於 Python table()view() 函式會在管線更新的規劃和執行期間多次叫用,因此請勿在其中一個可能有副作用的函式中包含程式代碼(例如修改數據或傳送電子郵件的程式代碼)。 為了避免非預期的行為,定義數據集的 Python 函式應該只包含定義數據表或檢視表所需的程式代碼。
  • 若要執行傳送電子郵件或與外部監視服務整合等作業,特別是在定義資料集的函式中,請使用 事件攔截。 在定義數據集的函式中實作這些作業會導致非預期的行為。
  • Python tableview 函式必須傳回 DataFrame。 某些在 DataFrame 上運作的函式不會傳回 DataFrame,而且不應該使用。 這些作業包括 collect()count()toPandas()save()saveAsTable()等函式。 由於 DataFrame 轉換會在解析完整數據流圖形 之後 執行,因此使用這類作業可能會產生非預期的副作用。

匯入 dlt Python 模組

DLT Python 函式定義於 dlt 模組中。 使用 Python API 實作的管線必須匯入這個模組:

import dlt

建立 DLT 具象化視圖或串流資料表

在 Python 中,DLT 會根據定義查詢來決定將數據集更新為具體化檢視或串流數據表。 @table 裝飾器可以用來定義具現化視圖和串流數據表。

若要在 Python 中定義具體化檢視,請將 @table 套用至對數據源執行靜態讀取的查詢。 若要定義串流資料表,請將 @table 套用至對資料源執行串流讀取的查詢,或使用 create_streaming_table() 函式。 這兩種數據集類型都有相同的語法規格,如下所示:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

建立 DLT 檢視

若要在 Python 中定義視圖,請套用 @view 裝飾器。 如同 @table 裝飾專案,您可以在 DLT 中針對靜態或串流數據集使用檢視。 以下是使用 Python 定義檢視的語法:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

範例:定義數據表和檢視

若要在 Python 中定義資料表或檢視表,請將 @dlt.view@dlt.table 裝飾器套用至函數。 您可以使用函式名稱或 name 參數來指派數據表或檢視名稱。 下列範例會定義兩個不同的數據集:一個稱為 taxi_raw 的檢視,其接受 JSON 檔案做為輸入來源,而名為 filtered_data 的數據表會接受 taxi_raw 檢視做為輸入:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return spark.read.table("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return spark.read.table("taxi_raw").where(...)

範例:存取在相同管線中定義的數據集

注意

雖然 DLT Python 介面仍然提供 dlt.read()dlt.read_stream() 函式且完全支援,但 Databricks 建議一律使用 spark.read.table()spark.readStream.table() 函式,原因如下:

  • spark 函式支援讀取內部和外部的資料集,包括外部儲存裝置中的資料集,或在其他管線中定義的資料集。 dlt 函式僅支援讀取內部數據集。
  • spark 函式支援為讀取操作指定選項,例如 skipChangeCommitsdlt 函式不支援指定選項。

若要存取在相同管線中定義的數據集,請使用 spark.read.table()spark.readStream.table() 函式:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return spark.read.table("customers_raw").where(...)

注意

在查詢管線中的檢視或資料表時,您可以直接指定目錄和架構,也可以使用管線中設定的預設值。 在此範例中,customers數據表是從您管線設定的預設目錄和架構中寫入和讀取。

範例:從中繼存放區中註冊的數據表讀取

若要從 Hive 中繼存放區中註冊的數據表讀取數據,請在函式自變數中,使用資料庫名稱來限定數據表名稱:

@dlt.table
def customers():
  return spark.read.table("sales.customers").where(...)

如需從 Unity 目錄資料表讀取的範例,請參閱 將資料內嵌至 Unity 目錄管線

範例:使用 spark.sql 存取數據集

您也可以在查詢函式中使用 spark.sql 表示式來傳回數據集。 若要從內部數據集讀取,您可以將名稱保留為不合格的,以使用預設目錄和架構,也可以預先加上:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")

從實體化視圖或流式表永久刪除記錄

若要從啟用了刪除向量的具現化檢視或串流資料表中永久刪除記錄,例如以符合GDPR規範,必須在其基礎 Delta 資料表上執行其他作業。 若要確保從具體化檢視刪除記錄,請參閱 永久刪除具有啟用刪除向量之具體化檢視中的記錄。 若要確保從串流資料表刪除記錄,請參閱 從串流資料表永久刪除記錄

使用 DLT sink API 寫入外部事件串流服務或 Delta 表格

重要

DLT sink API 位於公開預覽版

注意

  • 執行完整重新整理更新不會移除接收器中的數據。 任何重新處理的資料都會附加到匯集端,而不會改變現有的資料。
  • sink API 不支援 DLT 預期。

若要寫入 Apache Kafka 或 Azure 事件中樞之類的事件串流服務,或從 DLT 管線寫入至 Delta 數據表,請使用 create_sink() Python 模組中包含的 dlt 函式。 在使用create_sink() 函式建立接收器之後,您可以在附加流中使用該接收器將數據寫入其中。 append flow 是 create_sink() 函式唯一支援的流程類型。 不支援其他流程類型,例如 apply_changes

以下是使用 create_sink() 函式建立接收端的語法:

create_sink(<sink_name>, <format>, <options>)
論點
name
類型:str
用來識別接收端並用以參考和管理該接收端的字串。 接收端名稱對管線而言必須是唯一的,包括所有原始程式碼,例如屬於管線一部分的筆記本或模組。
這是必要參數。
format
類型:str
定義輸出格式的字串,kafkadelta
這是必要參數。
options
類型:dict
選填的接收器選項清單,格式為 {"key": "value"},其中鍵和值都是字串。 Kafka 和 Delta 接收器所支援的所有 Databricks 執行時間選項都已支援。 如需 Kafka 選項,請參閱 設定 Kafka 結構化串流寫入器。 如需 Delta 選項,請參閱 Delta 表作為資料目標

範例:使用 create_sink() 函式建立 Kafka 匯出

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

範例:使用 create_sink() 函式和檔案系統路徑建立 Delta 匯入點

下列範例會建立一個資料接收器,利用檔案系統路徑將資料寫入 Delta 表:

create_sink(
  "my_delta_sink",
    "delta",
    { "path": "//path/to/my/delta/table" }
)

範例:使用 create_sink() 函式和 Unity 資料目錄資料表名稱建立 Delta 匯集

注意

Delta 資料流接收點支援 Unity Catalog 的外部和管理的資料表,以及 Hive 資料庫管理的資料表。 表格名稱必須完整指定。 例如,Unity 目錄資料表必須使用三層識別碼:<catalog>.<schema>.<table>。 Hive 中繼存放區資料表必須使用 <schema>.<table>

下列範例會藉由傳遞 Unity Catalog 中的表名稱,建立寫入 Delta 表的匯集器:

create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)

範例:使用附加流程寫入 Delta 儲存區

下列範例會建立一個寫入 Delta 數據表的接收器,然後建立一個附加流程以寫入該接收器:

create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

@append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

範例:使用追加流寫入Kafka接收點

下列範例會建立一個接收端以寫入 Kafka 主題,然後建立一個資料流來追加寫入該接收端:

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))

寫入 Kafka 之 DataFrame 的架構應該包含 設定 Kafka 結構化串流寫入器中指定的數據行。

建立數據表以作為串流作業的目標

使用 create_streaming_table() 函式來建立由串流作業輸出的記錄之目標數據表,包括 apply_changes()apply_changes_from_snapshot(),以及 @append_flow 輸出記錄。

注意

create_target_table()create_streaming_live_table() 函式已被取代。 Databricks 建議更新現有的程式代碼,以使用 create_streaming_table() 函式。

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
論點
name
類型:str
數據表名稱。
這是必要參數。
comment
類型:str
數據表的選擇性描述。
spark_conf
類型:dict
執行此查詢的Spark組態選擇性清單。
table_properties
類型:dict
表格屬性的選擇性清單。
partition_cols
類型:array
用於分割數據表的一個或多個數據行的選擇性清單。
cluster_by
類型:array
選擇性地在數據表上啟用液體群集,並定義要當做叢集索引鍵使用的數據行。
請參閱 Delta 資料表的液態聚類使用方法
path
類型:str
數據表數據的選擇性儲存位置。 如果未設定,系統會預設為管線儲存位置。
schema
類型:strStructType
數據表的選擇性架構定義。 架構可以定義為 SQL DDL 字串或使用 Python
StructType
expect_all
expect_all_or_drop
expect_all_or_fail
類型:dict
表格的可選數據品質限制。 參閱 多項期望
row_filter (公開預覽)
類型:str
表格的可選的行篩選條件。 請參閱 使用資料列篩選和欄位遮罩發佈資料表,

控制數據表具體化的方式

資料表還提供對其實現的額外控制。

注意

對於大小小於 1 TB 的數據表,Databricks 建議讓 DLT 控制數據組織。 除非您預期數據表成長超過一太位元組,否則您不應該指定分區欄位。

範例:指定架構和叢集數據行

您可以選擇性地使用 Python StructType 或 SQL DDL 字串來指定資料表架構。 使用 DDL 字串指定時,定義可以包含 生成的欄位

下列範例會建立名為 sales 的數據表,並使用 Python StructType指定的架構:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

下列範例會使用 DDL 字串指定資料表的架構、定義產生的數據行,以及定義叢集資料行:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

根據預設,如果您未指定架構,DLT 會從 table 定義推斷架構。

範例:指定分割區欄位

下列範例會使用 DDL 字串指定資料表的結構、定義生成的欄位,以及定義分割欄位:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

範例:定義數據表條件約束

重要

表格約束條件在 公開預覽

指定架構時,您可以定義主鍵和外鍵。 條件約束是參考性的,不會強制執行。 請參閱 SQL 語言參考中的 CONSTRAINT 子句

下列範例會定義具有主鍵和外鍵條件約束的數據表:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

範例:定義數據列篩選和數據行遮罩

重要

行篩選和列遮罩現正在公開預覽

若要使用資料列篩選和數據行遮罩建立具體化檢視表或串流數據表,請使用 ROW FILTER 子句MASK 子句。 下列範例示範如何使用資料列篩選和資料行遮罩來定義具體化檢視和串流數據表:

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

如需資料列篩選和資料行遮罩的詳細資訊,請參閱 使用資料列篩選和資料行遮罩發行資料表

設定串流數據表以忽略來源串流數據表中的變更

注意

  • skipChangeCommits 旗標僅在與 spark.readStream 一起使用 option() 函式時才有效。 您無法在 dlt.read_stream() 函式中使用這個旗標。
  • 當來源串流數據表定義為 skipChangeCommits 函式的目標時,您無法使用 旗標。

根據默認設定,串流資料表需要只接受附加的來源。 當串流數據表使用另一個串流數據表做為來源時,而且來源串流數據表需要更新或刪除,例如 GDPR「被遺忘的權利」處理時,可以在讀取來源串流數據表時設定 skipChangeCommits 旗標來忽略這些變更。 如需此旗標的詳細資訊,請參閱 忽略更新和刪除

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

Python DLT 屬性

下表描述使用 DLT 定義資料表和檢視時,您可以指定的選項和屬性:

@table 或 @view
name
類型:str
數據表或檢視表的選擇性名稱。 如果未定義,則會使用函式名稱做為數據表或檢視名稱。
comment
類型:str
數據表的選擇性描述。
spark_conf
類型:dict
執行此查詢的Spark組態選擇性清單。
table_properties
類型:dict
一個可選的 表格屬性 清單。
path
類型:str
數據表數據的選擇性儲存位置。 如果未設定,系統會預設為管線儲存位置。
partition_cols
類型:a collection of str
例如,list 是用於分割表格的可選集合,可以包含一個或多個欄位。
cluster_by
類型:array
選擇性地在數據表上啟用液體群集,並定義要當做叢集索引鍵使用的數據行。
請參閱 在 Delta 資料表中使用液態分群
schema
類型:strStructType
數據表的選擇性架構定義。 架構可以定義為 SQL DDL 字串,或使用 Python StructType
temporary
類型:bool
建立數據表,但不會發佈數據表的元數據。 temporary 關鍵詞會指示 DLT 建立可供管線使用的數據表,但不應該在管線外部存取。 為了縮短處理時間,臨時表會在建立它的管線的生命週期內保留,而不僅僅是在單一更新中存在。
預設值為 『False』。
row_filter (公開預覽)
類型:str
資料表的可選列篩選子句。 請參閱 使用資料列篩選和欄位遮罩發佈資料表,
數據表或視圖定義
def <function-name>()
定義數據集的 Python 函式。 如果未設定 name 參數,則會使用 <function-name> 作為目標數據集名稱。
query
Spark SQL 語句,會傳回 Spark 數據集或 Koalas DataFrame。
使用 dlt.read()spark.read.table(),從相同管線中定義的數據集執行完整讀取。 若要讀取外部數據集,請使用 spark.read.table() 函式。 您無法使用 dlt.read() 來讀取外部資料集。 由於 spark.read.table() 可用來讀取內部數據集、定義在目前管線外部的數據集,並可讓您指定讀取數據的選項,Databricks 建議使用它,而不是 dlt.read() 函式。
當您在管線中定義數據集時,預設會使用管線組態中定義的目錄和架構。 您可以使用 spark.read.table() 函式,從管線中定義的數據集讀取,但沒有任何限定性。 例如,若要從名為 customers的資料集讀取 :
spark.read.table("customers")
您也可以使用 spark.read.table() 函式,選擇性地將資料表名稱限定為資料庫名稱,以從中繼存放區中註冊的數據表讀取:
spark.read.table("sales.customers")
使用 dlt.read_stream()spark.readStream.table(),從相同管線中定義的數據集執行串流讀取。 若要從外部數據集執行串流讀取,請使用
spark.readStream.table() 函式。 由於 spark.readStream.table() 可用來讀取內部數據集、定義在目前管線外部的數據集,並可讓您指定讀取數據的選項,Databricks 建議使用它,而不是 dlt.read_stream() 函式。
若要使用 SQL 語法在 DLT table 函式中定義查詢,請使用 spark.sql 函式。 請參閱 範例:使用 spark.sql存取數據集。 若要使用 Python 在 DLT table 函式中定義查詢,請使用 PySpark 語法。
期望值
@expect("description", "constraint")
宣告已識別的數據品質限制條件
description。 如果數據列違反預期,請在目標數據集中包含數據列。
@expect_or_drop("description", "constraint")
聲明已識別的數據品質限制條件
description。 如果數據列違反預期,請從目標數據集卸除該數據列。
@expect_or_fail("description", "constraint")
聲明已識別的數據質量限制條件
description。 如果數據列違反預期,請立即停止執行。
@expect_all(expectations)
宣告一或多個數據質量條件約束。
expectations 是 Python 字典,其中索引鍵是預期描述,而值則是預期條件約束。 如果數據列違反任何預期,請在目標數據集中包含該數據列。
@expect_all_or_drop(expectations)
宣告一或多個數據質量條件約束。
expectations 是 Python 字典,其中索引鍵是預期描述,而值則是預期條件約束。 如果數據列違反任何預期,請從目標數據集卸除該數據列。
@expect_all_or_fail(expectations)
宣告一或多個數據質量條件約束。
expectations 是 Python 字典,其中索引鍵是預期描述,而值則是預期條件約束。 如果數據列違反任何預期,請立即停止執行。

在 DLT 中使用 Python 從變更饋送中擷取變更數據

使用 Python API 中的 apply_changes() 函式,使用 DLT 異動數據擷取 (CDC) 功能來處理來自變更數據摘要的源數據(CDF)。

重要

您必須宣告目標串流資料表,才能將變更套用至 。 您可以選擇性地指定目標資料表的架構。 指定 apply_changes() 目標資料表的架構時,您必須包含與 __START_AT 欄位相同的數據類型 __END_ATsequence_by 資料行。

若要建立必要的目標數據表,您可以在 DLT Python 介面中使用 create_streaming_table() 函式。

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

備註

針對 APPLY CHANGES 處理,INSERTUPDATE 事件的預設行為是從來源 更新或插入 的 CDC 事件:更新目標數據表中符合指定索引鍵的任何數據列,或在目標數據表中不存在相符記錄時插入新的數據列。 您可以使用 DELETE 條件來指定 APPLY AS DELETE WHEN 事件的處理。

若要瞭解更多關於使用變更摘要處理 CDC 的資訊,請參閱 APPLY CHANGES API:使用 DLT 簡化變更數據擷取。 如需使用 apply_changes() 函式的範例,請參閱 範例:SCD 類型 1 和 SCD 類型 2 處理與 CDF 源資料

重要

您必須宣告目標串流資料表,才能將變更套用至 。 您可以選擇性地指定目標資料表的架構。 指定 apply_changes 目標資料表架構時,您必須包含與 __START_AT 字段相同的數據類型 __END_ATsequence_by 資料行。

請參閱 套用變更 API:使用 DLT簡化異動數據擷取。

論點
target
類型:str
要更新之資料表的名稱。 在執行 函式之前,您可以使用 apply_changes() 函式來建立目標數據表。
這是必要參數。
source
類型:str
包含 CDC 記錄的數據來源。
這是必要參數。
keys
類型:list
用來唯一識別源數據中某一行或多個數據列組合的欄位。 這可用來識別哪些 CDC 事件會套用至目標數據表中的特定記錄。
您可以指定下列其中一項:
  • 字串清單:["userId", "orderId"]
  • Spark SQL col() 函式的清單:[col("userId"), col("orderId"]

col() 函式的自變數不能包含限定符。 例如,您可以使用 col(userId),但無法使用 col(source.userId)
這是必要參數。
sequence_by
類型:strcol()
指定源數據中 CDC 事件邏輯順序的數據行名稱。 DLT 會使用此排序來處理不按順序到達的變更事件。
您可以指定下列其中一項:
  • 字串:"sequenceNum"
  • Spark SQL col() 函式:col("sequenceNum")

col() 函式的自變數不能包含限定符。 例如,您可以使用 col(userId),但無法使用 col(source.userId)
指定的數據行必須是可排序的數據類型。
這是必要參數。
ignore_null_updates
類型:bool
允許接收包含目標欄位子集的更新。 當 CDC 事件符合現有的資料列且 ignore_null_updatesTrue時,具有 null 的資料列會在目標中保留其現有值。 這也適用於具有 null值的巢狀數據行。 當 ignore_null_updatesFalse時,將現有的值替換為 null 的值。
這個參數是選擇性的。
預設值為 False
apply_as_deletes
類型:strexpr()
指定何時應將 CDC 事件視為 DELETE,而非視為 upsert。 為了處理順序錯誤的資料,已刪除的資料列會暫時保留為底層的 Delta 表中的墓碑,並在 Metastore 中建立視圖,以過濾掉這些墓碑。 您可以使用[工具/方法]來設定保留間隔
pipelines.cdc.tombstoneGCThresholdInSeconds 數據表屬性。
您可以指定下列其中一項:
  • 字串:"Operation = 'DELETE'"
  • 一個 Spark SQL expr() 函式:expr("Operation = 'DELETE'")

這個參數是選擇性的。
apply_as_truncates
類型:strexpr()
指定 CDC 事件何時應視為完整資料表 TRUNCATE。 因為這個子句會觸發目標數據表的完整截斷,所以應該只用於需要這項功能的特定使用案例。
只有 SCD 類型 1 才支援 apply_as_truncates 參數。 SCD 類型 2 不支援截斷作業。
您可以指定下列其中一項:
  • 字串:"Operation = 'TRUNCATE'"
  • Spark SQL expr() 函數:expr("Operation = 'TRUNCATE'")

這個參數是選擇性的。
column_list
except_column_list
類型:list
要包含在目標資料表中的欄位子集。 使用 column_list 指定要包含之資料行的完整清單。 使用 except_column_list 指定要排除的數據行。 您可以將值宣告為字串清單,或宣告為 Spark SQL col() 函式:
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

col() 函式的自變數不能包含限定符。 例如,您可以使用 col(userId),但無法使用 col(source.userId)
這個參數是選擇性的。
當目標數據表中未傳遞任何 column_listexcept_column_list 自變數時,預設會包含目標數據表中的所有數據行。
stored_as_scd_type
類型:strint
是否要將記錄儲存為 SCD 類型 1 或 SCD 類型 2。
將 SCD 類型 1 設定為 1 或將 SCD 類型 2 設定為 2
這個子句是選擇性的。
預設值為 SCD 類型 1。
track_history_column_list
track_history_except_column_list
類型:list
在目標數據表中用於歷史追蹤的輸出欄位子集。 使用 track_history_column_list 指定要追蹤之數據行的完整清單。 使用
track_history_except_column_list 指定要從追蹤中排除的數據行。 您可以將值宣告為字串清單,或宣告為 Spark SQL col() 函式:
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

col() 函式的自變數不能包含限定符。 例如,您可以使用 col(userId),但無法使用 col(source.userId)
這個參數是選擇性的。
當目標數據表中缺少 track_history_column_list 或其他條件時,預設將包含所有數據列。
track_history_except_column_list 自變數傳遞至 函式。

使用 Python 在 DLT 中擷取資料庫快照中的變更資料

重要

APPLY CHANGES FROM SNAPSHOT API 處於 公開預覽

使用 Python API 中的 apply_changes_from_snapshot() 函式,使用 DLT 異動數據擷取 (CDC) 功能來處理來自資料庫快照集的源數據。

重要

您必須宣告目標串流資料表,才能將變更套用至 。 您可以選擇性地指定目標資料表的架構。 指定 apply_changes_from_snapshot() 目標資料表的架構時,您也必須包含 __START_AT__END_AT 數據行,其數據類型與 sequence_by 字段相同。

若要建立必要的目標數據表,您可以在 DLT Python 介面中使用 create_streaming_table() 函式。

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

注意

針對 APPLY CHANGES FROM SNAPSHOT 處理,預設行為是在目標中不存在具有相同索引鍵的相符記錄時插入新的數據列。 如果相符的記錄確實存在,只有當數據列中的任何值都已變更時,才會更新它。 目標中存在索引鍵但不再存在於來源中的數據列會遭到刪除。

若要深入瞭解如何使用快照集處理異動資料擷取 (CDC),請參閱 APPLY CHANGES API:使用 DLT 簡化變更資料擷取。 如需使用 apply_changes_from_snapshot() 函式的範例,請參閱 定期快照集擷取歷程記錄快照擷取 的範例。

論點
target
類型:str
要更新之數據表的名稱。 在執行 函式之前,您可以使用 apply_changes() 函式來建立目標數據表。
這是必要參數。
source
類型:strlambda function
要定期截取快照的資料表或檢視名稱,或者一個傳回快照 DataFrame 和快照版本並進行處理的 Python lambda 函式。 請參閱 實作 source 自變數
這是必要參數。
keys
類型:list
可唯一識別源數據中數據列的數據行或數據行組合。 這可用來識別哪些 CDC 事件會套用至目標數據表中的特定記錄。
您可以指定下列其中一項:
  • 字串清單:["userId", "orderId"]
  • Spark SQL col() 函式的清單:[col("userId"), col("orderId"]

col() 函式的自變數不能包含限定符。 例如,您可以使用 col(userId),但無法使用 col(source.userId)
這是必要參數。
stored_as_scd_type
類型:strint
是否要將記錄儲存為 SCD 類型 1 或 SCD 類型 2。
將 SCD 類型 1 設定為 1 或將 SCD 類型 2 設定為 2
這個子句是選擇性的。
預設值為 SCD 類型 1。
track_history_column_list
track_history_except_column_list
類型:list
在目標數據表中用於歷史追蹤的輸出欄位子集。 使用 track_history_column_list 指定要追蹤之數據行的完整清單。 使用
track_history_except_column_list 指定要從追蹤中排除的數據行。 您可以將值宣告為字串清單,或宣告為 Spark SQL col() 函式:
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

col() 函式的自變數不能包含限定符。 例如,您可以使用 col(userId),但無法使用 col(source.userId)
這個參數是選擇性的。
當目標數據表中沒有 track_history_column_list 或其他條件時,預設值為包含所有數據欄。
track_history_except_column_list 自變數傳遞至 函式。

實作 source 自變數

apply_changes_from_snapshot() 函式包含 source 自變數。 若要處理歷程記錄快照集,source 自變數應該是 Python Lambda 函式,會將兩個值傳回至 apply_changes_from_snapshot() 函式:包含要處理的快照集數據和快照集版本的 Python DataFrame。

以下是 Lambda 函式的簽章:

lambda Any => Optional[(DataFrame, Any)]
  • Lambda 函數的引數是最近處理的快照版本。
  • Lambda 函式的傳回值是 None 或一個包含兩個值的元組:元組的第一個值是包含要處理的快照集的 DataFrame。 元組的第二個值是代表快照邏輯順序的快照版本。

實作和呼叫 Lambda 函式的範例:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

每次包含 apply_changes_from_snapshot() 函式的管線被觸發時,DLT 執行階段都會執行下列步驟:

  1. 執行 next_snapshot_and_version 函式,以載入下一個快照集 DataFrame 和對應的快照集版本。
  2. 如果沒有 DataFrame 傳回,則會終止執行,且管線更新會標示為完成。
  3. 偵測新快照集中的變更,並以累加方式將它們套用至目標數據表。
  4. 返回步驟 #1 以載入下一個快照集及其版本。

限制

DLT Python 介面有下列限制:

不支援 pivot() 函式。 Spark 中的 pivot 作業需要急切載入輸入數據以計算輸出架構。 DLT 不支援此功能。