Sdílet prostřednictvím


Práce s historií table Delta Lake

Každá operace, která upraví Delta Lake table, vytvoří novou verzi table. Můžete použít informace o historii k auditování operací, vrácení tablezpět nebo k dotazování table v konkrétním časovém okamžiku s využitím možnosti cestování v čase.

Poznámka:

Databricks nedoporučuje používat historii table Delta Lake jako dlouhodobé řešení zálohování pro archivaci dat. Databricks doporučuje používat pouze posledních 7 dnů pro operace časového cestování, pokud nemáte set konfigurace uchovávání dat i protokolů na větší hodnotu.

Načtěte historii Delta table

Spuštěním příkazu history můžete načíst informace, včetně operace, uživatele a časového razítka každého zápisu do Delta table. Operace se vrátí v obráceném chronologickém pořadí.

Table uchovávání historie je určeno nastavením tabledelta.logRetentionDuration, což je ve výchozím nastavení 30 dnů.

Poznámka:

Časová cesta a historie table se řídí různými prahovými hodnotami uchovávání informací. Přečtěte si: Co je pohyb v čase ve službě Delta Lake?.

DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1  -- get the last operation only

Podrobnosti o syntaxi Spark SQL najdete v tématu DESCRIBE HISTORY.

Podrobnosti o syntaxi jazyka Scala/Java/Python najdete v dokumentaci k rozhraní Delta Lake API.

Catalog Explorer poskytuje vizuální zobrazení těchto podrobných table informací a historie pro Delta tables. Kromě tableschema a ukázkových dat můžete kliknutím na kartu Historie zobrazit historii table, která se zobrazuje s DESCRIBE HISTORY.

schema historie

Výstup operace history má následující columns.

Column Type Popis
version long Verze Table vygenerovaná operací.
časové razítko časové razítko Při potvrzení této verze.
userId string ID uživatele, který operaci spustil.
userName string Jméno uživatele, který operaci spustil.
operation string Název operace.
operationParameters map Parameters operace (například predikáty).
úloha struct Podrobnosti o úloze, která operaci spustila.
poznámkový blok struct Podrobnosti o poznámkovém bloku, ze kterého byla operace spuštěna.
clusterId string ID clusteru, na kterém byla operace spuštěna.
readVersion long Verze table, která byla načtena pro provedení operace zápisu.
isolationLevel string Úroveň izolace použitá pro tuto operaci.
isBlindAppend boolean Určuje, jestli tato operace připojila data.
operationMetrics map Metriky operace (například počet řádků a souborů upravených).)
userMetadata string Uživatelsky definovaná metadata potvrzení, pokud byla zadána
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Poznámka:

Klíče metrik operací

Operace history vrátí kolekci metrik operací v mapě operationMetricscolumn.

Následující tableslist definice klíčů mapování podle operace.

Operace Název metriky Popis
PIŠ, CREATE TABLE JAKO SELECT, NAHRAĎ TABLE JAKO SELECT, COPY INTO
numFiles Počet zapsaných souborů
numOutputBytes Velikost v bajtech psaného obsahu
numOutputRows Počet zapsaných řádků
STREAMOVÁNÍ UPDATE
numAddedFiles Počet přidaných souborů
numRemovedFiles Počet odebraných souborů
numOutputRows Počet zapsaných řádků
numOutputBytes Velikost zápisu v bajtech
DELETE
numAddedFiles Počet přidaných souborů Není poskytnuto při odstranění oddílů table.
numRemovedFiles Počet odebraných souborů
numDeletedRows Počet odebraných řádků Není poskytováno při odstranění oddílů table.
numCopiedRows Počet řádků zkopírovaných v procesu odstraňování souborů
executionTimeMs Doba potřebná ke spuštění celé operace.
scanTimeMs Doba potřebná ke kontrole shodných souborů
přepsáníTimeMs Doba potřebná k přepsání odpovídajících souborů
ZKRÁTIT
numRemovedFiles Počet odebraných souborů
executionTimeMs Doba potřebná ke spuštění celé operace.
SLOUČIT
numSourceRows Počet řádků ve zdrojovém datovém rámci
numTargetRowsInserted Počet řádků vložených do cílového table.
numTargetRowsUpdated Počet řádků aktualizovaných v cílovém table.
numTargetRowsDeleted Počet řádků odstraněných v cílovém table.
numTargetRowsCopied Počet zkopírovaných cílových řádků
numOutputRows Celkový počet odpsaných řádků
numTargetFilesAdded Počet souborů přidaných do jímky (cíle).
numTargetFilesRemoved Počet souborů odebraných z jímky (cíle).
executionTimeMs Doba potřebná ke spuštění celé operace.
scanTimeMs Doba potřebná ke kontrole shodných souborů
přepsáníTimeMs Doba potřebná k přepsání odpovídajících souborů
UPDATE
numAddedFiles Počet přidaných souborů
numRemovedFiles Počet odebraných souborů
numUpdatedRows Počet aktualizovaných řádků
numCopiedRows Početřádkůchch
executionTimeMs Doba potřebná ke spuštění celé operace.
scanTimeMs Doba potřebná ke kontrole shodných souborů
přepsáníTimeMs Doba potřebná k přepsání odpovídajících souborů
FSCK numRemovedFiles Počet odebraných souborů
PŘEMĚNIT numConvertedFiles Počet převedených souborů Parquet.
OPTIMIZE
numAddedFiles Počet přidaných souborů
numRemovedFiles Počet optimalizovaných souborů
numAddedBytes Počet bajtů přidaných po optimalizaci table
numRemovedBytes Počet odebraných bajtů
minFileSize Velikost nejmenšího souboru po optimalizaci table.
p25FileSize Velikost souboru 25. percentilu po optimalizaci table.
p50FileSize Medián velikosti souboru po optimalizaci table
p75FileSize Velikost souboru v 75. percentilu po optimalizaci table
maxFileSize Velikost největšího souboru po optimalizaci table
CLONE
sourceTableSize Velikost v bajtech pro zdroj table v klonované verzi.
sourceNumOfFiles Počet souborů ve zdrojovém table ve verzi, která je naklonovaná.
numRemovedFiles Počet souborů odstraněných z cílového table, pokud byla nahrazena předchozí Delta table.
removedFilesSize Celková velikost souborů v bajtech odebraných z cíle table, pokud byla nahrazena předchozí delta table.
numCopiedFiles Počet souborů, které byly zkopírovány do nového umístění 0 pro mělké klony.
copiedFilesSize Celková velikost v bajtech souborů, které byly zkopírovány do nového umístění. 0 pro mělké klony.
RESTORE
tableSizeAfterRestore Table velikost v bajtech po restore.
numOfFilesAfterRestore Počet souborů v table po restore.
numRemovedFiles Počet souborů odebraných operací restore
numRestoredFiles Počet souborů, které byly přidány jako výsledek restore.
removedFilesSize Velikost souborů odstraněných restorev bajtech.
restoredFilesSize Velikost souborů v bajtech přidaných pomocí restore.
VACUUM
numDeletedFiles Počet odstraněných souborů
numVacuumedDirectories Počet vakuových adresářů.
numFilesToDelete Počet souborů, které chcete odstranit

