共用方式為


套用變更 API:使用 DLT 簡化異動數據擷取

DLT 使用 APPLY CHANGESAPPLY CHANGES FROM SNAPSHOT API 簡化異動數據擷取 (CDC)。 您使用的介面取決於變更資料的來源:

  • 使用 APPLY CHANGES 來處理從變更數據摘要 (CDF) 取得的變更。
  • 使用 APPLY CHANGES FROM SNAPSHOT (公開預覽) 來處理資料庫快照集的變更。

先前,MERGE INTO 語句通常用於處理 Azure Databricks 上的 CDC 記錄。 不過,MERGE INTO 可能會因為順序錯亂記錄而產生不正確的結果,或需要複雜的邏輯來重新排序記錄。

DLT SQL 和 Python 介面支援 APPLY CHANGES API。 DLT Python 介面支援 APPLY CHANGES FROM SNAPSHOT API。

APPLY CHANGESAPPLY CHANGES FROM SNAPSHOT 支援使用 SCD 型態 1 與類型 2 更新資料表:

  • 使用 SCD 類型 1 直接更新記錄。 更新紀錄時不會保留歷史資料。
  • 使用 SCD 類型 2 來保留記錄的歷程記錄,不論是所有更新,還是僅更新到一組指定的欄位。

如需語法和其他參考,請參閱:

注意

本文說明如何根據源數據變更來更新 DLT 管線中的資料表。 若要瞭解如何記錄和查詢 Delta 表的資料列層級變更資訊,請參閱 在 Azure Databricks 上使用 Delta Lake 變更資料提要

要求

若要使用 CDC API,您的管線必須設定為使用 無伺服器 DLT 管線 或 DLT ProAdvanced版本

CDC 如何使用 APPLY CHANGES API 實作?

藉由自動處理順序外記錄,DLT 中的 APPLY CHANGES API 可確保正確處理 CDC 記錄,並不需要開發複雜的邏輯來處理順序外記錄。 您必須在源數據中指定一個用於排序記錄的欄,DLT會將其解釋為一種單調遞增地表示源數據正確順序的方法。 DLT 會自動處理無序抵達的數據。 針對 SCD 類型 2 變更,DLT 會將適當的排序值傳播至目標數據表的 __START_AT__END_AT 資料行。 每個排序值的每個索引鍵都應該有一個不同的更新,而且不支援 NULL 排序值。

若要使用 APPLY CHANGES執行 CDC 處理,請先建立串流數據表,然後使用 SQL 中的 APPLY CHANGES INTO 語句,或 Python 中的 apply_changes() 函式來指定變更摘要的來源、索引鍵和排序。 若要建立目標串流數據表,請在 SQL 中使用 CREATE OR REFRESH STREAMING TABLE 語句,或在 Python 中使用 create_streaming_table() 函式。 請參閱 SCD 類型 1 和類型 2 處理 範例。

如需語法詳細數據,請參閱 DLT SQL 參考Python 參考

CDC 如何使用 APPLY CHANGES FROM SNAPSHOT API 實作?

重要

APPLY CHANGES FROM SNAPSHOT API 處於 公開預覽

APPLY CHANGES FROM SNAPSHOT 是宣告式 API,可藉由比較一系列順序的快照集,有效率地判斷源數據變更,然後執行 CDC 處理快照集中記錄所需的處理。 只有 DLT Python 介面才支援 APPLY CHANGES FROM SNAPSHOT

APPLY CHANGES FROM SNAPSHOT 支援從多個來源類型擷取快照集:

  • 使用定期匯入快照,從現有的資料表或檢視中匯入快照。 APPLY CHANGES FROM SNAPSHOT 具有簡單的簡化介面,可支援從現有的資料庫物件定期擷取快照集。 每次更新管道時,系統會載入新的快照,並將載入時間用作快照版本。 當管線在連續模式下運行時,每次更新時會匯入多個快照,匯入的頻率由設定包含APPLY CHANGES FROM SNAPSHOT處理的流程之 觸發間隔 決定。
  • 使用歷程記錄快照集擷取來處理包含資料庫快照集的檔案,例如從 Oracle 或 MySQL 資料庫或數據倉儲產生的快照集。

