共用方式為


套用變更 API:透過 Delta Live Tables 簡化異動數據擷取

Delta Live Tables 使用 APPLY CHANGESAPPLY CHANGES FROM SNAPSHOT API 簡化異動數據擷取 (CDC)。 您使用的介面取決於變更資料的來源:

  • 用來 APPLY CHANGES 處理變更數據摘要 (CDF) 的變更。
  • 使用 APPLY CHANGES FROM SNAPSHOT (公開預覽) 來處理資料庫快照集的變更。

先前, MERGE INTO 語句通常用於處理 Azure Databricks 上的 CDC 記錄。 不過, MERGE INTO 可能會因為順序錯亂記錄而產生不正確的結果,或需要複雜的邏輯來重新排序記錄。

Delta Live Tables 的 SQL 和 Python 介面均支援 APPLY CHANGES API。 Delta Live Tables Python 介面支援 APPLY CHANGES FROM SNAPSHOT API。

APPLY CHANGESAPPLY CHANGES FROM SNAPSHOT 支援使用 SCD 型態 1 與類型 2 更新資料表:

  • 使用 SCD 類型 1 直接更新記錄。 記錄不會針對更新的記錄保留。
  • 使用 SCD 類型 2 來保留紀錄的歷史,無論是對所有更新,或僅限於某些指定欄位的更新。

如需語法和其他參考,請參閱:

注意

本文說明如何根據源數據變更,更新 Delta Live Tables 管線中的數據表。 若要瞭解如何記錄和查詢 Delta 表格的行層級變更資訊,請參閱 在 Azure Databricks 上使用 Delta Lake 變更數據饋送

需求

若要使用 CDC API,您的管線必須設定為使用 無伺服器 DLT 管線 或 Delta Live Tables ProAdvanced版本

CDC 如何透過 API 實作 APPLY CHANGES

藉由自動處理順序外記錄,Delta Live Tables 中的 APPLY CHANGES API 可確保正確處理 CDC 記錄,並移除開發處理順序外記錄的複雜邏輯的需求。 您必須在源數據中指定一個欄位來進行排序,Delta Live Tables 將其解釋為源數據正確排序的一種遞增表示法。 Delta Live Tables 會自動處理順序錯亂抵達的數據。 針對 SCD 類型 2 變更,Delta Live Tables 會將適當的排序值傳播至目標數據表的 __START_AT__END_AT 數據行。 每個排序值的每個索引鍵都應該有一個不同的更新,而且不支援 NULL 排序值。

若要使用 APPLY CHANGES執行 CDC 處理,請先建立串流數據表,然後使用 SQL 中的 APPLY CHANGES INTO 語句,或 Python 中的 apply_changes() 函式來指定變更摘要的來源、索引鍵和排序。 若要建立目標串流數據表,請在 SQL 中使用 CREATE OR REFRESH STREAMING TABLE 語句,或在 Python 中使用 create_streaming_table() 函式。 請參閱 SCD 類型 1 和類型 2 處理範例。

如需語法詳細數據,請參閱 Delta Live Tables SQL 參考Python 參考

CDC 如何透過 API 實作 APPLY CHANGES FROM SNAPSHOT

重要

APPLY CHANGES FROM SNAPSHOT API 正處於公開預覽

APPLY CHANGES FROM SNAPSHOT 是宣告式 API,可藉由比較一系列順序快照集,以有效率地判斷源數據中的變更,然後執行 CDC 處理快照集中記錄所需的處理。 只有 Delta Live Tables Python 介面才支援 APPLY CHANGES FROM SNAPSHOT

APPLY CHANGES FROM SNAPSHOT 支援從多個來源類型擷取快照集:

  • 使用定期快照擷取,從現有的資料表或視圖中擷取快照。 APPLY CHANGES FROM SNAPSHOT 具有簡單的簡化介面,可支援從現有的資料庫物件定期內嵌快照集。 系統會在每次管線更新時匯入新的快照集,而匯入的時間會作為快照集的版本使用。 當管線以連續模式執行時,會在每個管線更新期間內嵌多個快照集,此期間是由包含APPLY CHANGES FROM SNAPSHOT 處理的流程 觸發間隔 設定所決定。
  • 使用歷程記錄快照集擷取來處理包含資料庫快照集的檔案,例如從 Oracle 或 MySQL 資料庫或數據倉儲產生的快照集。