Co je časová cesta Delta Lake?

Delta Lake v režimu časové cesty podporuje dotazování na předchozí verze table na základě časového razítka nebo verze table, jak je zaznamenáno v transakčním protokolu. Pro aplikace, jako jsou například tyto, můžete použít časová cesta:

  • Opětovné vytváření analýz, sestav nebo výstupů (například výstup modelu strojového učení). To může být užitečné pro ladění nebo auditování, zejména v regulovaných odvětvích.
  • Psaní složitých dočasných dotazů
  • Oprava chyb v datech
  • Poskytuje izolaci snímků pro set dotazů pro rychlé změny tables.

Důležité

Table verze přístupné s časovým cestováním jsou určeny kombinací prahové hodnoty uchovávání pro soubory transakčního protokolu a četností a zadaným uchováváním pro operace VACUUM. Pokud spouštíte VACUUM denně s výchozími values, 7 dní dat je k dispozici pro časovou cestu.

Syntaxe pohybu v čase Delta

Provádíte dotaz na Delta table s časovým cestováním po zadání klauzule za specifikaci názvu table.

  • timestamp_expression může být libovolná z těchto možností:
    • '2018-10-18T22:15:12.013Z'to znamená řetězec, který lze přetypovat na časové razítko.
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18'to znamená řetězec kalendářního data.
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Jakýkoli jiný výraz, který je nebo lze přetypovat na časové razítko
  • version je dlouhá hodnota, kterou lze získat z výstupu DESCRIBE HISTORY table_spec.

timestamp_expression Ani version nemůže být poddotaz.

Akceptují se pouze řetězce data nebo časového razítka. Příklad: "2019-01-01" a "2019-01-01T00:00:00.000Z". Příklad syntaxe najdete v následujícím kódu:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")

Pomocí syntaxe @ můžete také zadat časové razítko nebo verzi jako součást názvu table. Časové razítko musí být ve yyyyMMddHHmmssSSS formátu. Verzi @ můžete zadat tak, že ji předejdete v . Příklad syntaxe najdete v následujícím kódu:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

Co jsou kontrolní body transakčního protokolu?

Delta Lake zaznamenává table verze jako soubory JSON v adresáři _delta_log, který se ukládá společně s table daty. Pro dotazování na kontrolní body optimize agreguje Delta Lake verze table do souborů kontrolních bodů Parquet, čímž se zabrání nutnosti číst všechny JSON verze historie table. Azure Databricks optimalizuje frekvenci vytváření kontrolních bodů pro velikost dat a úlohu. Uživatelé by neměli pracovat s kontrolními body přímo. Četnost kontrolních bodů se může bez předchozího upozornění změnit.

Konfigurace uchovávání dat pro dotazy na časové cesty

Pokud chcete dotazovat předchozí table verzi, musíte zachovat jak protokolu, tak datové soubory pro tuto verzi.

Datové soubory se odstraní, když VACUUM poběží proti table. Delta Lake automaticky spravuje odstraňování protokolových souborů po kontrolním bodu verze table.

