Usare la cronologia di Delta Lake table
Ogni operazione che modifica Delta Lake table crea una nuova versione table. È possibile usare le informazioni sulla cronologia per controllare le operazioni, eseguire il rollback di un tableo eseguire una query su un table in un momento specifico usando il viaggio nel tempo.
Nota
Databricks non consiglia di usare la cronologia di delta Lake table come soluzione di backup a lungo termine per l'archiviazione dei dati. Databricks consiglia di usare solo gli ultimi 7 giorni per le operazioni di viaggio nel tempo, a meno che non siano presenti set configurazioni di conservazione dei dati e dei log a un valore maggiore.
Recuperare la cronologia del Delta table
È possibile recuperare informazioni incluse le operazioni, l'utente e il timestamp per ogni scrittura su un Delta table eseguendo il comando history
. Le operazioni vengono restituite in ordine cronologico inverso.
La conservazione della cronologia Table è determinata dall'impostazione tabledelta.logRetentionDuration
: ovvero 30 giorni per impostazione predefinita.
Nota
Il tempo di viaggio e la cronologia table sono controllati da soglie di conservazione diverse. Vedere Che cos'è lo spostamento cronologico di Delta Lake?.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Per informazioni dettagliate sulla sintassi di Spark SQL, vedere DESCRIBE HISTORY.
Per informazioni dettagliate sulla sintassi scala/Java/Python, vedere la documentazione dell'API Delta Lake.
Catalog Explorer offre una visualizzazione visiva delle informazioni dettagliate table e della cronologia per Delta tables. Oltre ai dati di esempio tableschema, è possibile fare clic sulla scheda cronologia per vedere la cronologia table visualizzata con DESCRIBE HISTORY
.
Cronologia schema
L'output dell'operazione di history
ha il seguente columns.
Column | Type | Description |
---|---|---|
versione | long | Table versione generata dall'operazione. |
timestamp | timestamp | Quando è stato eseguito il commit di questa versione. |
userId | string | ID dell'utente che ha eseguito l'operazione. |
userName | string | Nome dell'utente che ha eseguito l'operazione. |
operation (operazione) | string | Nome dell'operazione. |
operationParameters | mappa | Parameters dell'operazione (ad esempio, predicati). |
processo | struct | Dettagli del processo che ha eseguito l'operazione. |
notebook | struct | Dettagli del notebook da cui è stata eseguita l'operazione. |
clusterId | string | ID del cluster in cui è stata eseguita l'operazione. |
readVersion | long | Versione del table che è stata letta per eseguire l'operazione di scrittura. |
isolationLevel | string | Livello di isolamento usato per questa operazione. |
isBlindAppend | boolean | Indica se questa operazione ha accodato dati. |
operationMetrics | mappa | Metriche dell'operazione (ad esempio, numero di righe e file modificati). |
userMetadata | string | Metadati di commit definiti dall'utente se è stato specificato |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Nota
- Alcune delle altre columns non sono disponibili se si scrive in un Delta table utilizzando i metodi seguenti:
- Columns aggiunti in futuro, verranno sempre aggiunti dopo l'ultimo column.
Chiavi delle metriche delle operazioni
L'operazione history
restituisce una raccolta di metriche operative nella mappa operationMetrics
column.
Di seguito tableslist le definizioni delle chiavi della mappa in base all'operazione.
Operazione | Nome metrica | Description |
---|---|---|
SCRIVI, CREATE TABLE AS SELECT, SOSTITUISCI TABLE AS SELECT, COPY INTO | ||
numFiles | Numero di file scritti. | |
numOutputBytes | Dimensioni in byte del contenuto scritto. | |
numOutputRows | Numero di righe scritte. | |
Streaming UPDATE | ||
numAddedFiles | Numero di file aggiunti. | |
numRemovedFiles | Numero di file rimossi. | |
numOutputRows | Numero di righe scritte. | |
numOutputBytes | Dimensione della scrittura in byte. | |
DELETE | ||
numAddedFiles | Numero di file aggiunti. Non specificato quando vengono eliminate le partizioni del table. | |
numRemovedFiles | Numero di file rimossi. | |
numDeletedRows | Numero di righe rimosse. Non specificato quando vengono eliminate le partizioni del table. | |
numCopiedRows | Numero di righe copiate nel processo di eliminazione dei file. | |
executionTimeMs | Tempo impiegato per eseguire l'intera operazione. | |
scanTimeMs | Tempo impiegato per analizzare i file per individuare le corrispondenze. | |
rewriteTimeMs | Tempo impiegato per riscrivere i file corrispondenti. | |
TRUNCATE | ||
numRemovedFiles | Numero di file rimossi. | |
executionTimeMs | Tempo impiegato per eseguire l'intera operazione. | |
MERGE | ||
numSourceRows | Numero di righe nel dataframe di origine. | |
numTargetRowsInserted | Numero di righe inserite nel target table. | |
numTargetRowsUpdated | Numero di righe aggiornate nel target table. | |
numTargetRowsDeleted | Numero di righe eliminate nella destinazione table. | |
numTargetRowsCopied | Numero di righe di destinazione copiate. | |
numOutputRows | Numero totale di righe scritte. | |
numTargetFilesAdded | Numero di file aggiunti al sink(destinazione). | |
numTargetFilesRemoved | Numero di file rimossi dal sink(destinazione). | |
executionTimeMs | Tempo impiegato per eseguire l'intera operazione. | |
scanTimeMs | Tempo impiegato per analizzare i file per individuare le corrispondenze. | |
rewriteTimeMs | Tempo impiegato per riscrivere i file corrispondenti. | |
UPDATE | ||
numAddedFiles | Numero di file aggiunti. | |
numRemovedFiles | Numero di file rimossi. | |
numUpdatedRows | Numero di righe aggiornate. | |
numCopiedRows | Numero di righe appena copiate nel processo di aggiornamento dei file. | |
executionTimeMs | Tempo impiegato per eseguire l'intera operazione. | |
scanTimeMs | Tempo impiegato per analizzare i file per individuare le corrispondenze. | |
rewriteTimeMs | Tempo impiegato per riscrivere i file corrispondenti. | |
FSCK | numRemovedFiles | Numero di file rimossi. |
CONVERT | numConvertedFiles | Numero di file Parquet convertiti. |
OPTIMIZE | ||
numAddedFiles | Numero di file aggiunti. | |
numRemovedFiles | Numero di file ottimizzati. | |
numAddedBytes | Numero di byte aggiunti dopo l'ottimizzazione del table. | |
numRemovedBytes | Numero di byte rimossi. | |
minFileSize | Dimensioni del file più piccolo dopo l'ottimizzazione del table. | |
p25FileSize | Dimensione del file al 25° percentile dopo che table è stato ottimizzato. | |
p50FileSize | Dimensioni del file mediano dopo l'ottimizzazione del table. | |
p75FileSize | Dimensione del file al 75° percentile dopo l'ottimizzazione del table. | |
maxFileSize | Dimensione del file più grande dopo che il table è stato ottimizzato. | |
CLONE | ||
sourceTableSize | Dimensioni in byte del table di origine nella versione clonata. | |
sourceNumOfFiles | Numero di file nella table di origine nella versione che è stata clonata. | |
numRemovedFiles | Numero di file rimossi dal table di destinazione se è stata sostituita una Delta table precedente. | |
removedFilesSize | Dimensione totale in byte dei file rimossi dal table di destinazione nel caso sia stato sostituito un precedente Delta table. | |
numCopiedFiles | Numero di file copiati nel nuovo percorso. 0 per cloni superficiali. | |
copiedFilesSize | Dimensioni totali in byte dei file copiati nel nuovo percorso. 0 per cloni superficiali. | |
RESTORE | ||
tableSizeAfterRestore | Table dimensione in byte dopo restore. | |
numOfFilesAfterRestore | Numero di file nel table dopo restore. | |
numRemovedFiles | Numero di file rimossi dall'operazione di restore. | |
numRestoredFiles | Numero di file aggiunti come risultato di restore. | |
removedFilesSize | Dimensioni in byte dei file rimossi dal restore. | |
restoredFilesSize | Dimensione in byte dei file aggiunti dal restore. | |
VACUUM | ||
numDeletedFiles | Numero di file eliminati. | |
numVacuumedDirectories | Numero di directory a vuoto. | |
numFilesToDelete | Numero di file da eliminare. |
Che cos'è il viaggio nel tempo di Delta Lake?
Il viaggio nel tempo di Delta Lake supporta l'esecuzione di query sulle versioni precedenti table in base al timestamp o alla versione table (come registrato nel log delle transazioni). È possibile usare il tempo di viaggio per le applicazioni, ad esempio le seguenti:
- Ricreare analisi, report o output (ad esempio, l'output di un modello di Machine Learning). Questo può essere utile per il debug o il controllo, in particolare nei settori regolamentati.
- Scrivere query temporali complesse.
- Correggere gli errori nei dati.
- Fornire isolamento dello snapshot per un set di query per la rapida modifica di tables.
Importante
Table versioni accessibili tramite viaggio nel tempo sono determinate da una combinazione della soglia di conservazione per i file di log delle transazioni e della frequenza e conservazione specificata per le operazioni di VACUUM
. Se si esegue VACUUM
giornaliero con l'impostazione predefinita values, sono disponibili 7 giorni di dati per il viaggio nel tempo.
Sintassi dello spostamento cronologico di Delta Lake
Per eseguire una query su un table Delta, aggiungere una clausola dopo la specifica del nome table.
-
timestamp_expression
può essere uno qualsiasi di:-
'2018-10-18T22:15:12.013Z'
ovvero una stringa di cui è possibile eseguire il cast a un timestamp cast('2018-10-18 13:36:32 CEST' as timestamp)
-
'2018-10-18'
, ovvero una stringa di data current_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- Qualsiasi altra espressione che è o può essere eseguita il cast a un timestamp
-
-
version
è un valore long che può essere ottenuto dall'output diDESCRIBE HISTORY table_spec
.
Né timestamp_expression
né version
possono essere sottoquery.
Vengono accettate solo stringhe di data o timestamp. Ad esempio, "2019-01-01"
e "2019-01-01T00:00:00.000Z"
. Vedere il codice seguente per una sintassi di esempio:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
È anche possibile usare la sintassi @
per specificare il timestamp o la versione come parte del nome table. Il timestamp deve essere in yyyyMMddHHmmssSSS
formato . È possibile specificare una versione dopo @
anteponendo una v
alla versione. Vedere il codice seguente per una sintassi di esempio:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
Che cosa sono i checkpoint del log delle transazioni?
Delta Lake registra table versioni come file JSON all'interno della directory _delta_log
, archiviate insieme ai dati table. Per le query di checkpoint optimize, Delta Lake aggrega le versioni table nei file di checkpoint Parquet, evitando la necessità di leggere tutte le versioni JSON nella cronologia di table. Azure Databricks ottimizza la frequenza di checkpoint per le dimensioni dei dati e il carico di lavoro. Gli utenti non devono interagire direttamente con i checkpoint. La frequenza del checkpoint è soggetta a modifiche senza preavviso.
Configurare la conservazione dei dati per le query di spostamento del tempo
Per eseguire una query su una versione table precedente, è necessario conservare sia il log che i file di dati per tale versione.
I file di dati vengono eliminati quando VACUUM
viene eseguito su un table. Delta Lake gestisce automaticamente la rimozione dei file di log dopo il checkpoint del table.
Poiché la maggior parte dei tables Delta ha VACUUM
eseguito regolarmente, le query temporizzato devono rispettare la soglia di conservazione per VACUUM
, ovvero 7 giorni per impostazione predefinita.
Per aumentare la soglia di conservazione dei dati per Delta tables, è necessario configurare le proprietà table seguenti:
-
delta.logRetentionDuration = "interval <interval>"
: controlla per quanto tempo viene mantenuta la cronologia di un table. Il valore predefinito èinterval 30 days
. -
delta.deletedFileRetentionDuration = "interval <interval>"
: determina la soglia cheVACUUM
usa per remove i file di dati ai quali non si fa più riferimento nella versione corrente table. Il valore predefinito èinterval 7 days
.
È possibile specificare le proprietà Delta durante la creazione di table o set con un'istruzione ALTER TABLE
. Vedere il riferimento alle proprietà Delta table.
Nota
È necessario set entrambe queste proprietà per assicurarsi che la cronologia di table venga mantenuta per una durata più lunga per tables con operazioni di VACUUM
frequenti. Ad esempio, per accedere a 30 giorni di dati cronologici, setdelta.deletedFileRetentionDuration = "interval 30 days"
(che corrisponde all'impostazione predefinita per delta.logRetentionDuration
).
L'aumento della soglia di conservazione dei dati può causare l'aumento dei costi di archiviazione, man mano che vengono mantenuti più file di dati.
Restore portare table Delta a uno stato precedente
È possibile restore un table Delta allo stato precedente usando il comando RESTORE
. Un table Delta mantiene internamente le versioni storiche del table che ne consentono il ripristino a uno stato precedente.
Una versione corrispondente allo stato precedente o a un timestamp di quando è stato creato lo stato precedente sono supportate come opzioni dal comando RESTORE
.
Importante
- È possibile restore un tablegià ripristinato.
- È possibile restore un clonatotable.
- È necessario disporre dell'autorizzazione
MODIFY
per il table da ripristinare. - Non è possibile restore un table a una versione precedente where perché i file dei dati sono stati eliminati manualmente o da
vacuum
. È ancora possibile ripristinare parzialmente questa versione sespark.sql.files.ignoreMissingFiles
è set atrue
. - Il formato timestamp per il ripristino in uno stato precedente è
yyyy-MM-dd HH:mm:ss
. È supportata anche solo una stringa date(yyyy-MM-dd
).
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Per informazioni dettagliate sulla sintassi, vedere RESTORE.
Importante
Restore viene considerata un'operazione di modifica dei dati. Le voci di log Delta Lake aggiunte dal comando RESTORE
contengono "dataChange"set su "true". Se è presente un'applicazione downstream, ad esempio un flusso strutturato processo che elabora gli aggiornamenti a un delta Lake table, le voci del log delle modifiche dei dati aggiunte dall'operazione di restore vengono considerate come nuovi aggiornamenti dei dati e l'elaborazione può comportare dati duplicati.
Ad esempio:
Table ver. | Operazione | Aggiornamenti del log differenziale | Record negli aggiornamenti del log delle modifiche dei dati |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (name = Victor, age = 29, (name = George, age = 55) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (nome = George, età = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (Nessun record poiché la compattazione Optimize non modifica i dati nel table) |
3 | RESTORE(versione=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (name = Victor, age = 29), (name = George, age = 55), (name = George, age = 39) |
Nell'esempio precedente, il comando RESTORE
genera aggiornamenti già visualizzati durante la lettura di Delta table versione 0 e 1. Se una query di streaming stava leggendo questa table, questi file verranno considerati come dati appena aggiunti e verranno elaborati di nuovo.
metriche Restore
RESTORE
segnala le metriche seguenti come singolo dataframe di riga al termine dell'operazione:
table_size_after_restore
: dimensioni del table dopo il ripristino.num_of_files_after_restore
: numero di file nel table dopo il ripristino.num_removed_files
: numero di file rimossi (eliminati logicamente) dal table.num_restored_files
: numero di file ripristinati a causa del rollback.removed_files_size
: dimensioni totali in byte dei file rimossi dalla table.restored_files_size
: dimensioni totali in byte dei file ripristinati.
Esempi di utilizzo dello spostamento cronologico di Delta Lake
Correggere le eliminazioni accidentali in un table per l'utente
111
:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Correggere gli aggiornamenti accidentali non corretti in un table:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Eseguire una query sul numero di nuovi clienti aggiunti nell'ultima settimana.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Ricerca per categorie trovare la versione dell'ultimo commit nella sessione Spark?
Per get il numero di versione dell'ultimo commit scritto dal SparkSession
corrente su tutti i thread e in tutti i tables, esegui una query sulla configurazione SQL spark.databricks.delta.lastCommitVersionInSession
.
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Se non sono stati eseguiti commit da , l'esecuzione SparkSession
di query sulla chiave restituisce un valore vuoto.
Nota
Se si condivide lo stesso SparkSession
tra più thread, è simile alla condivisione di una variabile tra più thread. È possibile che si verifichino race condition man mano che il valore di configurazione viene aggiornato contemporaneamente.