Condividi tramite


Usare la cronologia di Delta Lake table

Ogni operazione che modifica Delta Lake table crea una nuova versione table. È possibile usare le informazioni sulla cronologia per controllare le operazioni, eseguire il rollback di un tableo eseguire una query su un table in un momento specifico usando il viaggio nel tempo.

Nota

Databricks non consiglia di usare la cronologia di delta Lake table come soluzione di backup a lungo termine per l'archiviazione dei dati. Databricks consiglia di usare solo gli ultimi 7 giorni per le operazioni di viaggio nel tempo, a meno che non siano presenti set configurazioni di conservazione dei dati e dei log a un valore maggiore.

Recuperare la cronologia del Delta table

È possibile recuperare informazioni incluse le operazioni, l'utente e il timestamp per ogni scrittura su un Delta table eseguendo il comando history. Le operazioni vengono restituite in ordine cronologico inverso.

La conservazione della cronologia Table è determinata dall'impostazione tabledelta.logRetentionDuration: ovvero 30 giorni per impostazione predefinita.

Nota

Il tempo di viaggio e la cronologia table sono controllati da soglie di conservazione diverse. Vedere Che cos'è lo spostamento cronologico di Delta Lake?.

DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1  -- get the last operation only

Per informazioni dettagliate sulla sintassi di Spark SQL, vedere DESCRIBE HISTORY.

Per informazioni dettagliate sulla sintassi scala/Java/Python, vedere la documentazione dell'API Delta Lake.

Catalog Explorer offre una visualizzazione visiva delle informazioni dettagliate table e della cronologia per Delta tables. Oltre ai dati di esempio tableschema, è possibile fare clic sulla scheda cronologia per vedere la cronologia table visualizzata con DESCRIBE HISTORY.

Cronologia schema

L'output dell'operazione di history ha il seguente columns.

Column Type Description
versione long Table versione generata dall'operazione.
timestamp timestamp Quando è stato eseguito il commit di questa versione.
userId string ID dell'utente che ha eseguito l'operazione.
userName string Nome dell'utente che ha eseguito l'operazione.
operation (operazione) string Nome dell'operazione.
operationParameters mappa Parameters dell'operazione (ad esempio, predicati).
processo struct Dettagli del processo che ha eseguito l'operazione.
notebook struct Dettagli del notebook da cui è stata eseguita l'operazione.
clusterId string ID del cluster in cui è stata eseguita l'operazione.
readVersion long Versione del table che è stata letta per eseguire l'operazione di scrittura.
isolationLevel string Livello di isolamento usato per questa operazione.
isBlindAppend boolean Indica se questa operazione ha accodato dati.
operationMetrics mappa Metriche dell'operazione (ad esempio, numero di righe e file modificati).
userMetadata string Metadati di commit definiti dall'utente se è stato specificato
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Nota

Chiavi delle metriche delle operazioni

L'operazione history restituisce una raccolta di metriche operative nella mappa operationMetricscolumn.

Di seguito tableslist le definizioni delle chiavi della mappa in base all'operazione.