若要從具有 APPLY CHANGES FROM SNAPSHOT的任何來源類型執行 CDC 處理,請先建立串流數據表,然後使用 Python 中的 apply_changes_from_snapshot() 函式來指定實作處理所需的快照集、索引鍵和其他自變數。 請參閱 定期快照集擷取歷程記錄快照擷取 範例。

傳遞至 API 的快照集必須依版本遞增順序。 如果 DLT 偵測到順序錯亂的快照集,將會引發錯誤。

如需語法詳細數據,請參閱 DLT Python 參考

局限性

用於排序的數據行必須是可排序的數據類型。

範例:使用CDF源數據處理SCD類型1和SCD類型2

下列各節提供 DLT SCD 類型 1 和類型 2 查詢的範例,這些查詢會根據變更資料摘要中的來源事件來更新目標數據表:

  1. 建立新的用戶記錄。
  2. 刪除用戶記錄。
  3. 更新用戶記錄。 在 SCD 類型 1 範例中,最後一個 UPDATE 作業遲到且從目標數據表中刪除,說明如何處理順序錯亂的事件。

下列範例假設熟悉設定和更新 DLT 管線。 請參閱 教學課程:運行您的第一個 DLT 資料管線

若要執行這些範例,您必須從建立範例數據集開始。 請參閱 產生測試資料

以下是這些範例的輸入記錄:

userId 名字 城市 操作 序列號
124 勞爾 瓦哈卡州 INSERT 1
123 伊莎貝爾 蒙特雷 INSERT 1
125 梅賽德斯 提 華納 INSERT 2
126 百合 坎昆 INSERT 2
123 刪除 6
125 梅賽德斯 瓜達拉哈拉 UPDATE 6
125 梅賽德斯 Mexicali UPDATE 5
123 伊莎貝爾 吉娃娃 UPDATE 5

如果您取消批注範例數據中的最後一個數據列,它會插入下列記錄,指定應截斷記錄的位置:

userId 名字 城市 操作 序列號
截斷 3

注意

下列所有範例都包含選項,可同時指定 DELETETRUNCATE 作業,但每個都是選擇性的。

進程 SCD 類型 1 更新

下列範例示範處理 SCD 類型 1 更新:

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

執行 SCD 類型 1 範例之後,目標資料表會包含下列記錄:

userId 名字 城市
124 勞爾 瓦哈卡州
125 梅賽德斯 瓜達拉哈拉
126 Lily 坎昆

執行包含額外 TRUNCATE 記錄的 SCD 類型 1 範例後,由於 sequenceNum=3TRUNCATE 操作,記錄 124126 被截斷,而目標數據表中包含以下記錄:

userId 名字 城市
125 梅賽德斯 瓜達拉哈拉

進程 SCD 類型 2 更新

下列範例示範處理 SCD 類型 2 更新:

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

執行 SCD 類型 2 範例之後,目標資料表會包含下列記錄:

userId 名字 城市 __START_AT __END_AT
123 伊莎貝爾 蒙特雷 1 5
123 伊莎貝爾 吉娃娃 5 6
124 勞爾 瓦哈卡州 1
125 梅賽德斯 提 華納 2 5
125 梅賽德斯 Mexicali 5 6
125 梅賽德斯 瓜達拉哈拉 6
126 莉莉 坎昆 2

SCD 類型 2 查詢也可以指定在目標資料表中要記錄歷史資料的輸出資料行子集。 其他數據行的變更會在原地更新,而不是產生新的歷史記錄。 下列範例示範如何排除 city 欄位的追蹤:

下列範例示範搭配 SCD 類型 2 使用追蹤歷程記錄:

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

執行此範例時,若沒有額外的 TRUNCATE 記錄,目標數據表將包含以下記錄:

