Uso de tablas delta en Spark

Completado

Puede trabajar con tablas delta (o archivos de formato delta) para recuperar y modificar datos de varias maneras.

Uso de Spark SQL

La forma más común de trabajar con datos en tablas delta en Spark es usar Spark SQL. Puede insertar instrucciones SQL en otros lenguajes (como PySpark o Scala) mediante la biblioteca spark.sql. Por ejemplo, el código siguiente inserta una fila en la tabla de productos.

spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")

Como alternativa, puede usar el comando magic %%sql en un cuaderno para ejecutar instrucciones SQL.

%%sql

UPDATE products
SET Price = 2.49 WHERE ProductId = 1;

Uso de Delta API

Si desea trabajar con archivos delta en lugar de tablas de catálogo, puede resultar más sencillo usar Delta Lake API. Puede crear una instancia de DeltaTable desde una ubicación de carpeta que contenga archivos en formato delta y, a continuación, usar la API para modificar los datos de la tabla.

from delta.tables import *
from pyspark.sql.functions import *

# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

Uso de viaje en el tiempo para trabajar con control de versiones de tabla

Las modificaciones realizadas en tablas delta se registran en el registro de transacciones de la tabla. Puede usar las transacciones registradas para ver el historial de cambios realizados en la tabla y recuperar versiones anteriores de los datos (lo que se conoce como viaje en el tiempo).

Para ver el historial de una tabla, puede usar el comando SQL DESCRIBE como se muestra aquí.

%%sql

DESCRIBE HISTORY products

Los resultados de esta instrucción muestran las transacciones que se han aplicado a la tabla, como se muestra aquí (se han omitido algunas columnas):

version timestamp operation operationParameters
2 2023-04-04T21:46:43Z UPDATE {"predicate":"(ProductId = 1)"}
1 2023-04-04T21:42:48Z WRITE {"mode":"Append","partitionBy":"[]"}
0 2023-04-04T20:04:23Z CREATE TABLE {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"}

Para ver el historial de una tabla externa, puede especificar la ubicación de la carpeta en lugar del nombre de tabla.

%%sql

DESCRIBE HISTORY 'Files/mytable'

Puede recuperar datos de una versión específica de los datos leyendo la ubicación del archivo delta en un dataframe, especificando la versión necesaria como opción versionAsOf:

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

Como alternativa, puede especificar una marca de tiempo mediante la opción timestampAsOf:

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)