使用 Delta Lake 數據表歷程記錄
修改 Delta Lake 數據表的每個作業都會建立新的數據表版本。 您可以使用歷程記錄資訊來稽核作業、復原數據表,或使用時間移動在特定時間點查詢數據表。
注意
Databricks 不建議使用 Delta Lake 資料表歷程記錄作為資料封存的長期備份解決方案。 除非您已將資料和記錄保留設定為較大的值,否則 Databricks 建議只使用過去 7 天進行時間移動作業。
擷取差異數據表歷程記錄
您可以執行 history
命令,以擷取資訊,包括每個寫入 Delta 資料表的作業、使用者和時間戳。 作業會以反向時間順序傳回。
數據表記錄保留是由數據表設定 delta.logRetentionDuration
所決定,預設為30天。
注意
時間移動和資料表歷程記錄是由不同的保留期閾值所控制。 請參閱什麼是 Delta Lake 時間移動?。
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
如需Spark SQL語法詳細數據,請參閱 DESCRIBE HISTORY。
如需 Scala/Java/Python 語法詳細數據,請參閱 Delta Lake API 檔。
目錄總 管提供這個詳細數據表資訊和 Delta 數據表記錄的可視化檢視。 除了數據表架構和範例數據之外,您也可以按兩下 [歷程記錄 ] 索引標籤來查看顯示資料表 DESCRIBE HISTORY
記錄。
歷程記錄架構
作業的 history
輸出具有下列數據行。
資料行 | 類型 | 描述 |
---|---|---|
version | long | 作業所產生的數據表版本。 |
timestamp | timestamp | 認可此版本時。 |
userId | 字串 | 執行作業的使用者識別碼。 |
userName | 字串 | 執行作業的用戶名稱。 |
作業 | 字串 | 作業名稱。 |
operationParameters | map | 工作的參數(例如述詞。) |
job | struct | 執行作業之作業的詳細數據。 |
筆記本 | struct | 執行作業的筆記本詳細數據。 |
clusterId | 字串 | 作業執行所在的叢集標識碼。 |
readVersion | long | 讀取以執行寫入作業的數據表版本。 |
isolationLevel | 字串 | 用於這項作業的隔離等級。 |
isBlindAppend | boolean | 此作業是否附加數據。 |
operationMetrics | map | 作業的計量(例如,修改的數據列和檔案數目。) |
userMetadata | 字串 | 若已指定使用者定義認可元數據,則為 |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
注意
- 如果您使用下列方法寫入 Delta 資料表,則無法使用其他幾個資料列:
- 未來新增的資料行一律會新增在最後一個資料行之後。
作業計量索引鍵
作業會 history
傳回數據行對應中的 operationMetrics
作業計量集合。
下表列出依作業的對應索引鍵定義。
作業 | 度量名稱 | 描述 |
---|---|---|
WRITE、CREATE TABLE AS SELECT、REPLACE TABLE AS SELECT、COPY INTO | ||
numFiles | 寫入的檔案數目。 | |
numOutputBytes | 寫入內容的位元組大小。 | |
numOutputRows | 寫入的數據列數目。 | |
STREAMING UPDATE | ||
numAddedFiles | 新增的檔案數目。 | |
numRemovedFiles | 已移除的檔案數目。 | |
numOutputRows | 寫入的數據列數目。 | |
numOutputBytes | 以位元組為單位寫入的大小。 | |
DELETE | ||
numAddedFiles | 新增的檔案數目。 刪除資料表的資料分割時, 不會提供 。 | |
numRemovedFiles | 已移除的檔案數目。 | |
numDeletedRows | 已移除的數據列數目。 刪除資料表的資料分割時, 不會提供 。 | |
numCopiedRows | 在刪除檔案的過程中複製的數據列數目。 | |
executionTimeMs | 執行整個作業所花費的時間。 | |
scanTimeMs | 掃描檔案是否有相符專案所花費的時間。 | |
rewriteTimeMs | 重寫相符檔案所花費的時間。 | |
TRUNCATE | ||
numRemovedFiles | 已移除的檔案數目。 | |
executionTimeMs | 執行整個作業所花費的時間。 | |
MERGE | ||
numSourceRows | 源數據框架中的數據列數目。 | |
numTargetRowsInserted | 插入目標數據表的數據列數目。 | |
numTargetRowsUpdated | 目標數據表中更新的數據列數目。 | |
numTargetRowsDeleted | 目標數據表中刪除的數據列數目。 | |
numTargetRowsCopied | 複製的目標數據列數目。 | |
numOutputRows | 寫出的數據列總數。 | |
numTargetFilesAdded | 新增至接收(target) 的檔案數目。 | |
numTargetFilesRemoved | 已從 sink(target) 移除的檔案數目。 | |
executionTimeMs | 執行整個作業所花費的時間。 | |
scanTimeMs | 掃描檔案是否有相符專案所花費的時間。 | |
rewriteTimeMs | 重寫相符檔案所花費的時間。 | |
UPDATE | ||
numAddedFiles | 新增的檔案數目。 | |
numRemovedFiles | 已移除的檔案數目。 | |
numUpdatedRows | 更新的數據列數目。 | |
numCopiedRows | 在更新檔案的過程中,剛複製的數據列數目。 | |
executionTimeMs | 執行整個作業所花費的時間。 | |
scanTimeMs | 掃描檔案是否有相符專案所花費的時間。 | |
rewriteTimeMs | 重寫相符檔案所花費的時間。 | |
FSCK | numRemovedFiles | 已移除的檔案數目。 |
CONVERT | numConvertedFiles | 已轉換的 Parquet 檔案數目。 |
最佳化 | ||
numAddedFiles | 新增的檔案數目。 | |
numRemovedFiles | 優化檔案數目。 | |
numAddedBytes | 優化數據表之後新增的位元元組數目。 | |
numRemovedBytes | 已移除的位元元組數目。 | |
minFileSize | 優化數據表之後最小檔案的大小。 | |
p25FileSize | 優化數據表之後的第25個百分位數檔案大小。 | |
p50FileSize | 數據表優化后的檔案大小中位數。 | |
p75FileSize | 優化數據表之後的第75個百分位數檔案大小。 | |
maxFileSize | 優化數據表之後的最大檔案大小。 | |
克隆 | ||
sourceTableSize | 複製版本之源數據表的位元組大小。 | |
sourceNumOfFiles | 複製版本之源數據表中的檔案數目。 | |
numRemovedFiles | 如果取代了先前的 Delta 數據表,則從目標數據表移除的檔案數目。 | |
removedFilesSize | 如果取代了先前的 Delta 數據表,則從目標數據表移除的檔案大小總計,以位元組為單位。 | |
numCopiedFiles | 複製到新位置的檔案數目。 0 表示淺層複製。 | |
copiedFilesSize | 複製到新位置的檔案大小總計,以位元組為單位。 0 表示淺層複製。 | |
RESTORE | ||
tableSizeAfterRestore | 還原後以位元組為單位的數據表大小。 | |
numOfFilesAfterRestore | 還原之後數據表中的檔案數目。 | |
numRemovedFiles | 還原作業移除的檔案數目。 | |
numRestoredFiles | 還原所新增的檔案數目。 | |
removedFilesSize | 還原移除的檔案位元組大小。 | |
restoredFilesSize | 還原所新增檔案的位元組大小。 | |
真空 | ||
numDeletedFiles | 已刪除的檔案數目。 | |
numVacuumedDirectories | 清理目錄的數目。 | |
numFilesToDelete | 要刪除的檔案數目。 |
什麼是 Delta Lake 時間旅行?
Delta Lake time travel 支援根據時間戳或數據表版本查詢先前的數據表版本(如事務歷史記錄中所記錄)。 您可以針對應用程式使用時間移動,例如:
- 重新建立分析、報告或輸出 (例如,機器學習模型的輸出)。 這可能適用於偵錯或稽核,特別是在受管制的產業中。
- 撰寫複雜的時態性查詢。
- 修正資料中的錯誤。
- 為快速變更資料表的一組查詢提供快照集隔離。
重要
可透過時間移動存取的數據表版本是由事務歷史記錄檔的保留臨界值以及作業的頻率和指定的保留 VACUUM
時間所決定。 如果您使用預設值每天執行 VACUUM
,7 天的數據可供時間移動。
Delta 時間移動語法
您可以藉由在數據表名稱規格後面新增 子句,以查詢具有時間移動的 Delta 資料表。
timestamp_expression
可以是下列任一項:'2018-10-18T22:15:12.013Z'
,也就是可以轉換成時間戳的字串cast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
,也就是日期字串current_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- 任何其他可轉換成時間戳的表達式
version
是一個長值,可從的DESCRIBE HISTORY table_spec
輸出取得。
兩version
者timestamp_expression
都不能是子查詢。
只接受日期或時間戳字串。 例如,"2019-01-01"
與 "2019-01-01T00:00:00.000Z"
。 如需範例語法,請參閱下列程序代碼:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
您也可以使用 @
語法來指定時間戳或版本做為資料表名稱的一部分。 時間戳的格式必須為 yyyyMMddHHmmssSSS
。 您可以在 之後 @
指定版本,方法是在 版本前面加上 v
。 如需範例語法,請參閱下列程序代碼:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
什麼是事務歷史記錄檢查點?
Delta Lake 會將數據表版本記錄為目錄內的 _delta_log
JSON 檔案,其會與數據表數據一起儲存。 為了優化檢查點查詢,Delta Lake 會將數據表版本匯總至 Parquet 檢查點檔案,以避免讀取數據表歷程記錄的所有 JSON 版本。 Azure Databricks 會將數據大小和工作負載的檢查點頻率優化。 使用者不需要直接與檢查點互動。 檢查點頻率可能會變更而不通知。
設定時間旅行查詢的數據保留
若要查詢先前的數據表版本,您必須保留該版本的記錄檔和數據檔。
針對資料表執行時 VACUUM
,會刪除資料檔。 Delta Lake 會在檢查點數據表版本之後自動管理記錄檔移除。
由於大部分的差異數據表會 VACUUM
定期針對它們執行,因此時間點查詢應該遵守的保留閾值 VACUUM
,預設為 7 天。
若要增加 Delta 資料表的數據保留閾值,您必須設定下表屬性:
delta.logRetentionDuration = "interval <interval>"
:控制保留數據表記錄的時間長度。 預設值為interval 30 days
。delta.deletedFileRetentionDuration = "interval <interval>"
:決定臨界值VACUUM
用來移除目前數據表版本中不再參考的數據檔。 預設值為interval 7 days
。
您可以在資料表建立期間指定 Delta 屬性,或使用 語句加以設定 ALTER TABLE
。 請參閱差異資料表屬性參考。
注意
您必須設定這兩個屬性,以確保數據表記錄會保留較長的持續時間,以供經常作業的 VACUUM
數據表使用。 例如,若要存取 30 天的歷史數據,請設定 delta.deletedFileRetentionDuration = "interval 30 days"
(這符合的預設 delta.logRetentionDuration
設定)。
增加數據保留閾值可能會導致記憶體成本增加,因為會維護更多數據檔。
將 Delta 數據表還原至先前的狀態
您可以使用 命令,將 Delta 數據表還原到其先前的狀態 RESTORE
。 Delta 資料表會在內部維護資料表的歷史版本,讓資料表還原至先前的狀態。
與早期狀態相對應的版本,或建立早期狀態的時間戳記,會作為選項由 RESTORE
命令支援。
重要
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
如需語法詳細數據,請參閱 RESTORE。
重要
還原會被視為數據變更作業。 命令新增的 RESTORE
Delta Lake 記錄專案包含 dataChange 設定為 true。 如果有下游應用程式,例如 處理 Delta Lake 資料表更新的結構化串流 作業,還原作業所新增的數據變更記錄專案會被視為新的資料更新,而且處理它們可能會導致重複的數據。
例如:
數據表版本 | 作業 | 差異記錄更新 | 數據變更記錄更新中的記錄 |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (name = Viktor, age = 29, (name = George, age = 55) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (name = George, age = 39) |
2 | 最佳化 | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (沒有記錄,因為優化壓縮不會變更數據表中的數據) |
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3)、AddFile(/path/to/file-1、dataChange = true)、AddFile(/path/to/file-2、dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) |
在上述範例中 RESTORE
,命令會導致讀取 Delta 數據表第 0 版和 1 時已看到更新。 如果串流查詢正在讀取此數據表,則這些檔案會被視為新加入的數據,並會再次處理。
還原計量
RESTORE
在作業完成之後,會將下列計量報告為單一數據列 DataFrame:
table_size_after_restore
:還原之後數據表的大小。num_of_files_after_restore
:還原之後數據表中的檔案數目。num_removed_files
:已從資料表中移除的檔案數目(邏輯刪除)。num_restored_files
:由於復原而還原的檔案數目。removed_files_size
:從數據表中移除之檔案的大小總計,以位元組為單位。restored_files_size
:還原之檔案的大小總計,以位元組為單位。
使用 Delta Lake 時間移動的範例
修正使用者
111
意外刪除資料表:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
修正資料表意外不正確的更新:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
查詢上周新增的新客戶數目。
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
如何? 在Spark工作階段中尋找最後一個認可的版本嗎?
若要取得目前 SparkSession
在所有線程和所有資料表中寫入的最後一個認可版本號碼,請查詢 SQL 組態 spark.databricks.delta.lastCommitVersionInSession
。
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
如果 尚未進行 SparkSession
任何認可,則查詢索引鍵會傳回空值。
注意
如果您在多個線程之間共用相同 SparkSession
,它類似於在多個線程之間共用變數;您可能會在組態值同時更新時達到競爭條件。