Trabajar con el historial de tablas de Delta Lake
Cada operación que modifica una tabla de Delta Lake crea una nueva versión de tabla. Puede usar información de historial para auditar las operaciones, revertir una tabla o consultar una tabla en un momento dado específico mediante el viaje en el tiempo.
Nota:
Databricks no recomienda usar el historial de tablas de Delta Lake como una solución de copia de seguridad a largo plazo para el archivado de datos. Databricks recomienda usar solo los últimos 7 días para las operaciones de viaje en el tiempo, a menos que haya establecido configuraciones de retención de datos y registros en un valor mayor.
Recuperación del historial de la tabla Delta
Puede recuperar información incluidas las operaciones, el usuario, la marca de tiempo para cada escritura en una tabla Delta mediante la ejecución del comando history
. Las operaciones se devuelven en orden cronológico inverso.
La retención del historial de tablas viene determinada por la configuración de tabla delta.logRetentionDuration
, que es de 30 días de manera predeterminada.
Nota:
Los distintos umbrales de retención controlan el historial de tabla y el viaje en el tiempo. Consulte ¿Qué es el viaje en el tiempo de Delta Lake?
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Para obtener información sobre la sintaxis de Spark SQL, consulte DESCRIBE HISTORY.
Para obtener información sobre la sintaxis de Scala, Java y Python, consulte la Documentación de la API de Delta Lake.
Catalog Explorer proporciona una vista de esta información detallada de las tablas y el historial de las tablas Delta. Además del esquema de la tabla y los datos de ejemplo, puede hacer clic en la pestaña History (Historial) para ver el historial de la tabla que se muestra con DESCRIBE HISTORY
.
Esquema del historial
La salida de la operación history
tiene las columnas siguientes.
Columna | Type | Descripción |
---|---|---|
version | long | Versión de tabla generada por la operación. |
timestamp | timestamp | Cuándo se ha confirmado esta versión. |
userId | string | Identificador del usuario que ejecutó la operación. |
userName | string | Nombre del usuario que ejecutó la operación. |
operation | string | Nombre de la operación. |
operationParameters | map | Parámetros de la operación (por ejemplo, predicados). |
trabajo | struct | Detalles del trabajo que ejecutó la operación. |
notebook | struct | Detalles del cuaderno desde el que se ha ejecutado la operación. |
clusterId | string | Identificador del clúster en el que se ejecutó la operación. |
readVersion | long | Versión de la tabla que se leyó para realizar la operación de escritura. |
isolationLevel | string | Nivel de aislamiento utilizado para esta operación. |
isBlindAppend | boolean | Indica si esta operación ha anexado datos. |
operationMetrics | map | Métricas de la operación (por ejemplo, el número de filas y archivos modificados). |
userMetadata | string | Metadatos de confirmación definidos por el usuario, si se especificaron |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Nota:
- Algunas de las otras columnas no están disponibles si escribe en una tabla Delta mediante los métodos siguientes:
- Las columnas agregadas en el futuro siempre se agregarán después de la última columna.
Claves de métricas de operación
La operación history
devuelve una colección de métricas de operaciones en la asignación de columnas operationMetrics
.
En las tablas siguientes, se muestran las definiciones de clave de asignación por operación.
Operación | Nombre de métrica | Descripción |
---|---|---|
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO | ||
numFiles | Número de archivos escritos. | |
numOutputBytes | Tamaño en bytes del contenido escrito. | |
numOutputRows | Número de filas escritas. | |
STREAMING UPDATE | ||
numAddedFiles | Número de archivos agregados. | |
numRemovedFiles | Número de archivos eliminados. | |
numOutputRows | Número de filas escritas. | |
numOutputBytes | Tamaño de escritura en bytes. | |
Delete | ||
numAddedFiles | Número de archivos agregados. No se proporciona cuando se eliminan las particiones de la tabla. | |
numRemovedFiles | Número de archivos eliminados. | |
numDeletedRows | Número de filas eliminadas. No se proporciona cuando se eliminan las particiones de la tabla. | |
numCopiedRows | Número de filas copiadas en el proceso de eliminación de archivos. | |
executionTimeMs | Tiempo que se necesitó para ejecutar toda la operación. | |
scanTimeMs | Tiempo que se necesitó para examinar los archivos en busca de coincidencias. | |
rewriteTimeMs | Tiempo que se necesitó para volver a escribir los archivos coincidentes. | |
TRUNCATE | ||
numRemovedFiles | Número de archivos eliminados. | |
executionTimeMs | Tiempo que se necesitó para ejecutar toda la operación. | |
MERGE | ||
numSourceRows | Número de filas del DataFrame de origen. | |
numTargetRowsInserted | Número de filas insertadas en la tabla de destino. | |
numTargetRowsUpdated | Número de filas actualizadas en la tabla de destino. | |
numTargetRowsDeleted | Número de filas eliminadas en la tabla de destino. | |
numTargetRowsCopied | Número de filas de destino copiadas. | |
numOutputRows | Número total de filas escritas. | |
numTargetFilesAdded | Número de archivos agregados al receptor (destino). | |
numTargetFilesRemoved | Número de archivos eliminados del receptor (destino). | |
executionTimeMs | Tiempo que se necesitó para ejecutar toda la operación. | |
scanTimeMs | Tiempo que se necesitó para examinar los archivos en busca de coincidencias. | |
rewriteTimeMs | Tiempo que se necesitó para volver a escribir los archivos coincidentes. | |
UPDATE | ||
numAddedFiles | Número de archivos agregados. | |
numRemovedFiles | Número de archivos eliminados. | |
numUpdatedRows | Número de filas actualizadas. | |
numCopiedRows | Número de filas que se acaban de copiar en el proceso de actualización de archivos. | |
executionTimeMs | Tiempo que se necesitó para ejecutar toda la operación. | |
scanTimeMs | Tiempo que se necesitó para examinar los archivos en busca de coincidencias. | |
rewriteTimeMs | Tiempo que se necesitó para volver a escribir los archivos coincidentes. | |
FSCK | numRemovedFiles | Número de archivos eliminados. |
CONVERT | numConvertedFiles | Número de archivos Parquet que se han convertido. |
OPTIMIZE | ||
numAddedFiles | Número de archivos agregados. | |
numRemovedFiles | Número de archivos optimizados. | |
numAddedBytes | Número de bytes agregados después de optimizar la tabla. | |
numRemovedBytes | Número de bytes eliminados. | |
minFileSize | Tamaño del archivo más pequeño después de optimizar la tabla. | |
p25FileSize | Tamaño del archivo del percentil 25 después de optimizar la tabla. | |
p50FileSize | Tamaño medio del archivo después de optimizar la tabla. | |
p75FileSize | Tamaño del archivo del percentil 75 después de optimizar la tabla. | |
maxFileSize | Tamaño del archivo más grande después de optimizar la tabla. | |
CLONE | ||
sourceTableSize | Tamaño en bytes de la tabla de origen en la versión clonada. | |
sourceNumOfFiles | Número de archivos de la tabla de origen en la versión clonada. | |
numRemovedFiles | Número de archivos eliminados de la tabla de destino si se ha reemplazado una tabla delta anterior. | |
removedFilesSize | Tamaño total en bytes de los archivos eliminados de la tabla de destino si se ha reemplazado una tabla delta anterior. | |
numCopiedFiles | Número de archivos que se copiaron en la nueva ubicación. 0 para clones superficiales. | |
copiedFilesSize | Tamaño total en bytes de los archivos que se copiaron en la nueva ubicación. 0 para clones superficiales. | |
RESTORE | ||
tableSizeAfterRestore | Tamaño de la tabla en bytes después de la restauración. | |
numOfFilesAfterRestore | Número de archivos de la tabla después de la restauración. | |
numRemovedFiles | Número de archivos eliminados por la operación de restauración. | |
numRestoredFiles | Número de archivos que se agregaron como resultado de la restauración. | |
removedFilesSize | Tamaño en bytes de los archivos eliminados por la restauración. | |
restoredFilesSize | Tamaño en bytes de los archivos agregados por la restauración. | |
VACUUM | ||
numDeletedFiles | Número de archivos eliminados. | |
numVacuumedDirectories | Número de directorios vaciados. | |
numFilesToDelete | Número de archivos que se van a eliminar. |
¿Qué es el viaje en el tiempo de Delta Lake?
El viaje en el tiempo de Delta Lake admite la consulta de versiones de tabla anteriores basadas en la marca de tiempo o la versión de tabla (como se registra en el registro de transacciones). Puede usar el viaje en el tiempo para aplicaciones como las siguientes:
- Volver a crear análisis, informes o salidas (por ejemplo, la salida de un modelo de aprendizaje automático). Esto puede resultar útil para las depuraciones o las auditorías, en particular en los sectores regulados.
- Escribir consultas temporales complejas.
- Corregir errores en los datos.
- Proporcionar aislamiento de instantáneas a un conjunto de consultas para tablas que cambian rápidamente.
Importante
Las versiones de tabla accesibles con el viaje en el tiempo se determinan mediante una combinación del umbral de retención para los archivos de registro de transacciones y la frecuencia y la retención especificada para las operaciones VACUUM
. Si se ejecuta VACUUM
diariamente con los valores predeterminados, hay 7 días de datos disponibles para el viaje en el tiempo.
Sintaxis del viaje en el tiempo de Delta
Para consultar una tabla Delta con viaje en el tiempo, agregue una cláusula después de la especificación del nombre de tabla.
- El valor de
timestamp_expression
puede ser uno de los siguientes:'2018-10-18T22:15:12.013Z'
, es decir, una cadena que se puede convertir en una marca de tiempocast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
, es decir, una cadena de fecha.current_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- Cualquier otra expresión que sea una marca de tiempo o se pueda convertir en una
version
es un valor largo que se puede obtener de la salida deDESCRIBE HISTORY table_spec
.
timestamp_expression
ni version
pueden ser subconsultas.
Solo se aceptan cadenas de fecha o de hora. Por ejemplo, "2019-01-01"
y "2019-01-01T00:00:00.000Z"
. Consulte el código siguiente para obtener una sintaxis de ejemplo:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
También puede usar la sintaxis @
para especificar la marca de tiempo o la versión como parte del nombre de la tabla. La marca de tiempo debe estar en formato yyyyMMddHHmmssSSS
. Puede especificar una versión después de @
si antepone v
a la versión. Consulte el código siguiente para obtener una sintaxis de ejemplo:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
¿Qué son los puntos de comprobación del registro de transacciones?
Delta Lake registra las versiones de las tablas como archivos JSON dentro del directorio _delta_log
, que se almacena junto con los datos de las tablas. Para optimizar la consulta de los puntos de control, Delta Lake agrega las versiones de las tablas a los archivos de puntos de comprobación de Parquet, lo que evita la necesidad de leer todas las versiones JSON del historial de las tablas. Azure Databricks optimiza la frecuencia de puntos de comprobación para el tamaño y la carga de trabajo de los datos. Los usuarios no deberían tener que interactuar directamente con los puntos de comprobación. La frecuencia de los puntos de comprobación está sujeta a cambios sin previo aviso.
Configuración de la retención de datos para las consultas de viaje en el tiempo
Para consultar una versión de tabla anterior, debe conservar tanto el registro como los archivos de datos de esa versión.
Los archivos de datos se eliminan cuando se ejecuta VACUUM
en una tabla. Delta Lake administra la eliminación automática de archivos de registro después de realizar puntos de comprobación de las versiones de la tabla.
Dado que la mayoría de las tablas de Delta se han ejecutado con VACUUM
con regularidad, las consultas a un momento dado deben respetar el umbral de retención de VACUUM
, que es de 7 días de manera predeterminada.
Para aumentar el umbral de retención de datos para las tablas de Delta, debe configurar las siguientes propiedades de tabla:
delta.logRetentionDuration = "interval <interval>"
: controla cuánto tiempo se conserva el historial de una tabla. El valor predeterminado esinterval 30 days
.delta.deletedFileRetentionDuration = "interval <interval>"
: determina los usos del umbralVACUUM
para quitar los archivos de datos a los que ya no se hace referencia en la versión de la tabla actual. El valor predeterminado esinterval 7 days
.
Puede especificar las propiedades Delta durante la creación de la tabla o establecerlas con una instrucción ALTER TABLE
. Consulte Referencia de propiedades de tabla Delta.
Nota:
Debe establecer ambas propiedades para asegurarse de que el historial de tablas se conserva durante más tiempo para las tablas con operaciones VACUUM
frecuentes. Por ejemplo, para acceder a 30 días de datos históricos, establezca delta.deletedFileRetentionDuration = "interval 30 days"
(que coincide con la configuración predeterminada de delta.logRetentionDuration
).
Aumentar el umbral de retención de datos puede hacer que los costes de almacenamiento aumenten, a medida que se mantienen más archivos de datos.
Restauración de una tabla Delta a un estado anterior
Puede restaurar una tabla Delta a su estado anterior mediante el comando RESTORE
. Una tabla Delta mantiene internamente versiones históricas de la tabla que permiten restaurarla a un estado anterior.
El comando RESTORE
admite como opciones una versión correspondiente al estado anterior o una marca de tiempo de cuándo se creó el estado anterior.
Importante
- Puede restaurar una tabla ya restaurada.
- Puede restaurar una tabla clonada.
- Debe tener el permiso
MODIFY
en la tabla que se va a restaurar. - No puede restaurar una tabla a una versión anterior en la que los archivos de datos se eliminaron manualmente o mediante
vacuum
. La restauración a esta versión parcialmente sigue siendo posible sispark.sql.files.ignoreMissingFiles
se establece entrue
. - El formato de marca de tiempo para restaurar a un estado anterior es
yyyy-MM-dd HH:mm:ss
. También se admite proporcionar solo una cadena de fecha (yyyy-MM-dd
).
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Para más detalles sobre la sintaxis, consulte RESTORE.
Importante
La restauración se considera una operación de cambio de datos. Las entradas de registro de Delta Lake agregadas por el comando RESTORE
contienen dataChange establecido en true. Si hay una aplicación de bajada, como un trabajo de streaming estructurado que procesa las actualizaciones de una tabla de Delta Lake, las entradas del registro de cambios de datos agregadas por la operación de restauración se consideran nuevas actualizaciones de datos y su procesamiento puede dar lugar a datos duplicados.
Por ejemplo:
Versión de tabla | Operación | Actualizaciones de registro Delta | Registros en las actualizaciones del registro de cambios de datos |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (name = Viktor, age = 29, (name = George, age = 55) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (name = George, age = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (No hay registros, ya que optimizar la compactación no cambia los datos de la tabla). |
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) |
En el ejemplo anterior, el comando RESTORE
da como resultado actualizaciones que ya se vieron al leer la versión 0 y 1 de la tabla Delta. Si una consulta de streaming leyó esta tabla, estos archivos se considerarán como datos recién agregados y se procesarán de nuevo.
Métricas de restauración
RESTORE
informa de las métricas siguientes como un dataframe de una sola fila una vez completada la operación:
table_size_after_restore
: tamaño de la tabla después de la restauración.num_of_files_after_restore
: número de archivos de la tabla después de la restauración.num_removed_files
: número de archivos quitados (eliminados lógicamente) de la tabla.num_restored_files
: número de archivos restaurados debido a la reversión.removed_files_size
: tamaño total en bytes de los archivos que se han quitado de la tabla.restored_files_size
: tamaño total en bytes de los archivos que se han restaurado.
Ejemplos de uso del viaje en tiempo de Delta Lake
Corregir las eliminaciones accidentales en una tabla para el usuario
111
:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Corregir las actualizaciones incorrectas accidentales de una tabla:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Consultar el número de clientes nuevos agregados durante la última semana.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
¿Cómo puedo encontrar la versión de la última confirmación en la sesión de Spark?
Para obtener el número de versión de la última confirmación escrita por el elemento SparkSession
actual en todos los subprocesos y todas las tablas, consulte la configuración spark.databricks.delta.lastCommitVersionInSession
de SQL.
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Si no ha realizado ninguna confirmación por parte de SparkSession
, se devuelve un valor vacío al consultar la clave.
Nota:
Si comparte el mismo elemento SparkSession
entre varios subprocesos, es similar a compartir una variable entre varios subprocesos; puede encontrarse con condiciones de carrera porque el valor de configuración se actualiza simultáneamente.