若要從具有 APPLY CHANGES FROM SNAPSHOT的任何來源類型執行 CDC 處理,請先建立串流數據表,然後使用 Python 中的 apply_changes_from_snapshot() 函式來指定實作處理所需的快照集、索引鍵和其他自變數。 請參閱定期快照集擷取歷程記錄快照擷取範例。

傳遞至 API 的快照集必須依版本遞增順序。 如果 Delta Live Tables 偵測到順序不正確的快照集,則會引發錯誤。

如需語法詳細數據,請參閱 Delta Live Tables Python 參考

限制

用於排序的數據行必須是可排序的數據類型。

範例:使用CDF源數據處理SCD類型1和SCD類型2

下列各節提供 Delta Live Tables SCD 類型 1 和類型 2 查詢的範例,這些查詢透過變更資料饋送中的來源事件來更新目標資料表:

  1. 建立新的用戶記錄。
  2. 刪除用戶記錄。
  3. 更新用戶記錄。 在 SCD 類型 1 的範例中,最後的 UPDATE 作業到達延遲並被從目標表中刪除,以展示如何處理無序事件。

假設您熟悉設定和更新 Delta Live Tables 管線,下列範例將有助於理解。 請參閱 教程:執行您的第一個 Delta Live Tables 工作流程

若要執行這些範例,您必須從建立範例數據集開始。 請參閱 產生測試資料

以下是這些範例的輸入記錄:

userId NAME 市/鎮 作業 sequenceNum
124 勞爾 瓦哈卡州 INSERT 1
123 Isabel 蒙特雷 INSERT 1
125 梅賽德斯 提 華納 INSERT 2
126 百合 坎昆 INSERT 2
123 null null DELETE 6
125 梅賽德斯 Guadalajara UPDATE 6
125 梅賽德斯 Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

如果您取消批注範例數據中的最後一個數據列,它會插入下列記錄,指定應截斷記錄的位置:

userId NAME 市/鎮 作業 sequenceNum
null null null TRUNCATE 3

注意

下列所有範例都包含指定 DELETETRUNCATE 作業的選項,但每個選項都是選擇性的。

處理 SCD 類型 1 更新

下列範例示範處理 SCD 類型 1 更新:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

執行 SCD 類型 1 範例之後,目標資料表會包含下列記錄:

userId NAME 市/鎮
124 勞爾 瓦哈卡州
125 梅賽德斯 Guadalajara
126 百合 坎昆

在執行具有新增的 TRUNCATE 記錄的 SCD 類型 1 範例之後,因為 sequenceNum=3TRUNCATE 作業,記錄 124126 被截斷,目標數據表包含以下記錄:

userId NAME 市/鎮
125 梅賽德斯 Guadalajara

處理 SCD 類型 2 更新

下列範例示範處理 SCD 類型 2 更新:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

執行 SCD 類型 2 範例之後,目標資料表會包含下列記錄:

userId NAME 市/鎮 __START_AT __END_AT
123 Isabel 蒙特雷 1 5
123 Isabel Chihuahua 5 6
124 勞爾 瓦哈卡州 1 null
125 梅賽德斯 提 華納 2 5
125 梅賽德斯 Mexicali 5 6
125 梅賽德斯 Guadalajara 6 null
126 百合 坎昆 2 null

SCD 類型 2 查詢也可以指定要在目標資料表中追蹤歷史的輸出欄位子集。 其他欄位的變更會就地更新,而不是產生新的歷史記錄。 下列範例示範排除追蹤 city 欄:

下列範例示範搭配 SCD 類型 2 使用追蹤歷程記錄:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

在執行這個範例時,若沒有附加的 TRUNCATE 記錄,目標數據表會包含下列記錄:

userId NAME 市/鎮 __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 勞爾 瓦哈卡州 1 null
125 梅賽德斯 Guadalajara 2 null
126 百合 坎昆 2 null

產生測試數據

下列程式代碼會提供來產生範例數據集,以用於本教學課程中的範例查詢。 假設您有適當的認證來建立新的架構並建立新的數據表,您可以使用筆記本或 Databricks SQL 來執行這些語句。 下列程式碼 並非設計成作為 Delta Live Tables 流程的一部分執行

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

範例:定期快照集處理

下列範例示範 SCD 類型 2 處理,以擷取儲存在 mycatalog.myschema.mytable之數據表的快照集。 處理的結果會寫入名為 target的數據表。

mycatalog.myschema.mytable 時間戳 2024-01-01 00:00:00 的記錄

機碼
1 a1
2 a2

mycatalog.myschema.mytable 時間戳 2024-01-01 12:00:00 的記錄

機碼
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

處理快照集之後,目標數據表會包含下列記錄:

機碼 __START_AT __END_AT
1 a1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 a2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 null
3 a3 2024-01-01 12:00:00 null

範例:歷程記錄快照集處理

下列範例示範 SCD 類型 2 處理,根據儲存在雲端記憶體系統中的兩個快照集的來源事件來更新目標數據表:

儲存在的 timestamp快照集 /<PATH>/filename1.csv

機碼 TrackingColumn NonTrackingColumn
1 a1 b1
2 a2 b2
4 a4 b4

儲存在的 timestamp + 5快照集 /<PATH>/filename2.csv

機碼 TrackingColumn NonTrackingColumn
2 a2_new b2
3 a3 b3
4 a4 b4_new

下列程式代碼範例示範使用這些快照集處理 SCD 類型 2 更新:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

處理快照集之後,目標數據表會包含下列記錄:

機碼 TrackingColumn NonTrackingColumn __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 null
3 a3 b3 2 null
4 a4 b4_new 1 null

在目標串流數據表中新增、變更或刪除數據

如果您的管線將數據表發行至 Unity 目錄,您可以使用 數據操作語言 (DML) 語句,包括插入、更新、刪除和合併語句,修改 APPLY CHANGES INTO 語句所建立的目標串流數據表。

注意

  • 不支援修改串流數據表之數據表架構的 DML 語句。 請確定您的 DML 語句不會嘗試演進數據表架構。
  • 更新串流數據表的 DML 語句只能在共用的 Unity 目錄叢集或 SQL 倉儲中使用 Databricks Runtime 13.3 LTS 和更新版本來執行。
  • 因為串流需要僅附加數據源,如果您的處理需要從具有變更的來源串流數據表進行串流處理(例如,透過 DML 語句),請在讀取來源串流數據表時設定 skipChangeCommits 旗標。 設定 skipChangeCommits 時,會忽略刪除或修改源數據表上記錄的交易。 如果您的處理不需要串流表格,您可以使用實體化檢視(沒有僅限於附加的限制)作為目標表格。

由於 Delta Live Tables 使用指定的 SEQUENCE BY 數據行,並將適當的排序值傳播至目標數據表 __START_AT__END_AT 數據行(針對 SCD 類型 2),因此您必須確保 DML 語句使用這些數據行的有效值,以維持記錄的適當順序。 請參閱 如何使用APPLY CHANGES API實作 CDC?

如需搭配串流數據表使用 DML 語句的詳細資訊,請參閱 在串流數據表中新增、變更或刪除資料

下列範例會插入使用中記錄,開始序列為 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

APPLY CHANGES 目標數據表讀取變更數據摘要

