共用方式為


在 Azure Databricks 上使用 Delta Lake 變更數據摘要

變更數據摘要可讓 Azure Databricks 追蹤差異數據表版本之間的數據列層級變更。 在 Delta 資料表上啟用時,執行時間會記錄 寫入數據表中所有資料的變更事件 。 這包括數據列數據以及元數據,指出指定的數據列是否已插入、刪除或更新。

重要

變更數據摘要會與數據表歷程記錄一起運作,以提供變更資訊。 因為複製 Delta 數據表會建立個別的歷程記錄,因此複製數據表上的變更數據摘要與原始數據表的記錄不符。

累加處理變更數據

Databricks 建議搭配結構化串流使用變更數據摘要,以累加方式處理差異數據表的變更。 您必須使用 Azure Databricks 的結構化串流來自動追蹤數據表變更數據摘要的版本。

注意

Delta Live Tables 提供功能,可讓您輕鬆地傳播變更數據,並將結果儲存為 SCD(緩時變維度)類型 1 或類型 2 數據表。 請參閱套用變更 API:使用差異即時資料表簡化異動資料擷取

若要從資料表讀取變更數據摘要,您必須在該數據表上啟用變更數據摘要。 請參閱啟用變更資料摘要

針對數據表設定數據流以讀取變更數據摘要時,將選項readChangeFeedtrue設定為 ,如下列語法範例所示:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

根據預設,當數據流第一次啟動時 INSERT ,數據流會傳回數據表的最新快照集,而未來會隨著變更數據而變更。

變更數據認可做為 Delta Lake 交易的一部分,並會在數據表的新數據認可同時變成可用。

您可以選擇性地指定起始版本。 請參閱 我是否應該指定起始版本?

變更數據摘要也支援批次執行,這需要指定起始版本。 請參閱 讀取批次查詢中的變更。

讀取變更數據時也支援速率限制等maxFilesPerTriggermaxBytesPerTriggerexcludeRegex選項。

啟動快照集版本以外的版本,速率限制可以是不可部分完成的。 也就是說,整個認可版本會受到速率限制,否則會傳回整個認可。

我應該指定起始版本嗎?

如果您想要忽略特定版本之前發生的變更,您可以選擇性地指定起始版本。 您可以使用時間戳或差異事務歷史記錄中記錄的版本識別碼來指定版本。

注意

批次讀取需要起始版本,而且許多批次模式可以受益於設定選擇性的結束版本。

當您設定涉及變更數據摘要的結構化串流工作負載時,請務必瞭解如何指定起始版本會影響處理。

許多串流工作負載,尤其是新的數據處理管線,都受益於默認行為。 使用預設行為時,當數據流第一次記錄數據表中的所有現有記錄做為 INSERT 變更數據摘要中的作業時,就會處理第一個批次。

如果您的目標數據表已包含所有具有適當變更的記錄,請指定起始版本以避免將源數據表狀態當做 INSERT 事件處理。

下列範例語法會從損毀檢查點的串流失敗中復原。 在此範例中,假設有下列條件:

  1. 在數據表建立時,源數據表上已啟用變更數據摘要。
  2. 目標下游數據表已處理所有變更,最多且包含版本 75。
  3. 源數據表的版本歷程記錄適用於版本70和更新版本。

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

在此範例中,您也必須指定新的檢查點位置。

重要

如果您指定起始版本,如果數據表歷程記錄中不再存在起始版本,數據流將無法從新的檢查點啟動。 Delta Lake 會自動清除歷程記錄版本,這表示最終會刪除所有指定的起始版本。

請參閱 是否可以使用變更數據摘要來重新執行數據表的整個歷程記錄?

讀取批次查詢中的變更

您可以使用批次查詢語法來讀取從特定版本開始的所有變更,或讀取指定版本範圍內的變更。

您可以將版本指定為整數,並以 格式 yyyy-MM-dd[ HH:mm:ss[.SSS]]指定時間戳做為字串。

查詢中包含開始和結束版本。 若要讀取從特定起始版本到最新版數據表的變更,請只指定起始版本。

如果您提供的版本較低或時間戳早於記錄變更事件的版本,也就是啟用變更數據摘要時,就會擲回錯誤,指出變更數據摘要未啟用。

下列語法範例示範如何搭配批次讀取使用開始和結束版本選項:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

注意

根據預設,如果使用者傳入的版本或時間戳超過數據表上最後一個認可,就會擲回錯誤 timestampGreaterThanLatestCommit 。 在 Databricks Runtime 11.3 LTS 和更新版本中,如果使用者將下列 true設定設為 ,變更數據摘要可以處理超出範圍的版本案例:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

如果您提供的開始版本大於數據表的最後一個認可,或比數據表上最後一次認可還新的開始時間戳,則啟用上述設定時,會傳回空的讀取結果。