Operazione Nome metrica Description
SCRIVI, CREATE TABLE AS SELECT, SOSTITUISCI TABLE AS SELECT, COPY INTO
numFiles Numero di file scritti.
numOutputBytes Dimensioni in byte del contenuto scritto.
numOutputRows Numero di righe scritte.
Streaming UPDATE
numAddedFiles Numero di file aggiunti.
numRemovedFiles Numero di file rimossi.
numOutputRows Numero di righe scritte.
numOutputBytes Dimensione della scrittura in byte.
DELETE
numAddedFiles Numero di file aggiunti. Non specificato quando vengono eliminate le partizioni del table.
numRemovedFiles Numero di file rimossi.
numDeletedRows Numero di righe rimosse. Non specificato quando vengono eliminate le partizioni del table.
numCopiedRows Numero di righe copiate nel processo di eliminazione dei file.
executionTimeMs Tempo impiegato per eseguire l'intera operazione.
scanTimeMs Tempo impiegato per analizzare i file per individuare le corrispondenze.
rewriteTimeMs Tempo impiegato per riscrivere i file corrispondenti.
TRUNCATE
numRemovedFiles Numero di file rimossi.
executionTimeMs Tempo impiegato per eseguire l'intera operazione.
MERGE
numSourceRows Numero di righe nel dataframe di origine.
numTargetRowsInserted Numero di righe inserite nel target table.
numTargetRowsUpdated Numero di righe aggiornate nel target table.
numTargetRowsDeleted Numero di righe eliminate nella destinazione table.
numTargetRowsCopied Numero di righe di destinazione copiate.
numOutputRows Numero totale di righe scritte.
numTargetFilesAdded Numero di file aggiunti al sink(destinazione).
numTargetFilesRemoved Numero di file rimossi dal sink(destinazione).
executionTimeMs Tempo impiegato per eseguire l'intera operazione.
scanTimeMs Tempo impiegato per analizzare i file per individuare le corrispondenze.
rewriteTimeMs Tempo impiegato per riscrivere i file corrispondenti.
UPDATE
numAddedFiles Numero di file aggiunti.
numRemovedFiles Numero di file rimossi.
numUpdatedRows Numero di righe aggiornate.
numCopiedRows Numero di righe appena copiate nel processo di aggiornamento dei file.
executionTimeMs Tempo impiegato per eseguire l'intera operazione.
scanTimeMs Tempo impiegato per analizzare i file per individuare le corrispondenze.
rewriteTimeMs Tempo impiegato per riscrivere i file corrispondenti.
FSCK numRemovedFiles Numero di file rimossi.
CONVERT numConvertedFiles Numero di file Parquet convertiti.
OPTIMIZE
numAddedFiles Numero di file aggiunti.
numRemovedFiles Numero di file ottimizzati.
numAddedBytes Numero di byte aggiunti dopo l'ottimizzazione del table.
numRemovedBytes Numero di byte rimossi.
minFileSize Dimensioni del file più piccolo dopo l'ottimizzazione del table.
p25FileSize Dimensione del file al 25° percentile dopo che table è stato ottimizzato.
p50FileSize Dimensioni del file mediano dopo l'ottimizzazione del table.
p75FileSize Dimensione del file al 75° percentile dopo l'ottimizzazione del table.
maxFileSize Dimensione del file più grande dopo che il table è stato ottimizzato.
CLONE
sourceTableSize Dimensioni in byte del table di origine nella versione clonata.
sourceNumOfFiles Numero di file nella table di origine nella versione che è stata clonata.
numRemovedFiles Numero di file rimossi dal table di destinazione se è stata sostituita una Delta table precedente.
removedFilesSize Dimensione totale in byte dei file rimossi dal table di destinazione nel caso sia stato sostituito un precedente Delta table.
numCopiedFiles Numero di file copiati nel nuovo percorso. 0 per cloni superficiali.
copiedFilesSize Dimensioni totali in byte dei file copiati nel nuovo percorso. 0 per cloni superficiali.
RESTORE
tableSizeAfterRestore Table dimensione in byte dopo restore.
numOfFilesAfterRestore Numero di file nel table dopo restore.
numRemovedFiles Numero di file rimossi dall'operazione di restore.
numRestoredFiles Numero di file aggiunti come risultato di restore.
removedFilesSize Dimensioni in byte dei file rimossi dal restore.
restoredFilesSize Dimensione in byte dei file aggiunti dal restore.
VACUUM
numDeletedFiles Numero di file eliminati.
numVacuumedDirectories Numero di directory a vuoto.
numFilesToDelete Numero di file da eliminare.

Che cos'è il viaggio nel tempo di Delta Lake?