userId 名字 城市 __START_AT __END_AT
123 伊莎貝爾 吉娃娃 1 6
124 勞爾 瓦哈卡州 1
125 梅賽德斯 瓜達拉哈拉 2
126 Lily 坎昆 2

產生測試數據

下列程式代碼會提供來產生範例數據集,以用於本教學課程中的範例查詢。 假設您有適當的認證來建立新的架構並建立新的數據表,您可以使用筆記本或 Databricks SQL 來執行這些語句。 下列程式代碼 並非 作為 DLT 管線的一部分執行:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

範例:定期快照處理

下列範例示範 SCD 類型 2 處理,以擷取儲存在 mycatalog.myschema.mytable之數據表的快照集。 處理的結果會寫入名為 target的數據表。

mycatalog.myschema.mytable 在時間戳記 2024-01-01 00:00:00 的記錄

鑰匙 價值
1 a1
2 a2

mycatalog.myschema.mytable 的記錄在時間戳記 2024-01-01 12:00:00

鑰匙 價值
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

處理快照集之後,目標數據表會包含下列記錄:

鑰匙 價值 __START_AT __END_AT
1 a1 2024年01月01日 00時00分00秒 2024-01-01 12:00:00
2 a2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00
3 a3 2024-01-01 12:00:00

範例:歷史快照處理

下列範例示範 SCD 類型 2 處理,根據儲存在雲端記憶體系統中的兩個快照集的來源事件來更新目標數據表:

timestamp的快照集將儲存在 /<PATH>/filename1.csv

鑰匙 追蹤欄 非追蹤欄
1 a1 b1
2 a2 b2
4 a4 b4

timestamp + 5快照儲存於 /<PATH>/filename2.csv

鑰匙 追踪欄 非追蹤欄位
2 a2_new b2
3 a3 b3
4 a4 b4_new

下列程式代碼範例示範使用這些快照集處理 SCD 類型 2 更新:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

處理快照集之後,目標數據表會包含下列記錄:

鑰匙 追踪欄 非追蹤欄 __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2
3 a3 b3 2
4 a4 b4_new 1

在目標串流數據表中新增、變更或刪除數據

如果您的管線將數據表發行至 Unity 目錄,您可以使用 數據作語言 (DML) 語句,包括插入、更新、刪除和合併語句,修改 APPLY CHANGES INTO 語句所建立的目標串流數據表。

注意

  • 不支援修改串流數據表之數據表架構的 DML 語句。 請確定您的 DML 語句不會嘗試演進數據表架構。
  • 更新串流數據表的 DML 語句只能在共用的 Unity 目錄叢集或 SQL 倉儲中使用 Databricks Runtime 13.3 LTS 和更新版本來執行。
  • 因為串流需要僅附加數據源,如果您的處理需要從具有變更的來源串流數據表進行串流處理(例如,透過 DML 語句),請在讀取來源串流數據表時設定 skipChangeCommits 旗標。 設定 skipChangeCommits 時,會忽略刪除或修改源數據表上記錄的交易。 如果您的處理不需要串流數據表,您可以使用物化檢視(不受僅附加限制)作為目標表。

由於 DLT 使用指定的 SEQUENCE BY 資料行,並將適當的排序值傳播至目標數據表的 __START_AT__END_AT 資料行(針對 SCD 類型 2),因此您必須確保 DML 語句使用這些數據行的有效值,以維護記錄的適當順序。 請參閱 如何使用 APPLY CHANGES API 實作 CDC?

如需搭配串流數據表使用 DML 語句的詳細資訊,請參閱 在串流數據表中新增、變更或刪除資料

下列範例會插入使用中記錄,開始序列為 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

APPLY CHANGES 目標數據表讀取變更數據摘要

