Práce s historií table Delta Lake
Každá operace, která upraví Delta Lake table, vytvoří novou verzi table. Můžete použít informace o historii k auditování operací, vrácení tablezpět nebo k dotazování table v konkrétním časovém okamžiku s využitím možnosti cestování v čase.
Poznámka:
Databricks nedoporučuje používat historii table Delta Lake jako dlouhodobé řešení zálohování pro archivaci dat. Databricks doporučuje používat pouze posledních 7 dnů pro operace časového cestování, pokud nemáte set konfigurace uchovávání dat i protokolů na větší hodnotu.
Načtěte historii Delta table
Spuštěním příkazu history
můžete načíst informace, včetně operace, uživatele a časového razítka každého zápisu do Delta table. Operace se vrátí v obráceném chronologickém pořadí.
Table uchovávání historie je určeno nastavením tabledelta.logRetentionDuration
, což je ve výchozím nastavení 30 dnů.
Poznámka:
Časová cesta a historie table se řídí různými prahovými hodnotami uchovávání informací. Přečtěte si: Co je pohyb v čase ve službě Delta Lake?.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Podrobnosti o syntaxi Spark SQL najdete v tématu DESCRIBE HISTORY.
Podrobnosti o syntaxi jazyka Scala/Java/Python najdete v dokumentaci k rozhraní Delta Lake API.
Catalog Explorer poskytuje vizuální zobrazení těchto podrobných table informací a historie pro Delta tables. Kromě tableschema a ukázkových dat můžete kliknutím na kartu Historie zobrazit historii table, která se zobrazuje s DESCRIBE HISTORY
.
schema historie
Výstup operace history
má následující columns.
Column | Type | Popis |
---|---|---|
version | long | Verze Table vygenerovaná operací. |
časové razítko | časové razítko | Při potvrzení této verze. |
userId | string | ID uživatele, který operaci spustil. |
userName | string | Jméno uživatele, který operaci spustil. |
operation | string | Název operace. |
operationParameters | map | Parameters operace (například predikáty). |
úloha | struct | Podrobnosti o úloze, která operaci spustila. |
poznámkový blok | struct | Podrobnosti o poznámkovém bloku, ze kterého byla operace spuštěna. |
clusterId | string | ID clusteru, na kterém byla operace spuštěna. |
readVersion | long | Verze table, která byla načtena pro provedení operace zápisu. |
isolationLevel | string | Úroveň izolace použitá pro tuto operaci. |
isBlindAppend | boolean | Určuje, jestli tato operace připojila data. |
operationMetrics | map | Metriky operace (například počet řádků a souborů upravených).) |
userMetadata | string | Uživatelsky definovaná metadata potvrzení, pokud byla zadána |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Poznámka:
- Pokud zapisujete do Delta table pomocí následujících metod, není k dispozici několik dalších columns:
- Columns bude v budoucnu vždy přidáno po posledním column.
Klíče metrik operací
Operace history
vrátí kolekci metrik operací v mapě operationMetrics
column.
Následující tableslist definice klíčů mapování podle operace.
Operace | Název metriky | Popis |
---|---|---|
PIŠ, CREATE TABLE JAKO SELECT, NAHRAĎ TABLE JAKO SELECT, COPY INTO | ||
numFiles | Počet zapsaných souborů | |
numOutputBytes | Velikost v bajtech psaného obsahu | |
numOutputRows | Počet zapsaných řádků | |
STREAMOVÁNÍ UPDATE | ||
numAddedFiles | Počet přidaných souborů | |
numRemovedFiles | Počet odebraných souborů | |
numOutputRows | Počet zapsaných řádků | |
numOutputBytes | Velikost zápisu v bajtech | |
DELETE | ||
numAddedFiles | Počet přidaných souborů Není poskytnuto při odstranění oddílů table. | |
numRemovedFiles | Počet odebraných souborů | |
numDeletedRows | Počet odebraných řádků Není poskytováno při odstranění oddílů table. | |
numCopiedRows | Počet řádků zkopírovaných v procesu odstraňování souborů | |
executionTimeMs | Doba potřebná ke spuštění celé operace. | |
scanTimeMs | Doba potřebná ke kontrole shodných souborů | |
přepsáníTimeMs | Doba potřebná k přepsání odpovídajících souborů | |
ZKRÁTIT | ||
numRemovedFiles | Počet odebraných souborů | |
executionTimeMs | Doba potřebná ke spuštění celé operace. | |
SLOUČIT | ||
numSourceRows | Počet řádků ve zdrojovém datovém rámci | |
numTargetRowsInserted | Počet řádků vložených do cílového table. | |
numTargetRowsUpdated | Počet řádků aktualizovaných v cílovém table. | |
numTargetRowsDeleted | Počet řádků odstraněných v cílovém table. | |
numTargetRowsCopied | Počet zkopírovaných cílových řádků | |
numOutputRows | Celkový počet odpsaných řádků | |
numTargetFilesAdded | Počet souborů přidaných do jímky (cíle). | |
numTargetFilesRemoved | Počet souborů odebraných z jímky (cíle). | |
executionTimeMs | Doba potřebná ke spuštění celé operace. | |
scanTimeMs | Doba potřebná ke kontrole shodných souborů | |
přepsáníTimeMs | Doba potřebná k přepsání odpovídajících souborů | |
UPDATE | ||
numAddedFiles | Počet přidaných souborů | |
numRemovedFiles | Počet odebraných souborů | |
numUpdatedRows | Počet aktualizovaných řádků | |
numCopiedRows | Početřádkůchch | |
executionTimeMs | Doba potřebná ke spuštění celé operace. | |
scanTimeMs | Doba potřebná ke kontrole shodných souborů | |
přepsáníTimeMs | Doba potřebná k přepsání odpovídajících souborů | |
FSCK | numRemovedFiles | Počet odebraných souborů |
PŘEMĚNIT | numConvertedFiles | Počet převedených souborů Parquet. |
OPTIMIZE | ||
numAddedFiles | Počet přidaných souborů | |
numRemovedFiles | Počet optimalizovaných souborů | |
numAddedBytes | Počet bajtů přidaných po optimalizaci table | |
numRemovedBytes | Počet odebraných bajtů | |
minFileSize | Velikost nejmenšího souboru po optimalizaci table. | |
p25FileSize | Velikost souboru 25. percentilu po optimalizaci table. | |
p50FileSize | Medián velikosti souboru po optimalizaci table | |
p75FileSize | Velikost souboru v 75. percentilu po optimalizaci table | |
maxFileSize | Velikost největšího souboru po optimalizaci table | |
CLONE | ||
sourceTableSize | Velikost v bajtech pro zdroj table v klonované verzi. | |
sourceNumOfFiles | Počet souborů ve zdrojovém table ve verzi, která je naklonovaná. | |
numRemovedFiles | Počet souborů odstraněných z cílového table, pokud byla nahrazena předchozí Delta table. | |
removedFilesSize | Celková velikost souborů v bajtech odebraných z cíle table, pokud byla nahrazena předchozí delta table. | |
numCopiedFiles | Počet souborů, které byly zkopírovány do nového umístění 0 pro mělké klony. | |
copiedFilesSize | Celková velikost v bajtech souborů, které byly zkopírovány do nového umístění. 0 pro mělké klony. | |
RESTORE | ||
tableSizeAfterRestore | Table velikost v bajtech po restore. | |
numOfFilesAfterRestore | Počet souborů v table po restore. | |
numRemovedFiles | Počet souborů odebraných operací restore | |
numRestoredFiles | Počet souborů, které byly přidány jako výsledek restore. | |
removedFilesSize | Velikost souborů odstraněných restorev bajtech. | |
restoredFilesSize | Velikost souborů v bajtech přidaných pomocí restore. | |
VACUUM | ||
numDeletedFiles | Počet odstraněných souborů | |
numVacuumedDirectories | Počet vakuových adresářů. | |
numFilesToDelete | Počet souborů, které chcete odstranit |
Co je časová cesta Delta Lake?
Delta Lake v režimu časové cesty podporuje dotazování na předchozí verze table na základě časového razítka nebo verze table, jak je zaznamenáno v transakčním protokolu. Pro aplikace, jako jsou například tyto, můžete použít časová cesta:
- Opětovné vytváření analýz, sestav nebo výstupů (například výstup modelu strojového učení). To může být užitečné pro ladění nebo auditování, zejména v regulovaných odvětvích.
- Psaní složitých dočasných dotazů
- Oprava chyb v datech
- Poskytuje izolaci snímků pro set dotazů pro rychlé změny tables.
Důležité
Table verze přístupné s časovým cestováním jsou určeny kombinací prahové hodnoty uchovávání pro soubory transakčního protokolu a četností a zadaným uchováváním pro operace VACUUM
. Pokud spouštíte VACUUM
denně s výchozími values, 7 dní dat je k dispozici pro časovou cestu.
Syntaxe pohybu v čase Delta
Provádíte dotaz na Delta table s časovým cestováním po zadání klauzule za specifikaci názvu table.
-
timestamp_expression
může být libovolná z těchto možností:-
'2018-10-18T22:15:12.013Z'
to znamená řetězec, který lze přetypovat na časové razítko. cast('2018-10-18 13:36:32 CEST' as timestamp)
-
'2018-10-18'
to znamená řetězec kalendářního data. current_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- Jakýkoli jiný výraz, který je nebo lze přetypovat na časové razítko
-
-
version
je dlouhá hodnota, kterou lze získat z výstupuDESCRIBE HISTORY table_spec
.
timestamp_expression
Ani version
nemůže být poddotaz.
Akceptují se pouze řetězce data nebo časového razítka. Příklad: "2019-01-01"
a "2019-01-01T00:00:00.000Z"
. Příklad syntaxe najdete v následujícím kódu:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
Pomocí syntaxe @
můžete také zadat časové razítko nebo verzi jako součást názvu table. Časové razítko musí být ve yyyyMMddHHmmssSSS
formátu. Verzi @
můžete zadat tak, že ji předejdete v
. Příklad syntaxe najdete v následujícím kódu:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
Co jsou kontrolní body transakčního protokolu?
Delta Lake zaznamenává table verze jako soubory JSON v adresáři _delta_log
, který se ukládá společně s table daty. Pro dotazování na kontrolní body optimize agreguje Delta Lake verze table do souborů kontrolních bodů Parquet, čímž se zabrání nutnosti číst všechny JSON verze historie table. Azure Databricks optimalizuje frekvenci vytváření kontrolních bodů pro velikost dat a úlohu. Uživatelé by neměli pracovat s kontrolními body přímo. Četnost kontrolních bodů se může bez předchozího upozornění změnit.
Konfigurace uchovávání dat pro dotazy na časové cesty
Pokud chcete dotazovat předchozí table verzi, musíte zachovat jak protokolu, tak datové soubory pro tuto verzi.
Datové soubory se odstraní, když VACUUM
poběží proti table. Delta Lake automaticky spravuje odstraňování protokolových souborů po kontrolním bodu verze table.
Vzhledem k tomu, že většina Delta tables má pravidelně spuštěný VACUUM
, měly by dotazy k danému okamžiku respektovat prahovou hodnotu uchovávání informací pro VACUUM
, což je ve výchozím nastavení 7 dnů.
Pokud chcete zvýšit prahovou hodnotu uchovávání dat pro delta tables, musíte nakonfigurovat následující vlastnosti table:
-
delta.logRetentionDuration = "interval <interval>"
: určuje, jak dlouho se historie table uchovává. Výchozí hodnota jeinterval 30 days
. -
delta.deletedFileRetentionDuration = "interval <interval>"
: určuje prahovou hodnotu, kterouVACUUM
používá k remove datové soubory, na které se už v aktuální verzi table neodkazuje. Výchozí hodnota jeinterval 7 days
.
Během vytváření table můžete zadat vlastnosti Delta nebo je set příkazem ALTER TABLE
. Viz referenčnívlastnosti
Poznámka:
Musíte provést set obě tyto vlastnosti, aby se zajistilo, že se historie table zachovává na delší dobu v rámci tables s častými operacemi VACUUM
. Pokud například chcete získat přístup k historickým datům za 30 dnů, setdelta.deletedFileRetentionDuration = "interval 30 days"
(který odpovídá výchozímu nastavení pro delta.logRetentionDuration
).
Zvýšení prahové hodnoty uchovávání dat může způsobit zvýšení nákladů na úložiště, protože se udržuje více datových souborů.
Restore a Delta table do dřívějšího stavu
Pomocí příkazu RESTORE
můžete vrátit Delta s restoretable do jeho dřívějšího stavu. Delta table interně udržuje historické verze table, které umožňují jeho obnovení do dřívějšího stavu.
Verze odpovídající dřívějšímu stavu nebo časovému razítku toho, když došlo k vytvoření dřívějšího stavu, se příkazem RESTORE
podporují jako možnosti.
Důležité
- Můžete restore již obnovenou table.
- Můžete restoreklonovanétable.
- K obnovení table musíte mít oprávnění
MODIFY
. - Nemůžete restoretable starší verzi, where datové soubory byly odstraněny ručně nebo
vacuum
. Částečné obnovení na tuto verzi je stále možné, pokud jespark.sql.files.ignoreMissingFiles
od set dotrue
. - Formát časového razítka pro obnovení do dřívějšího stavu je
yyyy-MM-dd HH:mm:ss
. Je také podporováno poskytnutí řetězce date(yyyy-MM-dd
).
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Podrobnosti o syntaxi najdete v tématu RESTORE.
Důležité
Restore se považuje za operaci změny dat. Položky protokolu Delta Lake přidané příkazem RESTORE
obsahují změny dat , kde jeset nastaveno na true. Pokud existuje podřízená aplikace, například úloha strukturovaného streamování, která zpracovává aktualizace tableDelta Lake , položky protokolu změn dat přidané operací restore se považují za nové aktualizace dat a jejich zpracování může vést k duplicitním datům.
Příklad:
verze Table | Operace | Rozdílové aktualizace protokolů | Záznamy v aktualizacích protokolu změn dat |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (name = Viktor, age = 29, (name = George, age = 55) |
0 | INSERT | AddFile(/path/to/file-2; dataChange = true) | (name = George, age = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3; dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (Žádné záznamy, protože komprimace Optimize nemění data v table) |
3 | RESTORE(verze=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) |
V předchozím příkladu příkaz RESTORE
vede k aktualizacím, které už byly zachyceny při čtení Delta verze table, verzí 0 a 1. Pokud streamovací dotaz četl tento table, budou tyto soubory považovány za nově přidaná data a budou znovu zpracovány.
metriky Restore
RESTORE
Po dokončení operace sestaví následující metriky jako datový rámec s jedním řádkem:
table_size_after_restore
: Velikost table po obnovení.num_of_files_after_restore
: Počet souborů v table po obnovení.num_removed_files
: Počet odebraných souborů (logicky odstraněných) z table.num_restored_files
: Počet obnovených souborů kvůli vrácení zpětremoved_files_size
: Celková velikost v bajtech souborů odebraných z table.restored_files_size
: Celková velikost v bajtech obnovených souborů.příklad metrik
Příklady použití pohybu v čase Delta Lake
Oprava náhodného odstranění table pro uživatele
111
:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Oprava náhodných nesprávných aktualizací na položku table:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Zadejte dotaz na počet nových zákazníků přidaných za poslední týden.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Návody najít verzi posledního potvrzení v relaci Sparku?
Chcete-li get číslo verze posledního commitu napsaného aktuálním SparkSession
ve všech vláknech a všech tables, proveďte dotaz na konfiguraci SQL spark.databricks.delta.lastCommitVersionInSession
.
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Pokud nebyly provedeny SparkSession
žádné potvrzení , dotazování na klíč vrátí prázdnou hodnotu.
Poznámka:
Pokud sdílíte stejnou hodnotu SparkSession
ve více vláknech, je to podobné sdílení proměnné napříč více vlákny. Pokud se hodnota konfigurace aktualizuje současně, můžete narazit na podmínky časování.