共用方式為


套用變更 API:使用差異實時數據表簡化異動數據擷取

Delta Live Tables 會使用 APPLY CHANGESAPPLY CHANGES FROM SNAPSHOT API 簡化異動數據擷取 (CDC)。 您使用的介面取決於變更資料的來源:

  • 用來 APPLY CHANGES 處理變更數據摘要 (CDF) 的變更。
  • 使用 APPLY CHANGES FROM SNAPSHOT (公開預覽) 來處理資料庫快照集的變更。

先前, MERGE INTO 語句通常用於處理 Azure Databricks 上的 CDC 記錄。 不過, MERGE INTO 可能會因為順序錯亂記錄而產生不正確的結果,或需要複雜的邏輯來重新排序記錄。

差異 APPLY CHANGES 實時數據表 SQL 和 Python 介面支援 API。 Delta APPLY CHANGES FROM SNAPSHOT Live Tables Python 介面支援 API。

APPLY CHANGESAPPLY CHANGES FROM SNAPSHOT 支援使用 SCD 型態 1 與類型 2 更新資料表:

  • 使用 SCD 類型 1 直接更新記錄。 記錄不會針對更新的記錄保留。
  • 使用 SCD 類型 2 來保留記錄的歷程記錄,無論是在所有更新或更新到一組指定的數據行上。

如需語法和其他參考,請參閱:

注意

本文說明如何根據源數據變更,更新 Delta Live Tables 管線中的數據表。 若要瞭解如何記錄和查詢 Delta 數據表的數據列層級變更資訊,請參閱 在 Azure Databricks 上使用 Delta Lake 變更數據摘要。

需求

若要使用 CDC API,您的管線必須設定為使用無伺服器 DLT 管線或 Delta Live Tables ProAdvanced版本

CDC 如何透過 API 實作 APPLY CHANGES

藉由自動處理順序外記錄, APPLY CHANGES Delta 即時數據表中的 API 可確保正確處理 CDC 記錄,並移除開發處理順序外記錄的複雜邏輯的需求。 您必須在源數據中指定要排序記錄的數據行,而 Delta Live Tables 會將其解譯為單調增加源數據順序的表示法。 Delta Live Tables 會自動處理依序抵達的數據。 針對 SCD 類型 2 變更,Delta Live Tables 會將適當的排序值傳播至目標數據表的 __START_AT__END_AT 數據行。 每個排序值的每個索引鍵都應該有一個不同的更新,而且不支援 NULL 排序值。

若要使用 APPLY CHANGES執行 CDC 處理,請先建立串流數據表,然後使用 APPLY CHANGES INTO SQL 中的 語句或 apply_changes() Python 中的 函式來指定變更摘要的來源、索引鍵和排序。 若要建立目標串流數據表,請在 SQL 中使用 CREATE OR REFRESH STREAMING TABLE 語句,或在 Python 中使用 函 create_streaming_table() 式。 請參閱 SCD 類型 1 和類型 2 處理範例。

如需語法詳細數據,請參閱 Delta Live Tables SQL 參考Python 參考

CDC 如何透過 API 實作 APPLY CHANGES FROM SNAPSHOT

重要

APPLY CHANGES FROM SNAPSHOT API 正處於公開預覽

APPLY CHANGES FROM SNAPSHOT 是宣告式 API,可藉由比較一系列順序快照集,以有效率地判斷源數據中的變更,然後執行 CDC 處理快照集中記錄所需的處理。 APPLY CHANGES FROM SNAPSHOT 只有 Delta Live Tables Python 介面才支援。

APPLY CHANGES FROM SNAPSHOT 支援從多個來源類型擷取快照集:

  • 使用定期快照集擷取,從現有的數據表或檢視中擷取快照集。 APPLY CHANGES FROM SNAPSHOT 具有簡單的簡化介面,可支援從現有的資料庫物件定期內嵌快照集。 系統會在每個管線更新中擷取新的快照集,而擷取時間會當做快照集版本使用。 當管線以連續模式執行時,系統會在包含APPLY CHANGES FROM SNAPSHOT處理的流程觸發間隔設定所決定的期間內嵌多個快照集與每個管線更新。
  • 使用歷程記錄快照集擷取來處理包含資料庫快照集的檔案,例如從 Oracle 或 MySQL 資料庫或數據倉儲產生的快照集。

