Arbeta med deltatabeller i Spark

Slutförd

Du kan arbeta med deltatabeller (eller deltaformatfiler) för att hämta och ändra data på flera sätt.

Använda Spark SQL

Det vanligaste sättet att arbeta med data i deltatabeller i Spark är att använda Spark SQL. Du kan bädda in SQL-instruktioner på andra språk (till exempel PySpark eller Scala) med hjälp av spark.sql-biblioteket . Följande kod infogar till exempel en rad i produkttabellen.

spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")

Du kan också använda magin %%sql i en notebook-fil för att köra SQL-instruktioner.

%%sql

UPDATE products
SET Price = 2.49 WHERE ProductId = 1;

Använda Delta-API:et

När du vill arbeta med deltafiler i stället för katalogtabeller kan det vara enklare att använda Delta Lake-API:et. Du kan skapa en instans av en DeltaTable från en mappplats som innehåller filer i deltaformat och sedan använda API:et för att ändra data i tabellen.

from delta.tables import *
from pyspark.sql.functions import *

# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

Använda tidsresor för att arbeta med tabellversioner

Ändringar som görs i deltatabeller loggas i transaktionsloggen för tabellen. Du kan använda de loggade transaktionerna för att visa historiken för ändringar som gjorts i tabellen och för att hämta äldre versioner av data (kallas tidsresor)

Om du vill se historiken för en tabell kan du använda DESCRIBE SQL-kommandot som du ser här.

%%sql

DESCRIBE HISTORY products

Resultatet av den här instruktionen visar de transaktioner som har tillämpats på tabellen, som visas här (vissa kolumner har utelämnats):

version timestamp operation operationParameters
2 2023-04-04T21:46:43Z UPPDATERA {"predikat":"(ProductId = 1)"}
1 2023-04-04T21:42:48Z SKRIVA {"mode":"Append","partitionBy":"[]"}
0 2023-04-04T20:04:23Z CREATE TABLE {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"}

Om du vill se historiken för en extern tabell kan du ange mappplatsen i stället för tabellnamnet.

%%sql

DESCRIBE HISTORY 'Files/mytable'

Du kan hämta data från en viss version av data genom att läsa deltafilens plats i en dataram och ange den version som krävs som ett versionAsOf alternativ:

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

Du kan också ange en tidsstämpel med hjälp av alternativet timestampAsOf :

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)