Arbeta med deltatabeller i Spark
Du kan arbeta med deltatabeller (eller deltaformatfiler) för att hämta och ändra data på flera sätt.
Använda Spark SQL
Det vanligaste sättet att arbeta med data i deltatabeller i Spark är att använda Spark SQL. Du kan bädda in SQL-instruktioner på andra språk (till exempel PySpark eller Scala) med hjälp av spark.sql-biblioteket . Följande kod infogar till exempel en rad i produkttabellen.
spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")
Du kan också använda magin %%sql
i en notebook-fil för att köra SQL-instruktioner.
%%sql
UPDATE products
SET Price = 2.49 WHERE ProductId = 1;
Använda Delta-API:et
När du vill arbeta med deltafiler i stället för katalogtabeller kan det vara enklare att använda Delta Lake-API:et. Du kan skapa en instans av en DeltaTable från en mappplats som innehåller filer i deltaformat och sedan använda API:et för att ändra data i tabellen.
from delta.tables import *
from pyspark.sql.functions import *
# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })
Använda tidsresor för att arbeta med tabellversioner
Ändringar som görs i deltatabeller loggas i transaktionsloggen för tabellen. Du kan använda de loggade transaktionerna för att visa historiken för ändringar som gjorts i tabellen och för att hämta äldre versioner av data (kallas tidsresor)
Om du vill se historiken för en tabell kan du använda DESCRIBE
SQL-kommandot som du ser här.
%%sql
DESCRIBE HISTORY products
Resultatet av den här instruktionen visar de transaktioner som har tillämpats på tabellen, som visas här (vissa kolumner har utelämnats):
version | timestamp | operation | operationParameters |
---|---|---|---|
2 | 2023-04-04T21:46:43Z | UPPDATERA | {"predikat":"(ProductId = 1)"} |
1 | 2023-04-04T21:42:48Z | SKRIVA | {"mode":"Append","partitionBy":"[]"} |
0 | 2023-04-04T20:04:23Z | CREATE TABLE | {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"} |
Om du vill se historiken för en extern tabell kan du ange mappplatsen i stället för tabellnamnet.
%%sql
DESCRIBE HISTORY 'Files/mytable'
Du kan hämta data från en viss version av data genom att läsa deltafilens plats i en dataram och ange den version som krävs som ett versionAsOf
alternativ:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
Du kan också ange en tidsstämpel med hjälp av alternativet timestampAsOf
:
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)