Práce s rozdílovými tabulkami ve Sparku
S rozdílovými tabulkami (nebo soubory s rozdílovým formátem) můžete pracovat a načítat a upravovat data několika způsoby.
Použití Spark SQL
Nejběžnějším způsobem práce s daty v rozdílových tabulkách ve Sparku je použití Spark SQL. Příkazy SQL můžete vložit do jiných jazyků (například PySpark nebo Scala) pomocí knihovny spark.sql . Následující kód například vloží řádek do tabulky products .
spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")
Případně můžete pomocí %%sql
magie v poznámkovém bloku spouštět příkazy SQL.
%%sql
UPDATE products
SET Price = 2.49 WHERE ProductId = 1;
Použití rozhraní Delta API
Pokud chcete pracovat s rozdílovými soubory místo s tabulkami katalogu, může být jednodušší použít rozhraní Delta Lake API. Instanci tabulky DeltaTable můžete vytvořit z umístění složky obsahující soubory v rozdílovém formátu a pak pomocí rozhraní API upravit data v tabulce.
from delta.tables import *
from pyspark.sql.functions import *
# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })
Práce s verzí tabulek pomocí časového cestování
Změny tabulek delta se protokolují v transakčním protokolu tabulky. Protokolované transakce můžete použít k zobrazení historie změn provedených v tabulce a k načtení starších verzí dat (označované jako časová cesta).
Pokud chcete zobrazit historii tabulky, můžete použít DESCRIBE
příkaz SQL, jak je znázorněno zde.
%%sql
DESCRIBE HISTORY products
Výsledky tohoto příkazu zobrazují transakce použité v tabulce, jak je znázorněno zde (některé sloupce byly vynechány):
version | časové razítko | operation | operationParameters |
---|---|---|---|
2 | 2023-04-04T21:46:43Z | UPDATE | {"predikát":"(ProductId = 1)"} |
0 | 2023-04-04T21:42:48Z | ZÁPIS | {"mode":"Append","partitionBy":"[]"} |
0 | 2023-04-04T20:04:23Z | CREATE TABLE | {"isManaged":"true","description":null,"partitionBy":"[]","properties":"}{} |
Pokud chcete zobrazit historii externí tabulky, můžete místo názvu tabulky zadat umístění složky.
%%sql
DESCRIBE HISTORY 'Files/mytable'
Data z konkrétní verze dat můžete načíst načtením umístění rozdílového souboru do datového rámce a zadáním verze požadované jako versionAsOf
možnosti:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
Případně můžete pomocí možnosti zadat časové razítko timestampAsOf
:
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)