Dela via


Arbeta med Delta Lake-tabellhistorik

Varje åtgärd som ändrar en Delta Lake-tabell skapar en ny tabellversion. Du kan använda historikinformation för att granska åtgärder, återställa en tabell eller köra frågor mot en tabell vid en viss tidpunkt med hjälp av tidsresor.

Kommentar

I Databricks rekommenderas inte att du använder Delta Lake-tabellhistorik som långsiktig säkerhetskopieringslösning för dataarkivering. I Databricks rekommenderas att du bara använder de senaste 7 dagarna för tidsresor om du inte har angett ett större värde för kvarhållningsinställningar för både data och loggar.

Hämta historik för deltatabell

Du kan hämta information, inklusive åtgärder, användare och tidsstämpel för varje skrivning till en Delta-tabell genom att köra history kommandot . Åtgärderna returneras i omvänd kronologisk ordning.

Kvarhållning av tabellhistorik bestäms av tabellinställningen delta.logRetentionDuration, som är 30 dagar som standard.

Kommentar

Tidsresor och tabellhistorik styrs av olika tröskelvärden för kvarhållning. Läs mer i Vad är Delta Lake-tidsresor?.

DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1  -- get the last operation only

Information om Spark SQL-syntax finns i BESKRIVA HISTORIK.

Se Delta Lake API-dokumentationen för Scala/Java/Python-syntaxinformation.

Katalogutforskaren innehåller en visuell vy över den här detaljerade tabellinformationen och historiken för Delta-tabeller. Förutom tabellschemat och exempeldata kan du klicka på fliken Historik för att se tabellhistoriken som visas med DESCRIBE HISTORY.

Historikschema

Utdata för åtgärden history har följande kolumner.

Column Type Beskrivning
version lång Tabellversion som genereras av åtgärden.
timestamp timestamp När den här versionen har checkats in.
Användar-ID sträng ID för användaren som körde åtgärden.
userName sträng Namnet på den användare som körde åtgärden.
operation sträng Namnet på åtgärden.
operationParameters map Parametrar för åtgärden (till exempel predikat.)
jobb Struct Information om jobbet som körde åtgärden.
notebook-fil Struct Information om notebook-filen som åtgärden kördes från.
clusterId sträng ID för klustret där åtgärden kördes.
readVersion lång Version av tabellen som lästes för att utföra skrivåtgärden.
isolationLevel sträng Isoleringsnivå som används för den här åtgärden.
isBlindAppend boolean Om den här åtgärden har bifogat data.
operationMetrics map Mått för åtgärden (till exempel antal rader och filer som ändrats.)
userMetadata sträng Användardefinierade incheckningsmetadata om det har angetts
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Kommentar

Åtgärdsmåttnycklar

Åtgärden history returnerar en samling åtgärdsmått i kolumnkartan operationMetrics .

I följande tabeller visas mappningsnyckeldefinitionerna efter åtgärd.