在 Databricks Runtime 15.2 和更高版本中,您可以從作為 APPLY CHANGESAPPLY CHANGES FROM SNAPSHOT 查詢目標的串流表中讀取變更數據饋送,就像從其他 Delta 表中讀取變更數據饋送一樣。 需要下列條件,才能在目標串流資料表讀取變更資料流:

  • 目標串流數據表必須發佈至 Unity 目錄。 請參閱 使用 Unity 目錄搭配您的 Delta Live Tables 管線
  • 若要從目標串流數據表讀取變更數據摘要,您必須使用 Databricks Runtime 15.2 或更新版本。 若要讀取另一個 Delta Live Tables 管線中的變更數據摘要,管線必須設定為使用 Databricks Runtime 15.2 或更新版本。

您可以從在 Delta Live Tables 管線中建立的目標串流數據表讀取變更數據摘要,就像從其他 Delta 數據表讀取變更數據摘要一樣。 若要深入瞭解如何使用差異變更數據摘要功能,包括 Python 和 SQL 中的範例,請參閱 在 Azure Databricks 上使用 Delta Lake 變更數據摘要。

注意

變更數據摘要記錄包含 識別變更事件類型的元數據 。 當表格中的記錄被更新時,相關變更記錄的元數據通常會包含設定為update_preimageupdate_postimage事件的_change_type值。

如果對包含變更主鍵值的目標串流表進行更新,則 _change_type 值會有所不同。 當變更包含主鍵的更新時,_change_type 元數據欄位會設定為 insertdelete 事件。 當使用 UPDATEMERGE 語句對任一主鍵欄位進行手動更新時,或者當 SCD 類型 2 表中 __start_at 欄位變更以反映之前的初始序列值時,主鍵可能會發生變更。

APPLY CHANGES 查詢會決定主鍵值,這與 SCD 類型 1 和 SCD 類型 2 處理不同:

  • 針對 SCD 類型 1 處理和 Delta Live Tables Python 介面,主鍵是 apply_changes() 函式中 keys 參數的值。 對於 Delta Live Tables SQL 介面,主鍵是 APPLY CHANGES INTO 語句中 KEYS 子句所定義的數據行。
  • 針對 SCD 類型 2,主鍵是 keys 參數或 KEYS 子句,加上來自 coalesce(__START_AT, __END_AT) 作業的傳回值,其中 __START_AT__END_AT 是目標串流數據表中的對應數據行。

取得Delta Live Tables CDC查詢中處理記錄的數據

注意

下列計量只會由 APPLY CHANGES 查詢擷取,而不是由查詢擷 APPLY CHANGES FROM SNAPSHOT 取。

查詢會 APPLY CHANGES 擷取下列計量:

  • num_upserted_rows:更新期間插入數據集的輸出數據列數目。
  • num_deleted_rows:更新期間從數據集刪除的現有輸出數據列數目。

num_output_rows不會針對apply changes查詢擷取非 CDC 流程的計量輸出。

Delta Live Tables CDC 處理使用哪些資料物件?

注意

  • 這些數據結構只適用於 APPLY CHANGES 處理,而不是 APPLY CHANGES FROM SNAPSHOT 處理。
  • 只有當目標數據表發佈至Hive中繼存放區時,才會套用這些數據結構。 如果管線發佈至 Unity 目錄,用戶無法存取內部備份數據表。

當您在Hive中繼存放區中宣告目標數據表時,會建立兩個數據結構:

  • 使用指派給目標數據表的名稱檢視。
  • Delta Live Tables 用來管理 CDC 處理的內部備份數據表。 此資料表的命名方式是在目標數據表名稱前面加上 __apply_changes_storage_

例如,如果您宣告名為 dlt_cdc_target的目標數據表,您會看到名為 dlt_cdc_target 的檢視,以及中繼存放區中名為 __apply_changes_storage_dlt_cdc_target 的數據表。 建立檢視可讓 Delta Live Tables 篩選出處理順序錯亂數據所需的額外資訊(例如,墓碑和版本)。 若要檢視已處理的數據,請查詢目標檢視。 因為 __apply_changes_storage_ 數據表的架構可能會變更以支持未來的功能或增強功能,因此您不應該查詢數據表以供生產環境使用。 如果您手動將數據新增至表格,則會假設這些記錄項目發生在其他變更之前,因為缺少版本欄位。