Creación de tablas de Delta Lake
Delta Lake se basa en tablas, que proporcionan una abstracción de almacenamiento relacional sobre los archivos de un lago de datos.
Creación de una tabla de Delta Lake a partir de una trama de datos
Una de las formas más fáciles de crear una tabla de Delta Lake es guardar una trama de datos en formato delta y especificar una ruta de acceso donde se deban almacenar los archivos de datos y la información de metadatos relacionada de la tabla.
Por ejemplo, el siguiente código de PySpark carga una trama de datos con datos de un archivo existente y, luego, la guarda en una nueva ubicación de carpeta en formato delta:
# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)
# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)
Después de guardar la tabla delta, la ubicación de la ruta de acceso especificada incluye archivos Parquet para los datos (con independencia del formato del archivo de origen que cargó en la trama de datos) y una carpeta _delta_log que contiene el registro de transacciones de la tabla.
Nota
El registro de transacciones anota todas las modificaciones de datos en la tabla. Al registrarse cada modificación, se puede aplicar coherencia transaccional y se puede conservar la información de control de versiones de la tabla.
Puede reemplazar una tabla de Delta Lake existente por el contenido de una trama de datos mediante el modo sobrescribir, como se muestra aquí:
new_df.write.format("delta").mode("overwrite").save(delta_table_path)
También puede agregar filas de una trama de datos a una tabla existente mediante el modo anexar:
new_rows_df.write.format("delta").mode("append").save(delta_table_path)
Realización de actualizaciones condicionales
Aunque puede modificar los datos en una trama de datos y, luego, sobrescribir una tabla de Delta Lake para reemplazarla, un patrón más común en una base de datos es insertar, actualizar o eliminar filas de una tabla existente como operaciones transaccionales discretas. Para realizar estas modificaciones en una tabla de Delta Lake, puede usar el objeto DeltaTable en Delta Lake API, que admite operaciones de actualización, eliminación y combinación. Por ejemplo, podría usar el código siguiente para actualizar la columna price de todas las filas con un valor de columna de category de "Accesories":
from delta.tables import *
from pyspark.sql.functions import *
# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })
Las modificaciones de datos se anotan en el registro de transacciones y se crean nuevos archivos Parquet en la carpeta "table" según sea necesario.
Sugerencia
Para obtener más información sobre el uso de la API de Delta Lake, consulte la documentación de la API de Delta Lake.
Consulta de una versión anterior de una tabla
Las tablas de Delta Lake admiten el control de versiones mediante el registro de transacciones. El registro de transacciones anota las modificaciones realizadas en la tabla, junto con la marca de tiempo y el número de versión de cada transacción. Puede usar estos datos de versión registrada para ver las versiones anteriores de la tabla: una característica conocida como viaje en el tiempo.
Puede recuperar datos de una versión específica de una tabla de Delta Lake leyendo los datos de la ubicación de la tabla delta en una trama de datos, especificando la versión necesaria como opción versionAsOf
:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
Como alternativa, puede especificar una marca de tiempo mediante la opción timestampAsOf
:
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)