Vytvoření tabulek Delta Lake

Dokončeno

Delta Lake je postavená na tabulkách, které poskytují abstrakci relačního úložiště nad soubory v datovém jezeře.

Vytvoření tabulky Delta Lake z datového rámce

Jedním z nejjednodušších způsobů vytvoření tabulky Delta Lake je uložení datového rámce ve formátu delta a určení cesty, kam se mají ukládat datové soubory a související informace o metadatech tabulky.

Například následující kód PySpark načte datový rámec s daty z existujícího souboru a potom tento datový rámec uloží do nového umístění složky v rozdílovém formátu:

# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)

# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)

Po uložení tabulky Delta obsahuje zadané umístění cesty soubory parquet pro data (bez ohledu na formát zdrojového souboru, který jste načetli do datového rámce) a složku _delta_log obsahující transakční protokol tabulky.

Poznámka:

Transakční protokol zaznamenává všechny úpravy dat v tabulce. Protokolováním každé úpravy je možné vynutit transakční konzistenci a zachovat informace o správě verzí tabulky.

Existující tabulku Delta Lake můžete nahradit obsahem datového rámce pomocí režimu přepsání , jak je znázorněno tady:

new_df.write.format("delta").mode("overwrite").save(delta_table_path)

K existující tabulce můžete také přidat řádky z datového rámce pomocí režimu připojení :

new_rows_df.write.format("delta").mode("append").save(delta_table_path)

Provádění podmíněných aktualizací

I když můžete provádět úpravy dat v datovém rámci a pak nahradit tabulku Delta Lake tím, že ji přepíšete, je častějším vzorem v databázi vložení, aktualizace nebo odstranění řádků v existující tabulce jako diskrétní transakční operace. K provedení takových úprav tabulky Delta Lake můžete použít objekt DeltaTable v rozhraní Delta Lake API, který podporuje operace aktualizace, odstranění a sloučení . Například můžete použít následující kód k aktualizaci sloupce ceny pro všechny řádky s hodnotou sloupce kategorie "Příslušenství":

from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

Změny dat se zaznamenávají v transakčním protokolu a nové soubory parquet se vytvoří ve složce tabulky podle potřeby.

Tip

Další informace o používání rozhraní Delta Lake API najdete v dokumentaci k rozhraní Delta Lake API.

Dotazování předchozí verze tabulky

Tabulky Delta Lake podporují správu verzí prostřednictvím transakčního protokolu. Transakční protokol zaznamenává změny provedené v tabulce, přičemž uvádí časové razítko a číslo verze pro každou transakci. Tato data protokolované verze můžete použít k zobrazení předchozích verzí tabulky – funkce označovaná jako časová cesta.

Data z konkrétní verze tabulky Delta Lake můžete načíst načtením dat z umístění tabulky Delta do datového rámce a určením požadované verze jako versionAsOf možnosti:

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)

Případně můžete pomocí možnosti zadat časové razítko timestampAsOf :

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)