若要使用 執行來自任何來源類型的 APPLY CHANGES FROM SNAPSHOTCDC 處理,請先建立串流數據表,然後使用 apply_changes_from_snapshot() Python 中的 函式來指定實作處理所需的快照集、索引鍵和其他自變數。 請參閱定期快照集擷取歷程記錄快照擷取範例。

傳遞至 API 的快照集必須依版本遞增順序。 如果 Delta Live Tables 偵測到順序錯亂的快照集,則會擲回錯誤。

如需語法詳細數據,請參閱 Delta Live Tables Python 參考

限制

用於排序的數據行必須是可排序的數據類型。

範例:使用CDF源數據處理SCD類型1和SCD類型2

下列各節提供差異實時數據表 SCD 類型 1 和類型 2 查詢的範例,這些查詢會根據變更資料摘要中的來源事件來更新目標數據表:

  1. 建立新的用戶記錄。
  2. 刪除用戶記錄。
  3. 更新用戶記錄。 在 SCD 類型 1 範例中,最後一個 UPDATE 作業遲到且從目標數據表卸除,示範如何處理順序錯亂的事件。

下列範例假設熟悉設定和更新 Delta Live Tables 管線。 請參閱教學課程:執行您的第一個差異即時資料表管線

若要執行這些範例,您必須從建立範例數據集開始。 請參閱 產生測試數據

以下是這些範例的輸入記錄:

userId NAME 市/鎮 作業 sequenceNum
124 勞爾 瓦哈卡州 INSERT 1
123 Isabel 蒙特雷 INSERT 1
125 梅賽德斯 提 華納 INSERT 2
126 百合 坎昆 INSERT 2
123 null null DELETE 6
125 梅賽德斯 Guadalajara UPDATE 6
125 梅賽德斯 Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

如果您取消批注範例數據中的最後一個數據列,它會插入下列記錄,指定應截斷記錄的位置:

userId NAME 市/鎮 作業 sequenceNum
null null null TRUNCATE 3

注意

下列所有範例都包含指定 DELETETRUNCATE 作業的選項,但每個選項都是選擇性的。

處理 SCD 類型 1 更新

下列範例示範處理 SCD 類型 1 更新:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

執行 SCD 類型 1 範例之後,目標資料表會包含下列記錄:

userId NAME 市/鎮
124 勞爾 瓦哈卡州
125 梅賽德斯 Guadalajara
126 百合 坎昆

使用其他TRUNCATE記錄執行 SCD 類型 1 範例之後,會124126因為 TRUNCATE 位於sequenceNum=3的作業而截斷 和 ,而目標數據表包含下列記錄:

userId NAME 市/鎮
125 梅賽德斯 Guadalajara

處理 SCD 類型 2 更新

下列範例示範處理 SCD 類型 2 更新:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

執行 SCD 類型 2 範例之後,目標資料表會包含下列記錄:

userId NAME 市/鎮 __START_AT __END_AT
123 Isabel 蒙特雷 1 5
123 Isabel Chihuahua 5 6
124 勞爾 瓦哈卡州 1 null
125 梅賽德斯 提 華納 2 5
125 梅賽德斯 Mexicali 5 6
125 梅賽德斯 Guadalajara 6 null
126 百合 坎昆 2 null

SCD 類型 2 查詢也可以指定要追蹤目標資料表中記錄的輸出資料行子集。 其他數據行的變更會就地更新,而不是產生新的記錄記錄。 下列範例示範從追蹤中排除數據 city 行:

下列範例示範搭配 SCD 類型 2 使用追蹤歷程記錄:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

在沒有其他 TRUNCATE 記錄的情況下執行此範例之後,目標數據表會包含下列記錄:

userId NAME 市/鎮 __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 勞爾 瓦哈卡州 1 null
125 梅賽德斯 Guadalajara 2 null
126 百合 坎昆 2 null

產生測試數據

下列程式代碼會提供來產生範例數據集,以用於本教學課程中的範例查詢。 假設您有適當的認證來建立新的架構並建立新的數據表,您可以使用筆記本或 Databricks SQL 來執行這些語句。 下列程式代碼 不是 要當做 Delta Live Tables 管線的一部分執行:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

範例:定期快照集處理

下列範例示範 SCD 類型 2 處理,以擷取儲存在 的 mycatalog.myschema.mytable數據表快照集。 處理的結果會寫入名為 target的數據表。

mycatalog.myschema.mytable 時間戳 2024-01-01 00:00:00 的記錄

機碼
1 a1
2 a2

mycatalog.myschema.mytable 時間戳 2024-01-01 12:00:00 的記錄

機碼
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

處理快照集之後,目標數據表會包含下列記錄:

機碼 __START_AT __END_AT
1 a1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 a2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 null
3 a3 2024-01-01 12:00:00 null

範例:歷程記錄快照集處理

下列範例示範 SCD 類型 2 處理,根據儲存在雲端記憶體系統中的兩個快照集的來源事件來更新目標數據表:

儲存在的 timestamp快照集 /<PATH>/filename1.csv

機碼 TrackingColumn NonTrackingColumn
1 a1 b1
2 a2 b2
4 a4 b4

儲存在的 timestamp + 5快照集 /<PATH>/filename2.csv

機碼 TrackingColumn NonTrackingColumn
2 a2_new b2
3 a3 b3
4 a4 b4_new

下列程式代碼範例示範使用這些快照集處理 SCD 類型 2 更新:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

處理快照集之後,目標數據表會包含下列記錄:

機碼 TrackingColumn NonTrackingColumn __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 null
3 a3 b3 2 null
4 a4 b4_new 1 null

在目標串流數據表中新增、變更或刪除數據

如果您的管線將數據表發佈至 Unity 目錄,您可以使用 資料操作語言 (DML) 語句,包括插入、更新、刪除和合併語句,修改語句所 APPLY CHANGES INTO 建立的目標串流數據表。

注意

  • 不支援修改串流資料表之資料表結構描述的 DML 陳述式。 請確定您的 DML 陳述式不會嘗試演進資料表結構描述。
  • 更新串流資料表的 DML 陳述式只能在共用的 Unity Catalog 叢集或 SQL 倉儲中使用 Databricks Runtime 13.3 LTS 和更新版本來執行。
  • 因為串流需要「僅附加」資料來源,如果您的處理需要從具有變更的來源串流資料表進行串流處理 (例如透過 DML 陳述式),請在讀取來源串流資料表時設定 skipChangeCommits 旗標。 設定 skipChangeCommits 時,會忽略刪除或修改來源資料表中記錄的交易。 如果您的處理不需要串流資料表,則可以使用具體化檢視 (沒有「僅附加」限制) 作為目標資料表。

由於 Delta Live Tables 會使用指定的 SEQUENCE BY 數據行,並將適當的排序值傳播至 __START_AT 目標數據表的 和 __END_AT 數據行(針對 SCD 類型 2),因此您必須確保 DML 語句使用這些數據行的有效值,以維護記錄的適當順序。 請參閱 如何使用APPLY CHANGES API實作 CDC?

如需搭配串流數據表使用 DML 語句的詳細資訊,請參閱 在串流數據表中新增、變更或刪除數據。

下列範例會插入使用中記錄,開始序列為 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

APPLY CHANGES 目標數據表讀取變更數據摘要