Vzhledem k tomu, že většina Delta tables má pravidelně spuštěný VACUUM, měly by dotazy k danému okamžiku respektovat prahovou hodnotu uchovávání informací pro VACUUM, což je ve výchozím nastavení 7 dnů.

Pokud chcete zvýšit prahovou hodnotu uchovávání dat pro delta tables, musíte nakonfigurovat následující vlastnosti table:

  • delta.logRetentionDuration = "interval <interval>": určuje, jak dlouho se historie table uchovává. Výchozí hodnota je interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": určuje prahovou hodnotu, kterou VACUUM používá k remove datové soubory, na které se už v aktuální verzi table neodkazuje. Výchozí hodnota je interval 7 days.

Během vytváření table můžete zadat vlastnosti Delta nebo je set příkazem ALTER TABLE. Viz referenčnívlastnosti Delta .

Poznámka:

Musíte provést set obě tyto vlastnosti, aby se zajistilo, že se historie table zachovává na delší dobu v rámci tables s častými operacemi VACUUM. Pokud například chcete získat přístup k historickým datům za 30 dnů, setdelta.deletedFileRetentionDuration = "interval 30 days" (který odpovídá výchozímu nastavení pro delta.logRetentionDuration).

Zvýšení prahové hodnoty uchovávání dat může způsobit zvýšení nákladů na úložiště, protože se udržuje více datových souborů.

Restore a Delta table do dřívějšího stavu

Pomocí příkazu RESTORE můžete vrátit Delta s restoretable do jeho dřívějšího stavu. Delta table interně udržuje historické verze table, které umožňují jeho obnovení do dřívějšího stavu. Verze odpovídající dřívějšímu stavu nebo časovému razítku toho, když došlo k vytvoření dřívějšího stavu, se příkazem RESTORE podporují jako možnosti.

Důležité

  • Můžete restore již obnovenou table.
  • Můžete restoreklonovanétable.
  • K obnovení table musíte mít oprávnění MODIFY.
  • Nemůžete restoretable starší verzi, where datové soubory byly odstraněny ručně nebo vacuum. Částečné obnovení na tuto verzi je stále možné, pokud je spark.sql.files.ignoreMissingFiles od set do true.
  • Formát časového razítka pro obnovení do dřívějšího stavu je yyyy-MM-dd HH:mm:ss. Je také podporováno poskytnutí řetězce date(yyyy-MM-dd).
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

Podrobnosti o syntaxi najdete v tématu RESTORE.

Důležité

Restore se považuje za operaci změny dat. Položky protokolu Delta Lake přidané příkazem RESTORE obsahují změny dat , kde jeset nastaveno na true. Pokud existuje podřízená aplikace, například úloha strukturovaného streamování, která zpracovává aktualizace tableDelta Lake , položky protokolu změn dat přidané operací restore se považují za nové aktualizace dat a jejich zpracování může vést k duplicitním datům.

Příklad:

verze Table Operace Rozdílové aktualizace protokolů Záznamy v aktualizacích protokolu změn dat
0 INSERT AddFile(/path/to/file-1, dataChange = true) (name = Viktor, age = 29, (name = George, age = 55)
0 INSERT AddFile(/path/to/file-2; dataChange = true) (name = George, age = 39)
2 OPTIMIZE AddFile(/path/to/file-3; dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (Žádné záznamy, protože komprimace Optimize nemění data v table)
3 RESTORE(verze=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39)

V předchozím příkladu příkaz RESTORE vede k aktualizacím, které už byly zachyceny při čtení Delta verze table, verzí 0 a 1. Pokud streamovací dotaz četl tento table, budou tyto soubory považovány za nově přidaná data a budou znovu zpracovány.

metriky Restore

RESTORE Po dokončení operace sestaví následující metriky jako datový rámec s jedním řádkem:

  • table_size_after_restore: Velikost table po obnovení.

  • num_of_files_after_restore: Počet souborů v table po obnovení.

  • num_removed_files: Počet odebraných souborů (logicky odstraněných) z table.

  • num_restored_files: Počet obnovených souborů kvůli vrácení zpět

  • removed_files_size: Celková velikost v bajtech souborů odebraných z table.

  • restored_files_size: Celková velikost v bajtech obnovených souborů.

    příklad metrik Restore

Příklady použití pohybu v čase Delta Lake

  • Oprava náhodného odstranění table pro uživatele 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Oprava náhodných nesprávných aktualizací na položku table:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Zadejte dotaz na počet nových zákazníků přidaných za poslední týden.

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Návody najít verzi posledního potvrzení v relaci Sparku?

Chcete-li get číslo verze posledního commitu napsaného aktuálním SparkSession ve všech vláknech a všech tables, proveďte dotaz na konfiguraci SQL spark.databricks.delta.lastCommitVersionInSession.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Pokud nebyly provedeny SparkSessionžádné potvrzení , dotazování na klíč vrátí prázdnou hodnotu.

Poznámka:

Pokud sdílíte stejnou hodnotu SparkSession ve více vláknech, je to podobné sdílení proměnné napříč více vlákny. Pokud se hodnota konfigurace aktualizuje současně, můžete narazit na podmínky časování.