如果您提供大於數據表上最後一次認可的結束版本,或比數據表上最後一次認可還新的結束時間戳,則在批次讀取模式中啟用上述設定時,會傳回開始版本與最後一個認可之間的所有變更。

變更數據摘要的架構為何?

當您從數據表的變更數據摘要讀取時,會使用最新數據表版本的架構。

注意

大部分的架構變更和演進作業都完全受到支援。 已啟用數據行對應的數據表不支援所有使用案例,並示範不同的行為。 請參閱 變更已啟用數據行對應之數據表的數據摘要限制。

除了 Delta 資料表架構中的數據行之外,變更數據摘要還包含可識別變更事件類型的元數據行:

資料行名稱 類型
_change_type String insert、 、 update_preimagedelete update_postimage(1)
_commit_version Long 包含變更的 Delta 記錄檔或數據表版本。
_commit_timestamp 時間戳記 建立認可時相關聯的時間戳。

(1) preimage 是更新前的值, postimage 是更新之後的值。

注意

如果架構包含與這些新增數據行名稱相同的數據行,則您無法在數據表上啟用變更數據摘要。 在嘗試啟用變更數據摘要之前,重新命名數據表中的數據行,以解決此衝突。

啟用變更數據摘要

您只能讀取已啟用資料表的變更資料摘要。 您必須使用下列其中一種方法,明確啟用變更資料摘要選項:

  • 新增資料表:在 命令中CREATE TABLE設定資料表屬性delta.enableChangeDataFeed = true

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • 現有資料表:在 命令中ALTER TABLE設定資料表屬性delta.enableChangeDataFeed = true

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • 所有新的資料表

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

重要

只會記錄啟用變更數據摘要之後所做的變更。 不會擷取數據表的過去變更。

變更數據記憶體

啟用變更數據摘要會導致數據表的記憶體成本小幅增加。 變更數據記錄會在查詢執行時產生,而且通常小於重寫檔案的大小總計。

Azure Databricks 會記錄資料表目錄下資料夾中的、 DELETEMERGE 作業_change_data變更資料UPDATE。 某些作業,例如僅插入作業和完整分割區刪除,不會在目錄中產生數據 _change_data ,因為 Azure Databricks 可以直接從事務歷史記錄中有效地計算變更數據摘要。

針對資料夾中資料檔 _change_data 的所有讀取都應該經過支援的 Delta Lake API。

資料夾中的 _change_data 檔案會遵循數據表的保留原則。 命令執行時 VACUUM 會刪除變更資料摘要數據。

我可以使用變更數據摘要來重新執行數據表的整個歷程記錄嗎?

變更數據摘要不是用來做為數據表所有變更的永久記錄。 變更數據摘要只會記錄啟用之後發生的變更。

變更數據摘要和 Delta Lake 可讓您一律重新建構源數據表的完整快照集,這表示您可以針對已啟用變更數據摘要的數據表啟動新的串流讀取,並擷取該數據表的目前版本,以及之後發生的所有變更。

您必須將變更數據摘要中的記錄視為暫時性的,而且只能供指定的保留時段存取。 Delta 事務歷史記錄會定期移除數據表版本及其對應的變更數據摘要版本。 從事務歷史記錄中移除版本時,您無法再讀取該版本的變更數據摘要。

如果您的使用案例需要維護數據表所有變更的永久歷程記錄,您應該使用累加邏輯,將記錄從變更數據摘要寫入新數據表。 下列程式代碼範例示範如何使用 trigger.AvailableNow,其會利用結構化串流的累加處理,但會以批次工作負載的形式處理可用的數據。 您可以使用主要處理管線以異步方式排程此工作負載,以建立變更數據摘要的備份,以進行稽核或完整重新執行。

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

變更已啟用數據行對應之數據表的數據摘要限制

在 Delta 資料表上啟用資料行對應時,您可以卸載或重新命名數據表中的數據行,而不需重寫現有數據的數據檔。 啟用數據行對應之後,變更數據摘要在執行非加總架構變更之後有限制,例如重新命名或卸除數據行、變更數據類型或可 Null 性變更。

重要

  • 您無法讀取使用批次語意發生非加總架構變更的交易或範圍變更數據摘要。
  • 在 Databricks Runtime 12.2 LTS 和以下的數據表中,已啟用數據行對應且發生非加總架構變更的數據表不支援變更數據摘要上的串流讀取。 請參閱使用資料行對應和結構描述變更進行串流
  • 在 Databricks Runtime 11.3 LTS 中,您無法讀取已啟用數據行重新命名或卸除之數據行對應之數據表的變更數據摘要。

在 Databricks Runtime 12.2 LTS 和更新版本中,您可以針對已啟用數據行對應且發生非加總架構變更的數據表,對變更數據摘要執行批次讀取。 讀取作業會使用查詢中指定的數據表結束版本架構,而不是使用最新版數據表的架構。 如果指定的版本範圍跨越非加總架構變更,查詢仍會失敗。