Il viaggio nel tempo di Delta Lake supporta l'esecuzione di query sulle versioni precedenti table in base al timestamp o alla versione table (come registrato nel log delle transazioni). È possibile usare il tempo di viaggio per le applicazioni, ad esempio le seguenti:

  • Ricreare analisi, report o output (ad esempio, l'output di un modello di Machine Learning). Questo può essere utile per il debug o il controllo, in particolare nei settori regolamentati.
  • Scrivere query temporali complesse.
  • Correggere gli errori nei dati.
  • Fornire isolamento dello snapshot per un set di query per la rapida modifica di tables.

Importante

Table versioni accessibili tramite viaggio nel tempo sono determinate da una combinazione della soglia di conservazione per i file di log delle transazioni e della frequenza e conservazione specificata per le operazioni di VACUUM. Se si esegue VACUUM giornaliero con l'impostazione predefinita values, sono disponibili 7 giorni di dati per il viaggio nel tempo.

Sintassi dello spostamento cronologico di Delta Lake

Per eseguire una query su un table Delta, aggiungere una clausola dopo la specifica del nome table.

  • timestamp_expression può essere uno qualsiasi di:
    • '2018-10-18T22:15:12.013Z'ovvero una stringa di cui è possibile eseguire il cast a un timestamp
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', ovvero una stringa di data
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Qualsiasi altra espressione che è o può essere eseguita il cast a un timestamp
  • version è un valore long che può essere ottenuto dall'output di DESCRIBE HISTORY table_spec.

timestamp_expressionversion possono essere sottoquery.

Vengono accettate solo stringhe di data o timestamp. Ad esempio, "2019-01-01" e "2019-01-01T00:00:00.000Z". Vedere il codice seguente per una sintassi di esempio:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")

È anche possibile usare la sintassi @ per specificare il timestamp o la versione come parte del nome table. Il timestamp deve essere in yyyyMMddHHmmssSSS formato . È possibile specificare una versione dopo @ anteponendo una v alla versione. Vedere il codice seguente per una sintassi di esempio:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

Che cosa sono i checkpoint del log delle transazioni?

Delta Lake registra table versioni come file JSON all'interno della directory _delta_log, archiviate insieme ai dati table. Per le query di checkpoint optimize, Delta Lake aggrega le versioni table nei file di checkpoint Parquet, evitando la necessità di leggere tutte le versioni JSON nella cronologia di table. Azure Databricks ottimizza la frequenza di checkpoint per le dimensioni dei dati e il carico di lavoro. Gli utenti non devono interagire direttamente con i checkpoint. La frequenza del checkpoint è soggetta a modifiche senza preavviso.

Configurare la conservazione dei dati per le query di spostamento del tempo

Per eseguire una query su una versione table precedente, è necessario conservare sia il log che i file di dati per tale versione.

I file di dati vengono eliminati quando VACUUM viene eseguito su un table. Delta Lake gestisce automaticamente la rimozione dei file di log dopo il checkpoint del table.

Poiché la maggior parte dei tables Delta ha VACUUM eseguito regolarmente, le query temporizzato devono rispettare la soglia di conservazione per VACUUM, ovvero 7 giorni per impostazione predefinita.

Per aumentare la soglia di conservazione dei dati per Delta tables, è necessario configurare le proprietà table seguenti:

  • delta.logRetentionDuration = "interval <interval>": controlla per quanto tempo viene mantenuta la cronologia di un table. Il valore predefinito è interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": determina la soglia che VACUUM usa per remove i file di dati ai quali non si fa più riferimento nella versione corrente table. Il valore predefinito è interval 7 days.

È possibile specificare le proprietà Delta durante la creazione di table o set con un'istruzione ALTER TABLE. Vedere il riferimento alle proprietà Delta table.

Nota

È necessario set entrambe queste proprietà per assicurarsi che la cronologia di table venga mantenuta per una durata più lunga per tables con operazioni di VACUUM frequenti. Ad esempio, per accedere a 30 giorni di dati cronologici, setdelta.deletedFileRetentionDuration = "interval 30 days" (che corrisponde all'impostazione predefinita per delta.logRetentionDuration).

L'aumento della soglia di conservazione dei dati può causare l'aumento dei costi di archiviazione, man mano che vengono mantenuti più file di dati.

Restore portare table Delta a uno stato precedente

È possibile restore un table Delta allo stato precedente usando il comando RESTORE. Un table Delta mantiene internamente le versioni storiche del table che ne consentono il ripristino a uno stato precedente. Una versione corrispondente allo stato precedente o a un timestamp di quando è stato creato lo stato precedente sono supportate come opzioni dal comando RESTORE.

Importante

  • È possibile restore un tablegià ripristinato.
  • È possibile restore un clonatotable.
  • È necessario disporre dell'autorizzazione MODIFY per il table da ripristinare.
  • Non è possibile restore un table a una versione precedente where perché i file dei dati sono stati eliminati manualmente o da vacuum. È ancora possibile ripristinare parzialmente questa versione se spark.sql.files.ignoreMissingFiles è set a true.
  • Il formato timestamp per il ripristino in uno stato precedente è yyyy-MM-dd HH:mm:ss. È supportata anche solo una stringa date(yyyy-MM-dd).
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

Per informazioni dettagliate sulla sintassi, vedere RESTORE.

Importante

Restore viene considerata un'operazione di modifica dei dati. Le voci di log Delta Lake aggiunte dal comando RESTORE contengono "dataChange"set su "true". Se è presente un'applicazione downstream, ad esempio un flusso strutturato processo che elabora gli aggiornamenti a un delta Lake table, le voci del log delle modifiche dei dati aggiunte dall'operazione di restore vengono considerate come nuovi aggiornamenti dei dati e l'elaborazione può comportare dati duplicati.

Ad esempio:

Table ver. Operazione Aggiornamenti del log differenziale Record negli aggiornamenti del log delle modifiche dei dati
0 INSERT AddFile(/path/to/file-1, dataChange = true) (name = Victor, age = 29, (name = George, age = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (nome = George, età = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (Nessun record poiché la compattazione Optimize non modifica i dati nel table)
3 RESTORE(versione=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (name = Victor, age = 29), (name = George, age = 55), (name = George, age = 39)

Nell'esempio precedente, il comando RESTORE genera aggiornamenti già visualizzati durante la lettura di Delta table versione 0 e 1. Se una query di streaming stava leggendo questa table, questi file verranno considerati come dati appena aggiunti e verranno elaborati di nuovo.

metriche Restore

RESTORE segnala le metriche seguenti come singolo dataframe di riga al termine dell'operazione:

  • table_size_after_restore: dimensioni del table dopo il ripristino.

  • num_of_files_after_restore: numero di file nel table dopo il ripristino.

  • num_removed_files: numero di file rimossi (eliminati logicamente) dal table.

  • num_restored_files: numero di file ripristinati a causa del rollback.

  • removed_files_size: dimensioni totali in byte dei file rimossi dalla table.

  • restored_files_size: dimensioni totali in byte dei file ripristinati.

    Restore esempio di metriche

Esempi di utilizzo dello spostamento cronologico di Delta Lake

  • Correggere le eliminazioni accidentali in un table per l'utente 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Correggere gli aggiornamenti accidentali non corretti in un table:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Eseguire una query sul numero di nuovi clienti aggiunti nell'ultima settimana.

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Ricerca per categorie trovare la versione dell'ultimo commit nella sessione Spark?

Per get il numero di versione dell'ultimo commit scritto dal SparkSession corrente su tutti i thread e in tutti i tables, esegui una query sulla configurazione SQL spark.databricks.delta.lastCommitVersionInSession.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Se non sono stati eseguiti commit da , l'esecuzione SparkSessiondi query sulla chiave restituisce un valore vuoto.

Nota

Se si condivide lo stesso SparkSession tra più thread, è simile alla condivisione di una variabile tra più thread. È possibile che si verifichino race condition man mano che il valore di configurazione viene aggiornato contemporaneamente.