Referenční dokumentace jazyka Python pro rozdílové živé tabulky
Tento článek obsahuje podrobnosti o programovacím rozhraní Delta Live Tables Python.
Informace o rozhraní SQL API najdete v referenčních informacích k jazyku SQL Delta Live Tables.
Podrobnosti týkající se konfigurace automatického zavaděče najdete v tématu Co je automatický zavaděč?.
Než začnete
Při implementaci kanálů s rozhraním Delta Live Tables Python je důležité vzít v úvahu následující skutečnosti:
- Vzhledem k tomu, že Python
table()
aview()
funkce se během plánování a spuštění aktualizace kanálu vyvolávají vícekrát, nezahrnujte do jedné z těchto funkcí kód, který může mít vedlejší účinky (například kód, který upravuje data nebo odesílá e-mail). Aby nedocházelo k neočekávanému chování, měly by funkce Pythonu, které definují datové sady, obsahovat pouze kód potřebný k definování tabulky nebo zobrazení. - K provádění operací, jako je odesílání e-mailů nebo integrace s externí monitorovací službou, zejména ve funkcích, které definují datové sady, použijte háky událostí. Implementace těchto operací ve funkcích, které definují datové sady, způsobí neočekávané chování.
table
Python aview
funkce musí vracet datový rámec. Některé funkce, které pracují s datovými rámci, nevrací datové rámce a neměly by se používat. Mezi tyto operace patří funkce, jakocollect()
jsou ,count()
,toPandas()
,save()
asaveAsTable()
. Vzhledem k tomu, že transformace datového rámce se provádějí po vyřešení celého grafu toku dat, použití těchto operací může mít nežádoucí vedlejší účinky.
Import modulu Pythonu dlt
Funkce Pythonu Delta Live Tables jsou definovány v dlt
modulu. Vaše kanály implementované pomocí rozhraní Python API musí importovat tento modul:
import dlt
Vytvoření materializovaného zobrazení nebo streamované tabulky Delta Live Tables
Rozdílové živé tabulky v Pythonu určují, jestli se má datová sada aktualizovat jako materializovaná nebo streamovaná tabulka na základě definovaného dotazu. Dekorátor @table
lze použít k definování materializovaných zobrazení i streamovaných tabulek.
Pokud chcete definovat materializované zobrazení v Pythonu, použijte @table
dotaz, který provádí statické čtení proti zdroji dat. Pokud chcete definovat streamovací tabulku, použijte @table
dotaz, který provádí čtení streamování vůči zdroji dat nebo používá funkci create_streaming_table(). Oba typy datových sad mají stejnou specifikaci syntaxe:
Poznámka:
Pokud chcete tento argument použít cluster_by
k povolení clusteringu liquid, musí být kanál nakonfigurovaný tak, aby používal kanál preview.
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Vytvoření zobrazení Delta Live Tables
Pokud chcete definovat zobrazení v Pythonu @view
, použijte dekorátor. @table
Podobně jako dekorátor můžete pro statické datové sady nebo streamované datové sady použít zobrazení v dynamických tabulkách Delta. Následuje syntaxe pro definování zobrazení pomocí Pythonu:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Příklad: Definování tabulek a zobrazení
Pokud chcete definovat tabulku nebo zobrazení v Pythonu, použijte @dlt.view
u funkce dekorátor nebo @dlt.table
dekorátor. Název funkce nebo name
parametr můžete použít k přiřazení názvu tabulky nebo zobrazení. Následující příklad definuje dvě různé datové sady: zobrazení označované taxi_raw
jako vstupní zdroj přebírá soubor JSON a tabulku, filtered_data
která přebírá taxi_raw
zobrazení jako vstup:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
Příklad: Přístup k datové sadě definované ve stejném kanálu
Poznámka:
I když jsou dlt.read()
funkce dlt.read_stream()
stále dostupné a plně podporované rozhraním Delta Live Tables Python, Databricks doporučuje vždy používat funkce spark.read.table()
a spark.readStream.table()
funkce z následujících důvodů:
- Funkce
spark
podporují čtení interních a externích datových sad, včetně datových sad v externím úložišti nebo definovaných v jiných kanálech. Funkcedlt
podporují jen čtení interních datových sad. - Funkce
spark
podporují zadávání možností, napříkladskipChangeCommits
operací čtení. Funkce nepodporujídlt
zadávání možností.
Pokud chcete získat přístup k datové sadě definované ve stejném kanálu, použijte spark.read.table()
před název datové sady klíčové slovo nebo spark.readStream.table()
funkce LIVE
:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("LIVE.customers_raw").where(...)
Příklad: Čtení z tabulky zaregistrované v metastoru
Pokud chcete číst data z tabulky zaregistrované v metastoru Hive, v argumentu funkce vynechat LIVE
klíčové slovo a volitelně kvalifikovat název tabulky s názvem databáze:
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Příklad čtení z tabulky katalogu Unity najdete v tématu Ingestování dat do kanálu katalogu Unity.
Příklad: Přístup k datové sadě pomocí spark.sql
Datovou sadu můžete vrátit také pomocí výrazu spark.sql
ve funkci dotazu. Pokud chcete číst z interní datové sady, předejděte LIVE.
název datové sady:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
Vytvoření tabulky, která se použije jako cíl operací streamování
create_streaming_table()
Pomocí funkce můžete vytvořit cílovou tabulku pro výstup záznamů podle operací streamování, včetně apply_changes(), apply_changes_from_snapshot() a @append_flow výstupních záznamů.
Poznámka:
Funkce create_target_table()
jsou create_streaming_live_table()
zastaralé. Databricks doporučuje aktualizovat existující kód tak, aby funkci používal create_streaming_table()
.
Poznámka:
Pokud chcete tento argument použít cluster_by
k povolení clusteringu liquid, musí být kanál nakonfigurovaný tak, aby používal kanál preview.
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Argumenty |
---|
name Typ: str Název tabulky. Tento parametr je povinný. |
comment Typ: str Volitelný popis tabulky. |
spark_conf Typ: dict Volitelný seznam konfigurací Sparku pro spuštění tohoto dotazu. |
table_properties Typ: dict Volitelný seznam vlastností tabulky pro tabulku. |
partition_cols Typ: array Volitelný seznam jednoho nebo více sloupců, které se mají použít k dělení tabulky. |
cluster_by Typ: array Volitelně můžete povolit clustering liquid v tabulce a definovat sloupce, které se mají použít jako klíče clusteringu. Viz Použití liquid clusteringu pro tabulky Delta. |
path Typ: str Volitelné umístění úložiště pro data tabulky. Pokud není nastavená, systém ve výchozím nastavení nastaví umístění úložiště kanálu. |
schema Typ: str nebo StructType Volitelná definice schématu pro tabulku. Schémata je možné definovat jako řetězec DDL SQL nebo pomocí Pythonu. StructType . |
expect_all expect_all_or_drop expect_all_or_fail Typ: dict Volitelná omezení kvality dat pro tabulku Podívejte se na několik očekávání. |
row_filter (Public Preview)Typ: str Volitelná klauzule filtru řádků pro tabulku. Viz Publikování tabulek s filtry řádků a maskami sloupců. |
Řízení způsobu materializace tabulek
Tabulky také nabízejí další kontrolu nad jejich materializací:
- Určete, jak se tabulky rozdělují pomocí
partition_cols
. K urychlení dotazů můžete použít dělení. - Vlastnosti tabulky můžete nastavit při definování zobrazení nebo tabulky. Viz Vlastnosti tabulky Delta Live Tables.
- Nastavte umístění úložiště pro data tabulky pomocí
path
nastavení. Ve výchozím nastavení se data tabulky ukládají v umístění úložiště kanálu, pokudpath
nejsou nastavená. - V definici schématu můžete použít vygenerované sloupce . Viz příklad: Zadejte schéma a sloupce oddílů.
Poznámka:
U tabulek, které mají velikost menší než 1 TB, databricks doporučuje řídit organizaci dat Delta Live Tables. Sloupce oddílů byste neměli zadávat, pokud neočekáváte, že se tabulka rozšíří nad terabajt.
Příklad: Zadání schématu a sloupců oddílů
Volitelně můžete zadat schéma tabulky pomocí Pythonu StructType
nebo řetězce SQL DDL. Při zadání pomocí řetězce DDL může definice obsahovat vygenerované sloupce.
Následující příklad vytvoří tabulku volanou sales
se schématem zadaným pomocí Pythonu StructType
:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
Následující příklad určuje schéma tabulky pomocí řetězce DDL, definuje vygenerovaný sloupec a definuje sloupec oddílu:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Delta Live Tables ve výchozím nastavení odvodí schéma z table
definice, pokud nezadáte schéma.
Konfigurace streamované tabulky tak, aby ignorovala změny ve zdrojové streamovací tabulce
Poznámka:
- Příznak
skipChangeCommits
funguje jenom sspark.readStream
použitímoption()
funkce. Tento příznak nelze použít vedlt.read_stream()
funkci. - Příznak nelze použít
skipChangeCommits
, pokud je zdrojová tabulka streamování definována jako cíl funkce apply_changes().
Ve výchozím nastavení streamované tabulky vyžadují zdroje jen pro připojení. Pokud streamovací tabulka používá jako zdroj jinou streamovací tabulku a zdrojová streamovací tabulka vyžaduje aktualizace nebo odstranění, například zpracování gdpr "právo na zapomenutí", skipChangeCommits
může být příznak nastaven při čtení zdrojové tabulky streamování, aby tyto změny ignoroval. Další informace o tomto příznaku naleznete v tématu Ignorovat aktualizace a odstranění.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Příklad: Definování omezení tabulky
Důležité
Omezení tabulky jsou ve verzi Public Preview.
Při zadávání schématu můžete definovat primární a cizí klíče. Omezení jsou informativní a nevynucují se. Viz klauzule CONSTRAINT v referenční dokumentaci jazyka SQL.
Následující příklad definuje tabulku s omezením primárního a cizího klíče:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Příklad: Definování filtru řádků a masky sloupců
Důležité
Filtry řádků a masky sloupců jsou ve verzi Public Preview.
Pokud chcete vytvořit materializované zobrazení nebo tabulku Streamování s filtrem řádků a maskou sloupců, použijte klauzuli ROW FILTER a klauzuli MASK. Následující příklad ukazuje, jak definovat materializované zobrazení a streamovací tabulku s filtrem řádků i maskou sloupce:
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
Další informace o filtrech řádků a maskách sloupců najdete v tématu Publikování tabulek s filtry řádků a maskami sloupců.
Python Delta Live Tables – vlastnosti
Následující tabulky popisují možnosti a vlastnosti, které můžete zadat při definování tabulek a zobrazení pomocí delta živých tabulek:
Poznámka:
Pokud chcete tento argument použít cluster_by
k povolení clusteringu liquid, musí být kanál nakonfigurovaný tak, aby používal kanál preview.
@table nebo @view |
---|
name Typ: str Volitelný název tabulky nebo zobrazení. Pokud není definován, název funkce se použije jako název tabulky nebo zobrazení. |
comment Typ: str Volitelný popis tabulky. |
spark_conf Typ: dict Volitelný seznam konfigurací Sparku pro spuštění tohoto dotazu. |
table_properties Typ: dict Volitelný seznam vlastností tabulky pro tabulku. |
path Typ: str Volitelné umístění úložiště pro data tabulky. Pokud není nastavená, systém ve výchozím nastavení nastaví umístění úložiště kanálu. |
partition_cols Typ: a collection of str Volitelná kolekce, list například jeden nebo více sloupců, které se mají použít k dělení tabulky. |
cluster_by Typ: array Volitelně můžete povolit clustering liquid v tabulce a definovat sloupce, které se mají použít jako klíče clusteringu. Viz Použití liquid clusteringu pro tabulky Delta. |
schema Typ: str nebo StructType Volitelná definice schématu pro tabulku. Schémata lze definovat jako řetězec DDL SQL nebo pomocí Pythonu StructType . |
temporary Typ: bool Vytvořte tabulku, ale nepublikujte metadata pro tabulku. Klíčové temporary slovo dává Delta Live Tables pokyn k vytvoření tabulky, která je k dispozici pro kanál, ale neměla by být přístupná mimo kanál. Aby se zkrátila doba zpracování, dočasná tabulka se zachová po celou dobu životnosti kanálu, který ho vytvoří, a ne jenom jednu aktualizaci.Výchozí hodnota je False. |
row_filter (Public Preview)Typ: str Volitelná klauzule filtru řádků pro tabulku. Viz Publikování tabulek s filtry řádků a maskami sloupců. |
Definice tabulky nebo zobrazení |
---|
def <function-name>() Funkce Pythonu, která definuje datovou sadu. name Pokud parametr není nastavený, <function-name> použije se jako název cílové datové sady. |
query Příkaz Spark SQL, který vrací datový rámec Spark Dataset nebo Koalas. Použijte dlt.read() nebo spark.read.table() proveďte úplné čtení z datové sady definované ve stejném kanálu. Pokud chcete číst externí datovou sadu, použijte spark.read.table() funkci. Nelze použít dlt.read() ke čtení externích datových sad. Protože spark.read.table() se dá použít ke čtení interních datových sad, datových sad definovaných mimo aktuální kanál a umožňuje zadat možnosti pro čtení dat, doporučuje databricks místo funkce používat dlt.read() .Pokud funkci použijete spark.read.table() ke čtení z datové sady definované ve stejném kanálu, předpřipravení LIVE klíčového slova na název datové sady v argumentu funkce. Pokud chcete například číst z datové sady s názvem customers :spark.read.table("LIVE.customers") Funkci můžete také použít spark.read.table() ke čtení z tabulky registrované v metastoru tak, že vynecháte LIVE klíčové slovo a volitelně kvalifikujete název tabulky s názvem databáze:spark.read.table("sales.customers") Použijte dlt.read_stream() nebo spark.readStream.table() proveďte streamování čtení z datové sady definované ve stejném kanálu. Pokud chcete provést streamování přečtené z externí datové sady, použijtespark.readStream.table() funkce. Protože spark.readStream.table() se dá použít ke čtení interních datových sad, datových sad definovaných mimo aktuální kanál a umožňuje zadat možnosti pro čtení dat, doporučuje databricks místo funkce používat dlt.read_stream() .Pokud chcete definovat dotaz ve funkci Delta Live Tables table pomocí syntaxe SQL, použijte spark.sql funkci. Příklad: Přístup k datové sadě pomocí spark.sql Pokud chcete definovat dotaz ve funkci Delta Live Tables table pomocí Pythonu, použijte syntaxi PySpark . |
Očekávání |
---|
@expect("description", "constraint") Deklarace omezení kvality dat identifikovaných description . Pokud řádek porušuje očekávání, zahrňte řádek do cílové datové sady. |
@expect_or_drop("description", "constraint") Deklarace omezení kvality dat identifikovaných description . Pokud řádek porušuje očekávání, odstraňte řádek z cílové datové sady. |
@expect_or_fail("description", "constraint") Deklarace omezení kvality dat identifikovaných description . Pokud řádek porušuje očekávání, okamžitě zastavte provádění. |
@expect_all(expectations) Deklarujte jedno nebo více omezení kvality dat. expectations je slovník Pythonu, kde klíč představuje očekávaný popis a hodnota je očekávané omezení. Pokud řádek porušuje jakékoli očekávání, zahrňte řádek do cílové datové sady. |
@expect_all_or_drop(expectations) Deklarujte jedno nebo více omezení kvality dat. expectations je slovník Pythonu, kde klíč představuje očekávaný popis a hodnota je očekávané omezení. Pokud řádek porušuje jakékoli očekávání, odstraňte řádek z cílové datové sady. |
@expect_all_or_fail(expectations) Deklarujte jedno nebo více omezení kvality dat. expectations je slovník Pythonu, kde klíč představuje očekávaný popis a hodnota je očekávané omezení. Pokud řádek porušuje jakékoli očekávání, okamžitě zastavte provádění. |
Změna zachytávání dat z kanálu změn pomocí Pythonu v rozdílových živých tabulkách
apply_changes()
Pomocí funkce v rozhraní Python API můžete pomocí funkce cdC (Delta Live Tables Change Data Capture) zpracovávat zdrojová data z datového kanálu změn (CDF).
Důležité
Pokud chcete použít změny, musíte deklarovat cílovou streamovací tabulku. Volitelně můžete zadat schéma cílové tabulky. Při zadávání schématu apply_changes()
cílové tabulky musíte zahrnout sloupce __START_AT
se __END_AT
stejným datovým typem jako sequence_by
pole.
K vytvoření požadované cílové tabulky můžete použít funkci create_streaming_table() v rozhraní Pythonu Delta Live Tables.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Poznámka:
Pro APPLY CHANGES
zpracování je výchozím chováním INSERT
a UPDATE
událostmi upsertovat události CDC ze zdroje: aktualizujte všechny řádky v cílové tabulce, které odpovídají zadaným klíčům, nebo vložte nový řádek, pokud v cílové tabulce neexistuje odpovídající záznam. Zpracování událostí DELETE
lze zadat pomocí APPLY AS DELETE WHEN
podmínky.
Další informace o zpracování CDC pomocí kanálu změn najdete v tématu Rozhraní API APPLY CHANGES: Zjednodušení zachytávání dat změn pomocí rozdílových živých tabulek. Příklad použití apply_changes()
funkce, viz Příklad: SCD typ 1 a SCD typ 2 zpracování se zdrojovými daty CDF.
Důležité
Pokud chcete použít změny, musíte deklarovat cílovou streamovací tabulku. Volitelně můžete zadat schéma cílové tabulky. Při zadávání schématu apply_changes
cílové tabulky musíte zahrnout sloupce __START_AT
se __END_AT
stejným datovým typem jako sequence_by
pole.
Viz rozhraní API APPLY CHANGES: Zjednodušení zachytávání dat změn pomocí rozdílových živých tabulek.
Argumenty |
---|
target Typ: str Název tabulky, která se má aktualizovat. Před spuštěním apply_changes() funkce můžete pomocí funkce create_streaming_table() vytvořit cílovou tabulku.Tento parametr je povinný. |
source Typ: str Zdroj dat obsahující záznamy CDC. Tento parametr je povinný. |
keys Typ: list Sloupec nebo kombinace sloupců, které jednoznačně identifikují řádek ve zdrojových datech. Slouží k identifikaci událostí CDC, které se vztahují na konkrétní záznamy v cílové tabulce. Můžete zadat jednu z těchto: – Seznam řetězců: ["userId", "orderId"] – Seznam funkcí Spark SQL col() : [col("userId"), col("orderId"] Argumenty funkcí col() nemohou obsahovat kvalifikátory. Můžete například použít col(userId) , ale nemůžete použít col(source.userId) .Tento parametr je povinný. |
sequence_by Typ: str nebo col() Název sloupce určující logické pořadí událostí CDC ve zdrojových datech. Delta Live Tables používá toto sekvencování ke zpracování událostí změn, které přicházejí mimo pořadí. Můžete zadat jednu z těchto: – Řetězec: "sequenceNum" – Funkce Spark SQL col() : col("sequenceNum") Argumenty funkcí col() nemohou obsahovat kvalifikátory. Můžete například použít col(userId) , ale nemůžete použít col(source.userId) .Zadaný sloupec musí být seřazený datový typ. Tento parametr je povinný. |
ignore_null_updates Typ: bool Povolit ingestování aktualizací obsahujících podmnožinu cílových sloupců Když událost CDC odpovídá existujícímu řádku a ignore_null_updates je True , sloupce se null zachovají své stávající hodnoty v cíli. To platí také pro vnořené sloupce s hodnotou null . Pokud ignore_null_updates je hodnota False , existující hodnoty se přepíšou null hodnotami.Tento parametr je volitelný. Výchozí hodnota je False . |
apply_as_deletes Typ: str nebo expr() Určuje, kdy se má událost CDC považovat za událost DELETE , nikoli jako upsert. Aby bylo možné zpracovat data mimo pořadí, odstraněný řádek se dočasně zachová jako náhrobek v podkladové tabulce Delta a v metastoru se vytvoří zobrazení, které vyfiltruje tyto náhrobky. Interval uchovávání informací je možné nakonfigurovat pomocípipelines.cdc.tombstoneGCThresholdInSeconds vlastnost table.Můžete zadat jednu z těchto: – Řetězec: "Operation = 'DELETE'" – Funkce Spark SQL expr() : expr("Operation = 'DELETE'") Tento parametr je volitelný. |
apply_as_truncates Typ: str nebo expr() Určuje, kdy má být událost CDC považována za úplnou tabulku TRUNCATE . Vzhledem k tomu, že tato klauzule aktivuje úplné zkrácení cílové tabulky, měla by být použita pouze pro konkrétní případy použití vyžadující tuto funkci.Parametr apply_as_truncates je podporován pouze pro SCD typu 1. ScD typu 2 nepodporuje operace zkrácení.Můžete zadat jednu z těchto: – Řetězec: "Operation = 'TRUNCATE'" – Funkce Spark SQL expr() : expr("Operation = 'TRUNCATE'") Tento parametr je volitelný. |
column_list except_column_list Typ: list Podmnožina sloupců, které se mají zahrnout do cílové tabulky. Slouží column_list k určení kompletního seznamu sloupců, které se mají zahrnout. Slouží except_column_list k určení sloupců, které chcete vyloučit. Můžete deklarovat hodnotu jako seznam řetězců nebo jako funkce Spark SQL col() :- column_list = ["userId", "name", "city"] .- column_list = [col("userId"), col("name"), col("city")] - except_column_list = ["operation", "sequenceNum"] - except_column_list = [col("operation"), col("sequenceNum") Argumenty funkcí col() nemohou obsahovat kvalifikátory. Můžete například použít col(userId) , ale nemůžete použít col(source.userId) .Tento parametr je volitelný. Výchozí hodnota je zahrnout všechny sloupce v cílové tabulce, pokud funkci nepředáte žádný column_list argument nebo except_column_list argument. |
stored_as_scd_type Typ: str nebo int Určuje, zda se mají ukládat záznamy jako SCD typu 1 nebo SCD typu 2. Nastavte na 1 typ SCD 1 nebo 2 pro SCD typ 2.Tato klauzule je nepovinná. Výchozí hodnota je SCD typu 1. |
track_history_column_list track_history_except_column_list Typ: list Podmnožina výstupních sloupců, které se mají sledovat pro historii v cílové tabulce. Slouží track_history_column_list k určení úplného seznamu sloupců, které se mají sledovat. Používánítrack_history_except_column_list určit sloupce, které mají být vyloučeny ze sledování. Můžete deklarovat hodnotu jako seznam řetězců nebo jako funkce Spark SQL col() :- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Argumenty funkcí col() nemohou obsahovat kvalifikátory. Můžete například použít col(userId) , ale nemůžete použít col(source.userId) .Tento parametr je volitelný. Výchozí hodnota je zahrnout všechny sloupce v cílové tabulce, pokud ne nebo ne track_history_column_list .track_history_except_column_list funkce je předána argumentu. |
Změna zachytávání dat ze snímků databáze pomocí Pythonu v rozdílových živých tabulkách
Důležité
Rozhraní APPLY CHANGES FROM SNAPSHOT
API je ve verzi Public Preview.
apply_changes_from_snapshot()
Pomocí funkce v rozhraní Python API můžete ke zpracování zdrojových dat ze snímků databáze použít funkci cdC (Delta Live Tables Change Data Capture).
Důležité
Pokud chcete použít změny, musíte deklarovat cílovou streamovací tabulku. Volitelně můžete zadat schéma cílové tabulky. Při zadávání schématu apply_changes_from_snapshot()
cílové tabulky musíte zahrnout __START_AT
__END_AT
i sloupce se stejným datovým typem jako sequence_by
pole.
K vytvoření požadované cílové tabulky můžete použít funkci create_streaming_table() v rozhraní Pythonu Delta Live Tables.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Poznámka:
Při APPLY CHANGES FROM SNAPSHOT
zpracování je výchozím chováním vložit nový řádek, pokud v cíli neexistuje odpovídající záznam se stejnými klíči. Pokud existuje odpovídající záznam, aktualizuje se pouze v případě, že se změnily některé hodnoty v řádku. Řádky s klíči, které jsou přítomné v cíli, ale již nejsou ve zdroji, se odstraní.
Další informace o zpracování CDC pomocí snímků najdete v tématu Rozhraní API APPLY CHANGES: Zjednodušení zachytávání dat změn pomocí rozdílových živých tabulek. Příklady použití funkce najdete v pravidelných příkladech příjmu apply_changes_from_snapshot()
snímků a historických příkladů příjmu snímků.
Argumenty |
---|
target Typ: str Název tabulky, která se má aktualizovat. Před spuštěním apply_changes() funkce můžete pomocí funkce create_streaming_table() vytvořit cílovou tabulku.Tento parametr je povinný. |
source Typ: str nebo lambda function Název tabulky nebo zobrazení pro pravidelné snímky nebo funkci lambda Pythonu, která vrací datový rámec snímku, který se má zpracovat, a verzi snímku. Viz Implementace zdrojového argumentu. Tento parametr je povinný. |
keys Typ: list Sloupec nebo kombinace sloupců, které jednoznačně identifikují řádek ve zdrojových datech. Slouží k identifikaci událostí CDC, které se vztahují na konkrétní záznamy v cílové tabulce. Můžete zadat jednu z těchto: – Seznam řetězců: ["userId", "orderId"] – Seznam funkcí Spark SQL col() : [col("userId"), col("orderId"] Argumenty funkcí col() nemohou obsahovat kvalifikátory. Můžete například použít col(userId) , ale nemůžete použít col(source.userId) .Tento parametr je povinný. |
stored_as_scd_type Typ: str nebo int Určuje, zda se mají ukládat záznamy jako SCD typu 1 nebo SCD typu 2. Nastavte na 1 typ SCD 1 nebo 2 pro SCD typ 2.Tato klauzule je nepovinná. Výchozí hodnota je SCD typu 1. |
track_history_column_list track_history_except_column_list Typ: list Podmnožina výstupních sloupců, které se mají sledovat pro historii v cílové tabulce. Slouží track_history_column_list k určení úplného seznamu sloupců, které se mají sledovat. Používánítrack_history_except_column_list určit sloupce, které mají být vyloučeny ze sledování. Můžete deklarovat hodnotu jako seznam řetězců nebo jako funkce Spark SQL col() :- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Argumenty funkcí col() nemohou obsahovat kvalifikátory. Můžete například použít col(userId) , ale nemůžete použít col(source.userId) .Tento parametr je volitelný. Výchozí hodnota je zahrnout všechny sloupce v cílové tabulce, pokud ne nebo ne track_history_column_list .track_history_except_column_list funkce je předána argumentu. |
Implementace argumentu source
Funkce apply_changes_from_snapshot()
obsahuje source
argument. Pro zpracování historických snímků se očekává, že argumentem je funkce lambda Pythonu, která vrací dvě hodnoty apply_changes_from_snapshot()
funkce: datový rámec Pythonu obsahující data snímku, source
která se mají zpracovat, a verzi snímku.
Následuje podpis funkce lambda:
lambda Any => Optional[(DataFrame, Any)]
- Argumentem funkce lambda je nejnovější zpracovaná verze snímku.
- Návratová hodnota funkce lambda je
None
nebo řazená kolekce členů dvou hodnot: První hodnota řazené kolekce členů je datový rámec obsahující snímek, který se má zpracovat. Druhá hodnota řazené kolekce členů je verze snímku, která představuje logické pořadí snímku.
Příklad, který implementuje a volá funkci lambda:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
Modul runtime Delta Live Tables provede následující kroky pokaždé, když se aktivuje kanál obsahující apply_changes_from_snapshot()
funkci:
next_snapshot_and_version
Spustí funkci, která načte další datový rámec snímku a odpovídající verzi snímku.- Pokud se žádný datový rámec nevrátí, spuštění se ukončí a aktualizace kanálu se označí jako dokončená.
- Rozpozná změny v novém snímku a postupně je použije na cílovou tabulku.
- Vrátí krok 1, aby se načetl další snímek a jeho verze.
Omezení
Rozhraní Pythonu Delta Live Tables má následující omezení:
Funkce pivot()
není podporována. Operace pivot
ve Sparku vyžaduje dychtivé načítání vstupních dat pro výpočet výstupního schématu. Tato funkce není v dynamických tabulkách Delta podporovaná.