Åtgärd Måttnamn beskrivning
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO
numFiles Antal filer som skrivits.
numOutputBytes Storlek i byte för det skriftliga innehållet.
numOutputRows Antal rader som skrivits.
DIREKTUPPSPELNINGSUPPDATERING
numAddedFiles Antal filer som har lagts till.
numRemovedFiles Antal borttagna filer.
numOutputRows Antal rader som skrivits.
numOutputBytes Storleken på skrivning i byte.
DELETE
numAddedFiles Antal filer som har lagts till. Tillhandahålls inte när partitioner i tabellen tas bort.
numRemovedFiles Antal borttagna filer.
numDeletedRows Antal borttagna rader. Tillhandahålls inte när partitioner i tabellen tas bort.
numCopiedRows Antal rader som kopierats i processen för att ta bort filer.
executionTimeMs Tiden det tar att köra hela åtgärden.
scanTimeMs Det tar tid att söka igenom filerna efter matchningar.
rewriteTimeMs Tiden det tar att skriva om de matchade filerna.
TRUNCATE
numRemovedFiles Antal borttagna filer.
executionTimeMs Tiden det tar att köra hela åtgärden.
SLÅ IHOP
numSourceRows Antal rader i källdataramen.
numTargetRowsInserted Antal rader som infogats i måltabellen.
numTargetRowsUpdated Antal rader som har uppdaterats i måltabellen.
numTargetRowsDeleted Antal rader som tagits bort i måltabellen.
numTargetRows Kopierade Antal kopierade målrader.
numOutputRows Totalt antal rader som skrivits ut.
numTargetFilesLägg till Antal filer som har lagts till i mottagare(mål).
numTargetFilesRemoved Antal filer som tagits bort från mottagare(mål).
executionTimeMs Tiden det tar att köra hela åtgärden.
scanTimeMs Det tar tid att söka igenom filerna efter matchningar.
rewriteTimeMs Tiden det tar att skriva om de matchade filerna.
UPDATE
numAddedFiles Antal filer som har lagts till.
numRemovedFiles Antal borttagna filer.
numUpdatedRows Antal rader som har uppdaterats.
numCopiedRows Antal rader som precis kopierats över under uppdateringsprocessen av filer.
executionTimeMs Tiden det tar att köra hela åtgärden.
scanTimeMs Det tar tid att söka igenom filerna efter matchningar.
rewriteTimeMs Tiden det tar att skriva om de matchade filerna.
FSCK numRemovedFiles Antal borttagna filer.
KONVERTERA numConvertedFiles Antal Parquet-filer som har konverterats.
OPTIMIZE
numAddedFiles Antal filer som har lagts till.
numRemovedFiles Antal filer som optimerats.
numAddedBytes Antal byte som lagts till efter att tabellen har optimerats.
numRemovedBytes Antal borttagna byte.
minFileSize Storleken på den minsta filen efter att tabellen har optimerats.
p25FileSize Storleken på den 25:e percentilfilen efter att tabellen har optimerats.
p50FileSize Medianfilstorlek efter att tabellen har optimerats.
p75FileSize Storleken på den 75:e percentilfilen efter att tabellen har optimerats.
maxFileSize Storleken på den största filen efter att tabellen har optimerats.
CLONE
sourceTableSize Storlek i byte för källtabellen i den version som klonas.
sourceNumOfFiles Antal filer i källtabellen i den version som klonas.
numRemovedFiles Antal filer som tagits bort från måltabellen om en tidigare Delta-tabell ersattes.
removedFilesSize Total storlek i byte för de filer som tagits bort från måltabellen om en tidigare Delta-tabell ersattes.
numCopiedFiles Antal filer som kopierades till den nya platsen. 0 för grunda kloner.
kopieradeFilesSize Total storlek i byte för de filer som kopierades till den nya platsen. 0 för grunda kloner.
ÅTERSTÄLLA
tableSizeAfterRestore Tabellstorlek i byte efter återställning.
numOfFilesAfterRestore Antal filer i tabellen efter återställningen.
numRemovedFiles Antal filer som tagits bort av återställningsåtgärden.
numRestoredFiles Antal filer som har lagts till som ett resultat av återställningen.
removedFilesSize Storlek i byte av filer som tagits bort av återställningen.
restoredFilesSize Storlek i byte av filer som lagts till av återställningen.
VACUUM
numDeletedFiles Antal borttagna filer.
numVacuumedDirectories Antal dammsugna kataloger.
numFilesToDelete Antal filer som ska tas bort.

Vad är Delta Lake-tidsresor?

Delta Lake-tidsresor stöder frågor mot tidigare tabellversioner baserat på tidsstämpel eller tabellversion (som registrerats i transaktionsloggen). Du kan använda tidsresor för program, till exempel följande:

  • Återskapa analyser, rapporter eller utdata (till exempel utdata från en maskininlärningsmodell). Detta kan vara användbart för felsökning eller granskning, särskilt i reglerade branscher.
  • Skriva komplexa temporala frågor.
  • Åtgärda misstag i dina data.
  • Tillhandahålla ögonblicksbildisolering för en uppsättning frågor för snabbväxande tabeller.