在 Databricks Runtime 15.2 和更新版本中,您可以從串流資料表讀取變更資料提要,該串流資料表是 APPLY CHANGESAPPLY CHANGES FROM SNAPSHOT 查詢的目標,與從其他 Delta 資料表讀取變更資料提要的方式相同。 要讀取目標串流表的變更資料摘要,需具備以下條件:

  • 目標串流數據表必須發佈至 Unity 目錄。 請參閱 使用 Unity 目錄搭配您的 DLT 管線
  • 若要從目標串流數據表讀取變更數據摘要,您必須使用 Databricks Runtime 15.2 或更新版本。 若要讀取另一個 DLT 管線中的變更數據摘要,管線必須設定為使用 Databricks Runtime 15.2 或更新版本。

您可以從 DLT 管線中建立的目標串流數據表讀取變更數據摘要,就像從其他 Delta 數據表讀取變更數據摘要一樣。 若要深入瞭解如何使用 Delta Lake 的變更資料饋送功能,包括 Python 和 SQL 中的範例,請參閱 在 Azure Databricks 上使用 Delta Lake 變更資料饋送功能

注意

變更數據摘要記錄包含 元數據 識別變更事件類型。 當記錄在數據表中被更新時,相關聯變更記錄的元數據通常會將 _change_type 值設定為 update_preimageupdate_postimage 事件。

不過,如果更新包含變更主鍵值的目標串流數據表,則 _change_type 值會有所不同。 當變更包含主鍵的更新時,_change_type 元數據欄位會設定為 insertdelete 事件。 主鍵可能會變更,當使用 UPDATEMERGE 陳述式手動更新其中一個主鍵欄位時,或在 SCD 類型 2 表中,當 __start_at 字段變更以反映先前的開始順序值時。

APPLY CHANGES 查詢會決定主鍵值,這與 SCD 類型 1 和 SCD 類型 2 處理不同:

  • 針對 SCD 類型 1 處理和 DLT Python 介面,主鍵是 apply_changes() 函式中 keys 參數的值。 對於 DLT SQL 介面,主鍵是 APPLY CHANGES INTO 語句中 KEYS 子句所定義的欄位。
  • 針對 SCD 類型 2,主鍵是 keys 參數或 KEYS 子句,加上來自 coalesce(__START_AT, __END_AT) 作業的傳回值,其中 __START_AT__END_AT 是目標串流數據表中的對應數據行。

取得有關 DLT CDC 查詢所處理紀錄的資料

注意

下列計量只會由 APPLY CHANGES 查詢擷取,而不是透過 APPLY CHANGES FROM SNAPSHOT 查詢來擷取。

APPLY CHANGES 查詢將收集下列指標:

  • num_upserted_rows:更新期間插入數據集的輸出數據列數目。
  • num_deleted_rows:更新期間從數據集刪除的現有輸出數據列數目。

apply changes 查詢不會捕捉與非 CDC 流程相關的 num_output_rows 指標。

哪些數據對象用於 DLT CDC 處理?

注意

  • 這些數據結構只適用於 APPLY CHANGES 處理,而不是 APPLY CHANGES FROM SNAPSHOT 處理。
  • 只有當目標數據表發佈至Hive中繼存放區時,才會套用這些數據結構。 如果管線發佈至 Unity 目錄,用戶無法存取內部備份數據表。

當您在Hive中繼存放區中宣告目標數據表時,會建立兩個數據結構:

  • 使用分配給目標資料表的名稱來檢視。
  • DLT 用來管理 CDC 處理的內部備份數據表。 此資料表的命名方式是在目標數據表名稱前面加上 __apply_changes_storage_

例如,如果您宣告名為 dlt_cdc_target的目標數據表,您會看到名為 dlt_cdc_target 的檢視,以及中繼存放區中名為 __apply_changes_storage_dlt_cdc_target 的數據表。 建立視圖可讓 DLT 篩除處理非順序數據所需的額外資訊(例如,墓碑和版本)。 若要檢視已處理的數據,請查詢目標檢視。 因為 __apply_changes_storage_ 數據表的架構可能會變更以支持未來的功能或增強功能,因此您不應該查詢數據表以供生產環境使用。 如果您手動將數據新增至表格,系統會假設這些記錄先於其他變更,因為缺少版本欄位。