Vytvoření tabulek Delta Lake
Delta Lake je postavená na tabulkách, které poskytují abstrakci relačního úložiště nad soubory v datovém jezeře.
Vytvoření tabulky Delta Lake z datového rámce
Jedním z nejjednodušších způsobů vytvoření tabulky Delta Lake je uložení datového rámce ve formátu delta a určení cesty, kam se mají ukládat datové soubory a související informace o metadatech tabulky.
Například následující kód PySpark načte datový rámec s daty z existujícího souboru a potom tento datový rámec uloží do nového umístění složky v rozdílovém formátu:
# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)
# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)
Po uložení tabulky Delta obsahuje zadané umístění cesty soubory parquet pro data (bez ohledu na formát zdrojového souboru, který jste načetli do datového rámce) a složku _delta_log obsahující transakční protokol tabulky.
Poznámka:
Transakční protokol zaznamenává všechny úpravy dat v tabulce. Protokolováním každé úpravy je možné vynutit transakční konzistenci a zachovat informace o správě verzí tabulky.
Existující tabulku Delta Lake můžete nahradit obsahem datového rámce pomocí režimu přepsání , jak je znázorněno tady:
new_df.write.format("delta").mode("overwrite").save(delta_table_path)
K existující tabulce můžete také přidat řádky z datového rámce pomocí režimu připojení :
new_rows_df.write.format("delta").mode("append").save(delta_table_path)
Provádění podmíněných aktualizací
I když můžete provádět úpravy dat v datovém rámci a pak nahradit tabulku Delta Lake tím, že ji přepíšete, je častějším vzorem v databázi vložení, aktualizace nebo odstranění řádků v existující tabulce jako diskrétní transakční operace. K provedení takových úprav tabulky Delta Lake můžete použít objekt DeltaTable v rozhraní Delta Lake API, který podporuje operace aktualizace, odstranění a sloučení . Například můžete použít následující kód k aktualizaci sloupce ceny pro všechny řádky s hodnotou sloupce kategorie "Příslušenství":
from delta.tables import *
from pyspark.sql.functions import *
# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })
Změny dat se zaznamenávají v transakčním protokolu a nové soubory parquet se vytvoří ve složce tabulky podle potřeby.
Tip
Další informace o používání rozhraní Delta Lake API najdete v dokumentaci k rozhraní Delta Lake API.
Dotazování předchozí verze tabulky
Tabulky Delta Lake podporují správu verzí prostřednictvím transakčního protokolu. Transakční protokol zaznamenává změny provedené v tabulce, přičemž uvádí časové razítko a číslo verze pro každou transakci. Tato data protokolované verze můžete použít k zobrazení předchozích verzí tabulky – funkce označovaná jako časová cesta.
Data z konkrétní verze tabulky Delta Lake můžete načíst načtením dat z umístění tabulky Delta do datového rámce a určením požadované verze jako versionAsOf
možnosti:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
Případně můžete pomocí možnosti zadat časové razítko timestampAsOf
:
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)