Viktigt!

Tabellversioner som är tillgängliga med tidsresor bestäms av en kombination av kvarhållningströskeln för transaktionsloggfiler och frekvensen och den angivna kvarhållningen för VACUUM åtgärder. Om du kör VACUUM dagligen med standardvärdena är 7 dagars data tillgängliga för tidsresor.

Syntax för Delta-tidsresor

Du kör frågor mot en Delta-tabell med tidsåtgång genom att lägga till en sats efter tabellnamnspecifikationen.

  • timestamp_expression kan vara något av följande:
    • '2018-10-18T22:15:12.013Z', det vill: en sträng som kan gjutas till en tidsstämpel
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', det vill: en datumsträng
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Andra uttryck som är eller kan gjutas till en tidsstämpel
  • version är ett långt värde som kan hämtas från utdata DESCRIBE HISTORY table_specfrån .

Varken timestamp_expression eller version kan vara underfrågor.

Endast datum- eller tidsstämpelsträngar accepteras. Till exempel "2019-01-01" och "2019-01-01T00:00:00.000Z". Se följande kod för exempelsyntax:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")

Du kan också använda syntaxen @ för att ange tidsstämpeln eller versionen som en del av tabellnamnet. Tidsstämpeln måste vara i yyyyMMddHHmmssSSS format. Du kan ange en version efter @ genom att lägga till en v till versionen. Se följande kod för exempelsyntax:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

Vad är kontrollpunkter för transaktionsloggar?

Delta Lake registrerar tabellversioner som JSON-filer i _delta_log katalogen, som lagras tillsammans med tabelldata. För att optimera frågor om kontrollpunkter aggregerar Delta Lake tabellversioner till Parquet-kontrollpunktsfiler, vilket förhindrar att alla JSON-versioner av tabellhistoriken behöver läsas. Azure Databricks optimerar kontrollpunktsfrekvensen för datastorlek och arbetsbelastning. Användare bör inte behöva interagera direkt med kontrollpunkter. Kontrollpunktsfrekvensen kan komma att ändras utan föregående meddelande.

Konfigurera datakvarhållning för frågor om tidsresor

Om du vill köra frågor mot en tidigare tabellversion måste du behålla både loggen och datafilerna för den versionen.

Datafiler tas bort när VACUUM de körs mot en tabell. Delta Lake hanterar borttagning av loggfiler automatiskt efter kontrollpunktstabellversioner.

Eftersom de flesta Delta-tabeller har VACUUM körts mot dem regelbundet bör punkt-i-tid-frågor respektera kvarhållningströskeln för VACUUM, vilket är 7 dagar som standard.

För att öka tröskelvärdet för datakvarhållning för Delta-tabeller måste du konfigurera följande tabellegenskaper:

  • delta.logRetentionDuration = "interval <interval>": styr hur länge historiken för en tabell sparas. Standardvärdet är interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": anger tröskelvärdet VACUUM som används för att ta bort datafiler som inte längre refereras till i den aktuella tabellversionen. Standardvärdet är interval 7 days.

Du kan ange Delta-egenskaper när tabellen skapas eller ange dem med en ALTER TABLE instruktion. Se Referens för egenskaper för Delta-tabell.

Kommentar

Du måste ange båda dessa egenskaper för att säkerställa att tabellhistoriken bevaras under längre tid för tabeller med frekventa VACUUM åtgärder. Om du till exempel vill få åtkomst till 30 dagars historiska data anger du delta.deletedFileRetentionDuration = "interval 30 days" (som matchar standardinställningen för delta.logRetentionDuration).

Om du ökar tröskelvärdet för datakvarhållning kan lagringskostnaderna öka i takt med att fler datafiler underhålls.

