Werken met delta lake-tabelgeschiedenis
Elke bewerking waarmee een Delta Lake-tabel wordt gewijzigd, maakt een nieuwe tabelversie. U kunt geschiedenisgegevens gebruiken om bewerkingen te controleren, een tabel terug te draaien of een tabel op een bepaald tijdstip op te vragen met behulp van tijdreizen.
Notitie
Databricks raadt het gebruik van de tabelgeschiedenis van Delta Lake niet aan als langetermijnoplossing voor de back-up van gegevensopslag. Databricks raadt aan om alleen de afgelopen 7 dagen te gebruiken voor time travel, tenzij u configuraties voor gegevens- en logboekretentie hebt ingesteld op een hogere waarde.
Delta-tabelgeschiedenis ophalen
U kunt informatie ophalen, waaronder bewerkingen, gebruikers en tijdstempels voor elke schrijfbewerking naar een Delta-tabel door de history
opdracht uit te voeren. De bewerkingen worden geretourneerd in omgekeerde chronologische volgorde.
Retentie van tabelgeschiedenis wordt bepaald door de tabelinstelling delta.logRetentionDuration
, die standaard 30 dagen is.
Notitie
Time travel en tabelgeschiedenis worden bepaald door verschillende retentiedrempels. Raadpleeg Wat is time travel in Delta Lake?
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Zie DESCRIBE HISTORY voor meer informatie over de Spark SQL-syntaxis.
Zie de Documentatie voor de Delta Lake-API voor de syntaxis van Scala/Java/Python.
Catalog Explorer biedt een visuele weergave van deze gedetailleerde tabelinformatie en geschiedenis voor Delta-tabellen. Naast het tabelschema en de voorbeeldgegevens kunt u op het tabblad Geschiedenis klikken om de tabelgeschiedenis weer te geven met DESCRIBE HISTORY
.
Geschiedenisschema
De uitvoer van de history
bewerking heeft de volgende kolommen.
Column | Type | Description |
---|---|---|
version | long | Tabelversie gegenereerd door de bewerking. |
timestamp | timestamp | Wanneer deze versie is doorgevoerd. |
userId | tekenreeks | Id van de gebruiker die de bewerking heeft uitgevoerd. |
gebruikersnaam | tekenreeks | Naam van de gebruiker die de bewerking heeft uitgevoerd. |
schakelapparatuur optimaliseren | tekenreeks | Naam van de bewerking. |
operationParameters | map | Parameters van de bewerking (bijvoorbeeld predicaten.) |
taak | Struct | Details van de taak die de bewerking heeft uitgevoerd. |
notebook | Struct | Details van notebook waaruit de bewerking is uitgevoerd. |
clusterId | tekenreeks | Id van het cluster waarop de bewerking is uitgevoerd. |
readVersion | long | Versie van de tabel die is gelezen om de schrijfbewerking uit te voeren. |
isolationLevel | tekenreeks | Isolatieniveau dat wordt gebruikt voor deze bewerking. |
isBlindAppend | boolean | Of aan deze bewerking gegevens zijn toegevoegd. |
operationMetrics | map | Metrische gegevens van de bewerking (bijvoorbeeld het aantal rijen en bestanden dat is gewijzigd.) |
userMetadata | tekenreeks | Door de gebruiker gedefinieerde doorvoermetagegevens als deze is opgegeven |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Notitie
- Een aantal andere kolommen zijn niet beschikbaar als u met de volgende methoden naar een Delta-tabel schrijft:
- Kolommen die in de toekomst worden toegevoegd, worden altijd toegevoegd na de laatste kolom.
Sleutels voor metrische gegevens voor bewerking
De history
bewerking retourneert een verzameling metrische gegevens voor bewerkingen in de operationMetrics
kolomtoewijzing.
In de volgende tabellen worden de toewijzingssleuteldefinities per bewerking weergegeven.
Operation | Naam van meetwaarde | Beschrijving |
---|---|---|
SCHRIJVEN, TABEL MAKEN ALS SELECTEREN, TABEL VERVANGEN ALS SELECTEREN, KOPIËREN NAAR | ||
numFiles | Aantal geschreven bestanden. | |
numOutputBytes | Grootte in bytes van de geschreven inhoud. | |
numOutputRows | Aantal rijen dat is geschreven. | |
STREAMINGUPDATE | ||
numAddedFiles | Aantal toegevoegde bestanden. | |
numRemovedFiles | Aantal bestanden verwijderd. | |
numOutputRows | Aantal rijen dat is geschreven. | |
numOutputBytes | Grootte van schrijfbewerkingen in bytes. | |
DELETE | ||
numAddedFiles | Aantal toegevoegde bestanden. Niet opgegeven wanneer partities van de tabel worden verwijderd. | |
numRemovedFiles | Aantal bestanden verwijderd. | |
numDeletedRows | Aantal rijen verwijderd. Niet opgegeven wanneer partities van de tabel worden verwijderd. | |
numCopiedRows | Het aantal rijen dat is gekopieerd tijdens het verwijderen van bestanden. | |
executionTimeMs | De tijd die nodig is om de hele bewerking uit te voeren. | |
scanTimeMs | De tijd die nodig is om de bestanden te scannen op overeenkomsten. | |
rewriteTimeMs | De tijd die nodig is om de overeenkomende bestanden te herschrijven. | |
TRUNCATE | ||
numRemovedFiles | Aantal bestanden verwijderd. | |
executionTimeMs | De tijd die nodig is om de hele bewerking uit te voeren. | |
SAMENVOEGEN | ||
numSourceRows | Het aantal rijen in het dataframe van de bron. | |
numTargetRowsInserted | Het aantal rijen dat is ingevoegd in de doeltabel. | |
numTargetRowsUpdated | Het aantal rijen dat is bijgewerkt in de doeltabel. | |
numTargetRowsDeleted | Het aantal rijen dat in de doeltabel is verwijderd. | |
numTargetRowsCopied | Aantal gekopieerde doelrijen. | |
numOutputRows | Totaal aantal rijen dat is geschreven. | |
numTargetFilesAdded | Het aantal bestanden dat is toegevoegd aan de sink(doel). | |
numTargetFilesRemoved | Aantal bestanden verwijderd uit de sink(doel). | |
executionTimeMs | De tijd die nodig is om de hele bewerking uit te voeren. | |
scanTimeMs | De tijd die nodig is om de bestanden te scannen op overeenkomsten. | |
rewriteTimeMs | De tijd die nodig is om de overeenkomende bestanden te herschrijven. | |
UPDATE | ||
numAddedFiles | Aantal toegevoegde bestanden. | |
numRemovedFiles | Aantal bestanden verwijderd. | |
numUpdatedRows | Aantal rijen bijgewerkt. | |
numCopiedRows | Het aantal rijen dat zojuist is gekopieerd tijdens het bijwerken van bestanden. | |
executionTimeMs | De tijd die nodig is om de hele bewerking uit te voeren. | |
scanTimeMs | De tijd die nodig is om de bestanden te scannen op overeenkomsten. | |
rewriteTimeMs | De tijd die nodig is om de overeenkomende bestanden te herschrijven. | |
FSCK | numRemovedFiles | Aantal bestanden verwijderd. |
OMZETTEN | numConvertedFiles | Aantal Parquet-bestanden dat is geconverteerd. |
OPTIMIZE | ||
numAddedFiles | Aantal toegevoegde bestanden. | |
numRemovedFiles | Het aantal bestanden dat is geoptimaliseerd. | |
numAddedBytes | Het aantal bytes dat is toegevoegd nadat de tabel is geoptimaliseerd. | |
numRemovedBytes | Aantal verwijderde bytes. | |
minFileSize | De grootte van het kleinste bestand nadat de tabel is geoptimaliseerd. | |
p25FileSize | De grootte van het 25e percentielbestand nadat de tabel is geoptimaliseerd. | |
p50FileSize | Mediaanbestandsgrootte nadat de tabel is geoptimaliseerd. | |
p75FileSize | De grootte van het 75e percentielbestand nadat de tabel is geoptimaliseerd. | |
maxFileSize | De grootte van het grootste bestand nadat de tabel is geoptimaliseerd. | |
CLONE | ||
sourceTableSize | Grootte in bytes van de brontabel op de versie die is gekloond. | |
sourceNumOfFiles | Het aantal bestanden in de brontabel op de versie die is gekloond. | |
numRemovedFiles | Het aantal bestanden dat uit de doeltabel is verwijderd als een vorige Delta-tabel is vervangen. | |
removedFilesSize | Totale grootte in bytes van de bestanden die uit de doeltabel zijn verwijderd als een vorige Delta-tabel is vervangen. | |
numCopiedFiles | Het aantal bestanden dat naar de nieuwe locatie is gekopieerd. 0 voor ondiepe klonen. | |
copiedFilesSize | Totale grootte in bytes van de bestanden die zijn gekopieerd naar de nieuwe locatie. 0 voor ondiepe klonen. | |
HERSTELLEN | ||
tableSizeAfterRestore | Tabelgrootte in bytes na herstel. | |
numOfFilesAfterRestore | Aantal bestanden in de tabel na herstel. | |
numRemovedFiles | Het aantal bestanden dat is verwijderd door de herstelbewerking. | |
numRestoredFiles | Aantal bestanden dat is toegevoegd als gevolg van de herstelbewerking. | |
removedFilesSize | Grootte in bytes aan bestanden die zijn verwijderd door de herstelbewerking. | |
restoredFilesSize | Grootte in bytes aan bestanden die zijn toegevoegd door de herstelbewerking. | |
VACUUM | ||
numDeletedFiles | Aantal verwijderde bestanden. | |
numVacuumedDirectories | Aantal gevacueerde mappen. | |
numFilesToDelete | Aantal te verwijderen bestanden. |
Wat is Delta Lake time travel?
Delta Lake time travel ondersteunt het uitvoeren van query's op eerdere tabelversies op basis van tijdstempel of tabelversie (zoals vastgelegd in het transactielogboek). U kunt tijdreizen gebruiken voor toepassingen zoals:
- Analyses, rapporten of uitvoer opnieuw maken (bijvoorbeeld de uitvoer van een machine learning-model). Dit kan nuttig zijn voor foutopsporing of controle, met name in gereguleerde branches.
- Complexe tijdelijke query's schrijven.
- Fouten in uw gegevens corrigeren.
- Het bieden van isolatie van momentopnamen voor een set query's voor snel veranderende tabellen.
Belangrijk
Tabelversies die toegankelijk zijn met tijdreizen, worden bepaald door een combinatie van de bewaardrempel voor transactielogboekbestanden en de frequentie en opgegeven retentie voor VACUUM
bewerkingen. Als u dagelijks met de standaardwaarden uitvoert VACUUM
, zijn zeven dagen aan gegevens beschikbaar voor tijdreizen.
Delta-syntaxis voor time travel
U kunt een query uitvoeren op een Delta-tabel met tijdreizen door een component toe te voegen na de tabelnaamspecificatie.
timestamp_expression
kan een van de volgende zijn:'2018-10-18T22:15:12.013Z'
, dat wil gezegd, een tekenreeks die kan worden gecast naar een tijdstempelcast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
, dat wil gezegd, een datumtekenreekscurrent_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- Elke andere expressie die wel of niet kan worden omgezet in een tijdstempel
version
is een lange waarde die kan worden verkregen uit de uitvoer vanDESCRIBE HISTORY table_spec
.
version
Geen van beide timestamp_expression
subquery's.
Alleen datum- of tijdstempeltekenreeksen worden geaccepteerd. Bijvoorbeeld "2019-01-01"
en "2019-01-01T00:00:00.000Z"
. Zie de volgende code voor voorbeeldsyntaxis:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
U kunt de @
syntaxis ook gebruiken om de tijdstempel of versie op te geven als onderdeel van de tabelnaam. De tijdstempel moet een yyyyMMddHHmmssSSS
indeling hebben. U kunt een versie opgeven nadat @
u een v
versie wilt toevoegen aan de versie. Zie de volgende code voor voorbeeldsyntaxis:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
Wat zijn controlepunten voor transactielogboeken?
Delta Lake registreert tabelversies als JSON-bestanden in de _delta_log
map, die naast tabelgegevens worden opgeslagen. Om controlepuntquery's te optimaliseren, aggregeert Delta Lake tabelversies naar Parquet-controlepuntbestanden, waardoor alle JSON-versies van de tabelgeschiedenis niet hoeven te worden gelezen. Azure Databricks optimaliseert de frequentie van controlepunten voor gegevensgrootte en workload. Gebruikers hoeven niet rechtstreeks met controlepunten te communiceren. De controlepuntfrequentie kan zonder kennisgeving worden gewijzigd.
Gegevensretentie configureren voor query's voor tijdreizen
Als u een query wilt uitvoeren op een eerdere tabelversie, moet u zowel het logboek als de gegevensbestanden voor die versie behouden.
Gegevensbestanden worden verwijderd wanneer VACUUM
ze worden uitgevoerd op een tabel. Delta Lake beheert het automatisch verwijderen van logboekbestanden na de versies van de controlepuntentabel.
Omdat de meeste Delta-tabellen regelmatig op deze tabellen worden VACUUM
uitgevoerd, moeten point-in-time-query's de retentiedrempel VACUUM
respecteren, wat standaard 7 dagen is.
Als u de drempelwaarde voor gegevensretentie voor Delta-tabellen wilt verhogen, moet u de volgende tabeleigenschappen configureren:
delta.logRetentionDuration = "interval <interval>"
: bepaalt hoe lang de geschiedenis voor een tabel wordt bewaard. De standaardwaarde isinterval 30 days
.delta.deletedFileRetentionDuration = "interval <interval>"
: bepaalt de drempelwaardeVACUUM
die wordt gebruikt om gegevensbestanden te verwijderen waarnaar niet meer wordt verwezen in de huidige tabelversie. De standaardwaarde isinterval 7 days
.
U kunt Delta-eigenschappen opgeven tijdens het maken van een tabel of deze instellen met een ALTER TABLE
instructie. Zie naslaginformatie over eigenschappen van Delta-tabellen.
Notitie
U moet beide eigenschappen instellen om ervoor te zorgen dat de tabelgeschiedenis langer wordt bewaard voor tabellen met frequente VACUUM
bewerkingen. Bijvoorbeeld voor toegang tot 30 dagen historische gegevens, ingesteld delta.deletedFileRetentionDuration = "interval 30 days"
(die overeenkomt met de standaardinstelling voor delta.logRetentionDuration
).
Het verhogen van de drempelwaarde voor gegevensretentie kan ertoe leiden dat uw opslagkosten stijgen, omdat er meer gegevensbestanden worden onderhouden.
Een Delta-tabel herstellen naar een eerdere status
U kunt een Delta-tabel herstellen naar de eerdere status met behulp van de RESTORE
opdracht. Een Delta-tabel onderhoudt intern historische versies van de tabel, zodat deze kan worden hersteld naar een eerdere status.
Een versie die overeenkomt met de eerdere status of een timestamp van wanneer de eerdere status is gemaakt, worden ondersteund als opties door de RESTORE
-opdracht .
Belangrijk
- U kunt een al herstelde tabel herstellen.
- U kunt een gekloonde tabel herstellen.
- U moet gemachtigd zijn
MODIFY
voor de tabel die wordt hersteld. - U kunt een tabel niet herstellen naar een oudere versie waarin de gegevensbestanden handmatig of door
vacuum
zijn verwijderd. Herstellen naar deze versie is gedeeltelijk nog steeds mogelijk alsspark.sql.files.ignoreMissingFiles
deze is ingesteld optrue
. - De tijdstempelindeling voor het herstellen naar een eerdere status is
yyyy-MM-dd HH:mm:ss
. Het verstrekken van alleen een tekenreeks voor datum(yyyy-MM-dd
) wordt ook ondersteund.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Zie RESTORE voor meer informatie over de syntaxis.
Belangrijk
Herstellen wordt beschouwd als een bewerking voor het wijzigen van gegevens. Delta Lake-logboekvermeldingen die door de RESTORE
opdracht zijn toegevoegd, bevatten dataChange ingesteld op true. Als er een downstreamtoepassing is, zoals een gestructureerde streamingtaak die de updates naar een Delta Lake-tabel verwerkt, worden de vermeldingen voor gegevenswijzigingslogboeken die door de herstelbewerking zijn toegevoegd, beschouwd als nieuwe gegevensupdates en kunnen deze verwerken tot dubbele gegevens.
Voorbeeld:
Tabelversie | Operation | Delta-logboekupdates | Records in wijzigingenlogboekupdates voor gegevens |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (naam = Victor, leeftijd = 29, (naam = George, leeftijd = 55) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (naam = George, leeftijd = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (Geen records omdat de compressie optimaliseren de gegevens in de tabel niet wijzigt) |
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (naam = Victor, leeftijd = 29), (naam = George, leeftijd = 55), (naam = George, leeftijd = 39) |
In het voorgaande voorbeeld resulteert de RESTORE
opdracht in updates die al zijn gezien bij het lezen van versie 0 en 1 van de Delta-tabel. Als een streamingquery deze tabel heeft gelezen, worden deze bestanden beschouwd als nieuw toegevoegde gegevens en worden deze opnieuw verwerkt.
Metrische gegevens herstellen
RESTORE
rapporteert de volgende metrische gegevens als één rij DataFrame zodra de bewerking is voltooid:
table_size_after_restore
: De grootte van de tabel na het herstellen.num_of_files_after_restore
: Het aantal bestanden in de tabel na het herstellen.num_removed_files
: Het aantal bestanden dat uit de tabel is verwijderd (logisch verwijderd).num_restored_files
: Het aantal bestanden dat is hersteld vanwege terugdraaien.removed_files_size
: Totale grootte in bytes van de bestanden die uit de tabel worden verwijderd.restored_files_size
: Totale grootte in bytes van de bestanden die worden hersteld.
Voorbeelden van het gebruik van time travel in Delta Lake
Onopzettelijke verwijderingen in een tabel voor de gebruiker
111
herstellen:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Onopzettelijke onjuiste updates voor een tabel oplossen:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Voer een query uit op het aantal nieuwe klanten dat de afgelopen week is toegevoegd.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Hoe kan ik de versie van de laatste doorvoering zoeken in de Spark-sessie?
Als u het versienummer van de laatste doorvoering wilt ophalen die door de huidige SparkSession
is geschreven in alle threads en alle tabellen, voert u een query uit op de SQL-configuratie spark.databricks.delta.lastCommitVersionInSession
.
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Als er geen doorvoeringen zijn uitgevoerd door de SparkSession
sleutel, retourneert een query op de sleutel een lege waarde.
Notitie
Als u hetzelfde SparkSession
deelt over meerdere threads, is het vergelijkbaar met het delen van een variabele over meerdere threads. U kunt racevoorwaarden tegenkomen wanneer de configuratiewaarde gelijktijdig wordt bijgewerkt.