在 Databricks Runtime 15.2 和更新版本中,您可以從串流數據表讀取變更數據摘要,該數據流數據表是 或 APPLY CHANGES FROM SNAPSHOT 查詢的目標APPLY CHANGES,就像從其他 Delta 數據表讀取變更數據摘要一樣。 需要下列專案,才能從目標串流數據表讀取變更數據摘要:

  • 目標串流數據表必須發佈至 Unity 目錄。 請參閱 搭配您的 Delta Live Tables 管線使用 Unity 目錄。
  • 若要從目標串流數據表讀取變更數據摘要,您必須使用 Databricks Runtime 15.2 或更新版本。 若要讀取另一個 Delta Live Tables 管線中的變更數據摘要,管線必須設定為使用 Databricks Runtime 15.2 或更新版本。

您可以從在 Delta Live Tables 管線中建立的目標串流數據表讀取變更數據摘要,就像從其他 Delta 數據表讀取變更數據摘要一樣。 若要深入瞭解如何使用差異變更數據摘要功能,包括 Python 和 SQL 中的範例,請參閱 在 Azure Databricks 上使用 Delta Lake 變更數據摘要。

注意

變更數據摘要記錄包含 識別變更事件類型的元數據 。 在數據表中更新記錄時,相關聯變更記錄的元數據通常包含 _change_type 設定為 update_preimageupdate_postimage 事件的值。

不過,如果對包含變更主鍵值的目標串流數據表進行更新,這些 _change_type 值會有所不同。 當變更包含主鍵的更新時, _change_type 元數據欄位會設定為 insertdelete 事件。 當欄位變更以反映先前的開始順序值時,可能會對具有 UPDATEMERGE 語句的其中一個索引鍵字段進行手動更新,或 __start_at 針對 SCD 類型 2 數據表進行手動更新。

查詢 APPLY CHANGES 會決定主鍵值,這與 SCD 類型 1 和 SCD 類型 2 處理不同:

  • 針對 SCD 類型 1 處理和 Delta Live Tables Python 介面,主鍵是函式中 apply_changes() 參數的值keys。 對於 Delta Live Tables SQL 介面,主鍵是 語句中 APPLY CHANGES INTO 子句所KEYS定義的數據行。
  • 針對 SCD 類型 2,主鍵是 keys 參數或 KEYS 子句,加上作業的傳回值 coalesce(__START_AT, __END_AT) ,其中 __START_AT__END_AT 是目標串流數據表中的對應數據行。

取得差異實時數據表 CDC 查詢所處理記錄的相關數據

注意

下列計量只會由 APPLY CHANGES 查詢擷取,而不是由查詢擷 APPLY CHANGES FROM SNAPSHOT 取。

查詢會 APPLY CHANGES 擷取下列計量:

  • num_upserted_rows:更新期間,插入數據集的輸出數據列數目。
  • num_deleted_rows:更新期間從數據集刪除的現有輸出數據列數目。

num_output_rows不會針對apply changes查詢擷取非 CDC 流程的計量輸出。

哪些數據對象用於 Delta Live Tables CDC 處理?

注意:下列數據結構僅適用於 APPLY CHANGES 處理,而非 APPLY CHANGES FROM SNAPSHOT 處理。

當您在Hive中繼存放區中宣告目標數據表時,會建立兩個數據結構:

  • 使用指派給目標數據表的名稱檢視。
  • Delta Live Tables 用來管理 CDC 處理的內部備份數據表。 此數據表是以目標數據表名稱前面加上 __apply_changes_storage_ 來命名。

例如,如果您宣告名為 dlt_cdc_target的目標數據表,您會看到名為 dlt_cdc_target 的檢視和中繼存放區中名為 __apply_changes_storage_dlt_cdc_target 的數據表。 建立檢視可讓 Delta Live Tables 篩選出處理順序錯亂數據所需的額外資訊(例如,墓碑和版本)。 若要檢視已處理的數據,請查詢目標檢視。 因為數據表的 __apply_changes_storage_ 架構可能會變更以支持未來的功能或增強功能,因此您不應該查詢數據表以供生產環境使用。 如果您手動將數據新增至數據表,則會假設記錄在其他變更之前,因為版本數據行遺失。

如果管線發佈至 Unity 目錄,用戶無法存取內部備份數據表。