Återställa en Delta-tabell till ett tidigare tillstånd

Du kan återställa en Delta-tabell till dess tidigare tillstånd med hjälp RESTORE av kommandot . En Delta-tabell underhåller internt historiska versioner av tabellen som gör att den kan återställas till ett tidigare tillstånd. En version som motsvarar det tidigare tillståndet eller en tidsstämpel för när det tidigare tillståndet skapades stöds som alternativ med kommandot RESTORE.

Viktigt!

  • Du kan återställa en redan återställd tabell.
  • Du kan återställa en klonad tabell.
  • Du måste ha MODIFY behörighet för tabellen som återställs.
  • Du kan inte återställa en tabell till en äldre version där datafilerna togs bort manuellt eller av vacuum. Det går fortfarande att återställa till den här versionen delvis om spark.sql.files.ignoreMissingFiles den är inställd på true.
  • Tidsstämpelformatet för återställning till ett tidigare tillstånd är yyyy-MM-dd HH:mm:ss. Endast en date(yyyy-MM-dd)-sträng stöds också.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

Syntaxinformation finns i ÅTERSTÄLL.

Viktigt!

Återställning anses vara en dataförändrande åtgärd. Delta Lake-loggposter som lagts till av RESTORE kommandot innehåller dataChange inställt på true. Om det finns ett underordnat program, till exempel ett strukturerat direktuppspelningsjobb som bearbetar uppdateringarna till en Delta Lake-tabell, betraktas posterna i dataändringsloggen som lagts till av återställningsåtgärden som nya datauppdateringar, och bearbetning av dem kan resultera i dubbletter av data.

Till exempel:

Tabellversion Åtgärd Deltalogguppdateringar Poster i uppdateringar av dataändringsloggar
0 INSERT AddFile(/path/to/file-1, dataChange = true) (namn = Viktor, ålder = 29, (namn = George, ålder = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (namn = George, ålder = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (Inga poster eftersom Optimera komprimering inte ändrar data i tabellen)
3 RESTORE(version=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (namn = Viktor, ålder = 29), (namn = George, ålder = 55), (namn = George, ålder = 39)

I föregående exempel RESTORE resulterar kommandot i uppdateringar som redan visades när deltatabellversion 0 och 1 lästes. Om en strömmande fråga läste den här tabellen betraktas dessa filer som nyligen tillagda data och bearbetas igen.

Återställa mått

RESTORE rapporterar följande mått som en dataram på en rad när åtgärden är klar:

  • table_size_after_restore: Tabellens storlek efter återställningen.

  • num_of_files_after_restore: Antalet filer i tabellen efter återställningen.

  • num_removed_files: Antal filer som tagits bort (logiskt borttagna) från tabellen.

  • num_restored_files: Antal filer som återställts på grund av återställning.

  • removed_files_size: Total storlek i byte för de filer som tas bort från tabellen.

  • restored_files_size: Total storlek i byte för de filer som återställs.

    Exempel på återställningsmått

Exempel på användning av Delta Lake-tidsresor

  • Åtgärda oavsiktliga borttagningar till en tabell för användaren 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Åtgärda oavsiktliga felaktiga uppdateringar av en tabell:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Fråga antalet nya kunder som lagts till under den senaste veckan.

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Hur gör jag för att hitta den senaste incheckningens version i Spark-sessionen?

Om du vill hämta versionsnumret för den senaste incheckningen som skrivits av den aktuella SparkSession i alla trådar och alla tabeller frågar du SQL-konfigurationen spark.databricks.delta.lastCommitVersionInSession.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Om inga incheckningar har gjorts av SparkSessionreturnerar frågan till nyckeln ett tomt värde.

Kommentar

Om du delar samma SparkSession sak i flera trådar liknar det att dela en variabel mellan flera trådar. Du kan nå konkurrensförhållanden eftersom konfigurationsvärdet uppdateras samtidigt.