DLT Python-språkreferens
Den här artikeln innehåller information om programmeringsgränssnittet för DLT Python.
Information om SQL-API:et finns i DLT SQL-språkreferensen.
Mer information om hur du konfigurerar automatisk inläsning finns i Vad är automatisk inläsning?.
Innan du börjar
Följande är viktiga överväganden när du implementerar pipelines med DLT Python-gränssnittet:
- Eftersom Funktionerna Python
table()
ochview()
anropas flera gånger under planeringen och körningen av en pipelineuppdatering ska du inte inkludera kod i någon av dessa funktioner som kan ha biverkningar (till exempel kod som ändrar data eller skickar ett e-postmeddelande). För att undvika oväntat beteende bör dina Python-funktioner som definierar datauppsättningar endast innehålla den kod som krävs för att definiera tabellen eller vyn. - Om du vill utföra åtgärder som att skicka e-post eller integrera med en extern övervakningstjänst, särskilt i funktioner som definierar datauppsättningar, använder du händelsekrokar. Om du implementerar dessa åtgärder i de funktioner som definierar dina datauppsättningar kan det orsaka oväntat beteende.
- Funktionerna Python
table
ochview
måste returnera en DataFrame. Vissa funktioner som körs på DataFrames returnerar inte DataFrames och bör inte användas. Dessa åtgärder omfattar funktioner somcollect()
,count()
,toPandas()
,save()
ochsaveAsTable()
. Eftersom DataFrame-transformeringar körs efter det fullständiga dataflödesdiagrammet har lösts, kan användning av sådana åtgärder ha oavsiktliga biverkningar.
Importera dlt
Python-modulen
DLT Python-funktioner definieras i modulen dlt
. Dina pipelines som implementeras med Python-API:et måste importera den här modulen:
import dlt
Skapa en materialiserad DLT-vy eller strömningstabell
I Python avgör DLT om en datauppsättning ska uppdateras som en materialiserad vy eller en strömmande tabell baserat på den definierande frågan. Den @table
dekoratören kan användas för att definiera både materialiserade vyer och strömmande tabeller.
Om du vill definiera en materialiserad vy i Python använder du @table
på en fråga som utför en statisk läsning mot en datakälla. Om du vill definiera en strömningstabell använder du @table
på en fråga som utför en direktuppspelningsläsning mot en datakälla eller använder funktionen create_streaming_table(). Båda datauppsättningstyperna har samma syntaxspecifikation enligt följande:
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Skapa en DLT-vy
Om du vill definiera en vy i Python använder du @view
dekoratör. Precis som @table
dekoratör kan du använda vyer i DLT för antingen statiska eller strömmande datauppsättningar. Följande är syntaxen för att definiera vyer med Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Exempel: Definiera tabeller och vyer
Om du vill definiera en tabell eller vy i Python använder du @dlt.view
eller @dlt.table
dekoratör på en funktion. Du kan använda funktionsnamnet eller parametern name
för att tilldela tabellen eller visningsnamnet. I följande exempel definieras två olika datauppsättningar: en vy med namnet taxi_raw
som tar en JSON-fil som indatakälla och en tabell med namnet filtered_data
som tar taxi_raw
-vyn som indata:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("taxi_raw").where(...)
Exempel: Få åtkomst till en datauppsättning som definierats i samma pipeline
Not
Även om funktionerna dlt.read()
och dlt.read_stream()
fortfarande är tillgängliga och stöds fullt ut av DLT Python-gränssnittet rekommenderar Databricks att du alltid använder funktionerna spark.read.table()
och spark.readStream.table()
på grund av följande:
-
spark
-funktionerna stödjer läsning av interna och externa datamängder, inklusive datamängder i extern lagring eller definierade i andra pipelines. Funktionernadlt
stöder endast läsning av interna datamängder. - De
spark
funktionerna har stöd för att ange alternativ, till exempelskipChangeCommits
, för att läsa operationer. Att ange alternativ stöds inte avdlt
funktioner.
Om du vill komma åt en datauppsättning som definierats i samma pipeline använder du funktionerna spark.read.table()
eller spark.readStream.table()
:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("customers_raw").where(...)
Not
När du kör frågor mot vyer eller tabeller i pipelinen kan du ange katalogen och schemat direkt, eller så kan du använda standardvärdena som konfigurerats i pipelinen. I det här exemplet skrivs och läss tabellen customers
från standardkatalogen och schemat som konfigurerats för din pipeline.
Exempel: Läsa från en tabell som är registrerad i ett metaarkiv
Om du vill läsa data från en tabell som är registrerad i Hive-metaarkivet kan du i funktionsargumentet kvalificera tabellnamnet med databasnamnet:
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Ett exempel på läsning från en Unity Catalog-tabell finns i Mata in data i en Unity Catalog-pipeline.
exempel: Få åtkomst till en datauppsättning med hjälp av spark.sql
Du kan också returnera en datauppsättning med hjälp av ett spark.sql
uttryck i en frågefunktion. Om du vill läsa från en intern datauppsättning kan du antingen låta namnet vara okvalificerat för att använda standardkatalogen och schemat, eller så kan du förbereda dem:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")
Ta bort poster permanent från en materialiserad vy eller direktuppspelningstabell
Om du vill ta bort poster permanent från en materialiserad vy eller strömningstabell med borttagningsvektorer aktiverade, till exempel för GDPR-efterlevnad, måste ytterligare åtgärder utföras på objektets underliggande Delta-tabeller. Information om hur du tar bort poster från en materialiserad vy finns i Ta bort poster permanent från en materialiserad vy med borttagningsvektorer aktiverade. Information om hur du tar bort poster från en strömmande tabell finns i Ta bort poster permanent från en strömmande tabell.
Skriv till externa händelseströmningstjänster eller Delta-tabeller med DLT sink
API
Viktig
DLT sink
-API:et finns i offentlig förhandsversion.
Notera
- Om du kör en fullständig uppdatering rensas inte data från mottagare. Alla ombearbetade data läggs till i datamottagaren och befintliga data ändras inte.
- DLT-förväntningar stöds inte med api:et
sink
.
Om du vill skriva till en händelseströmningstjänst som Apache Kafka eller Azure Event Hubs eller till en Delta-tabell från en DLT-pipeline använder du funktionen create_sink()
som ingår i dlt
Python-modulen. När du har skapat en mottagare med funktionen create_sink()
använder du mottagaren i ett tilläggsflöde för att skriva data till mottagaren. tilläggsflöde är den enda flödestyp som stöds med funktionen create_sink()
. Andra flödestyper, till exempel apply_changes
, stöds inte.
Följande är syntaxen för att skapa en sink med create_sink()
-funktionen:
create_sink(<sink_name>, <format>, <options>)
Argumenten |
---|
name Typ: str En sträng som identifierar sänkan och används för att referera till och hantera sänkan. Sink-namn måste vara unika för pipelinen, inklusive i all källkod, såsom anteckningsböcker eller moduler som ingår i pipelinen. Den här parametern krävs. |
format Typ: str En sträng som definierar utdataformatet, antingen kafka eller delta .Den här parametern krävs. |
options Typ: dict En valfri lista med alternativ för mottagare, formaterad enligt {"key": "value"} , där både nyckeln och värdet är strängar. Alla Databricks Runtime-alternativ som stöds av Kafka- och Delta-mottagare stöds. För Kafka-alternativen, se Konfigurera Kafka Structured Streaming-skrivaren. För Delta-alternativ, se Delta-tabellen som en mottagarpunkt. |
Exempel: Skapa en Kafka-sink med funktionen create_sink()
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
Exempel: Skapa en Delta-sänk med funktionen create_sink()
och en filsystemsökväg
I följande exempel skapas en mottagare som skriver till en Delta-tabell genom att ange filsystemsökvägen till tabellen.
create_sink(
"my_delta_sink",
"delta",
{ "path": "//path/to/my/delta/table" }
)
Exempel: Skapa en Delta-datasänka med funktionen create_sink()
och ett Unity Catalog-tabellnamn
Note
Delta-sinken har stöd för externa och hanterade tabeller i Unity Catalog och hanterade Hive metastore-tabeller. Namnen på tabeller måste vara fullständigt kvalificerade. Unity Catalog-tabeller måste till exempel använda en identifierare på tre nivåer: <catalog>.<schema>.<table>
. Hive-metaarkivtabeller måste använda <schema>.<table>
.
I följande exempel skapas en "sink" som skriver till en Delta-tabell genom att ange namnet på en tabell i Unity-katalogen.
create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)
Exempel: Använd ett tilläggsflöde för att skriva till en Delta-mottagare
I följande exempel skapas en sink som skriver till en Delta-tabell och sedan skapas ett appendflöde för att skriva till sinken.
create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
@append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
Exempel: Använd ett append-flöde för att skriva till en Kafka-mottagare
I följande exempel skapas en datasänka som skrivs till ett Kafka-ämne och sedan skapas ett påläggsflöde för att skriva till denna datasänka:
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))
Schemat för dataramen som skrivs till Kafka ska innehålla kolumnerna som anges i Konfigurera Kafka Structured Streaming-skrivaren.
Skapa en tabell som ska användas som mål för strömningsåtgärder
Använd funktionen create_streaming_table()
för att skapa en måltabell för utdataposter som genereras av strömningsåtgärder, inklusive apply_changes(), apply_changes_from_snapshot() och @append_flow.
Not
Funktionerna create_target_table()
och create_streaming_live_table()
är inaktuella. Databricks rekommenderar att du uppdaterar befintlig kod för att använda funktionen create_streaming_table()
.
create_streaming_table(
name = "<table-name>",
comment = "<comment>",
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Argument |
---|
name Typ: str Tabellnamnet. Den här parametern krävs. |
comment Typ: str En valfri beskrivning för tabellen. |
spark_conf Typ: dict En valfri lista över Spark-konfigurationer för körning av denna fråga. |
table_properties Typ: dict En valfri lista över tabellegenskaper för tabellen. |
partition_cols Typ: array En valfri lista över en eller flera kolumner som ska användas för partitionering av tabellen. |
cluster_by Typ: array Du kan också aktivera flytande klustring i tabellen och definiera de kolumner som ska användas som klustringsnycklar. Se Använd flytande klustring för Delta-tabeller. |
path Typ: str En valfri lagringsplats för tabelldata. Om det inte anges, använder systemet pipeline-lagringsplatsen som standard. |
schema Typ: str eller StructType En valfri schemadefinition för tabellen. Scheman kan definieras som en SQL DDL-sträng eller med en Python StructType . |
expect_all expect_all_or_drop expect_all_or_fail Typ: dict Valfria datakvalitetsbegränsningar för tabellen. Se flera olika förväntningar. |
row_filter (offentlig förhandsversion)Typ: str En valfri radfiltersats för tabellen. Se Publicera tabeller med radfilter och kolumnmasker. |
Kontrollera hur tabeller materialiseras
Tabeller ger också ytterligare kontroll över materialiseringen:
- Ange hur du klustrar tabeller med hjälp av
cluster_by
. Du kan använda flytande klustring för att påskynda databasfrågor. Se Använd flytande klustring för Delta-tabeller. - Ange hur tabeller partitioneras med hjälp av
partition_cols
. - Du kan ange tabellegenskaper när du definierar en vy eller tabell. Se DLT-tabellegenskaper.
- Ange en lagringsplats för tabelldata med inställningen
path
. Som standard lagras tabelldata på lagringsplatsen för pipelinen ompath
inte har angetts. - Du kan använda genererade kolumner i schemadefinitionen. Se Exempel: Ange ett schema och klusterkolumner.
Not
För tabeller som är mindre än 1 TB i storlek rekommenderar Databricks att DLT kontrollerar dataorganisationen. Du bör inte ange partitionskolumner om du inte förväntar dig att tabellen ska växa mer än en terabyte.
Exempel: Ange ett schema och klusterkolumner
Du kan också ange ett tabellschema med hjälp av en Python-StructType
eller en SQL DDL-sträng. När den anges med en DDL-sträng kan definitionen innehålla genererade kolumner.
I följande exempel skapas en tabell med namnet sales
med ett schema som anges med hjälp av en Python-StructType
:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
I följande exempel anges schemat för en tabell med hjälp av en DDL-sträng, definierar en genererad kolumn och definierar klustringskolumner:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
Som standard härleder DLT schemat från table
definition om du inte anger något schema.
exempel: Ange partitionskolumner
I följande exempel anges schemat för en tabell med hjälp av en DDL-sträng, definierar en genererad kolumn och definierar en partitionskolumn:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Exempel: Definiera tabellbegränsningar
Viktig
Tabellbegränsningar finns i offentlig förhandsversion.
När du anger ett schema kan du definiera primära och externa nycklar. Begränsningarna är informationsmässiga och tillämpas inte. Se CONSTRAINT-satsen i SQL-språkreferensen.
I följande exempel definieras en tabell med en primär och sekundär nyckelbegränsning:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Exempel: Definiera ett radfilter och en kolumnmask
Viktig
Radfilter och kolumnmasker finns i offentlig förhandsversion.
Om du vill skapa en materialiserad vy eller en strömningstabell med ett radfilter och en kolumnmask använder du ROW FILTER-satsen och MASK-satsen. I följande exempel visas hur du definierar en materialiserad vy och en strömningstabell med både ett radfilter och en kolumnmask:
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
Mer information om radfilter och kolumnmasker finns i Publicera tabeller med radfilter och kolumnmasker.
Konfigurera en strömmande tabell för att ignorera ändringar i en källströmningstabell
Anteckning
- Flaggan
skipChangeCommits
fungerar bara medspark.readStream
med hjälp av funktionenoption()
. Du kan inte använda den här flaggan i endlt.read_stream()
funktion. - Du kan inte använda flaggan
skipChangeCommits
när källuppspelningstabellen definieras som mål för en apply_changes() funktion.
Standardmässigt kräver strömningstabeller endast tilläggskällor. När en strömmande tabell använder en annan strömmande tabell som källa, och källströmningstabellen kräver uppdateringar eller borttagningar, till exempel GDPR-bearbetning av "rätt att bli bortglömd", kan skipChangeCommits
-flaggan anges när du läser källströmningstabellen för att ignorera dessa ändringar. Mer information om den här flaggan finns i Ignorera uppdateringar och borttagningar.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Python DLT-egenskaper
I följande tabeller beskrivs de alternativ och egenskaper som du kan ange när du definierar tabeller och vyer med DLT:
@table eller @view |
---|
name Typ: str Ett valfritt namn för tabellen eller vyn. Om det inte har definierats används funktionsnamnet som tabell- eller vynamn. |
comment Typ: str En valfri beskrivning för tabellen. |
spark_conf Typ: dict En valfri lista över Spark-konfigurationer för körning av denna fråga. |
table_properties Typ: dict En valfri lista över tabellegenskaper för tabellen. |
path Typ: str En valfri lagringsplats för tabelldata. Om det inte anges, använder systemet pipeline-lagringsplatsen som standard. |
partition_cols Typ: a collection of str En valfri samling, till exempel en list av en eller flera kolumner som ska användas för partitionering av tabellen. |
cluster_by Typ: array Du kan också aktivera flytande klustring i tabellen och definiera de kolumner som ska användas som klustringsnycklar. Se Använd flytande klustring för Delta-tabeller. |
schema Typ: str eller StructType En valfri schemadefinition för tabellen. Scheman kan definieras som en SQL DDL-sträng eller med ett Python- StructType . |
temporary Typ: bool Skapa en tabell, men publicera inte metadata för tabellen. Nyckelordet temporary instruerar DLT att skapa en tabell som är tillgänglig för pipelinen men som inte ska nås utanför pipelinen. För att förkorta bearbetningstiden finns en tillfällig tabell kvar under hela den pipeline som skapar den, och inte bara under en enskild uppdatering.Standardvärdet är "False". |
row_filter (offentlig förhandsversion)Typ: str En valfri radfiltersats för tabellen. Se Publicera tabeller med radfilter och kolumnmasker. |
Tabell- eller vydefinition |
---|
def <function-name>() En Python-funktion som definierar datauppsättningen. Om parametern name inte har angetts används <function-name> som måldatauppsättningsnamn. |
query En Spark SQL-instruktion som returnerar en Spark Dataset eller Koalas DataFrame. Använd dlt.read() eller spark.read.table() för att utföra en fullständig läsning från en datauppsättning som definierats i samma pipeline. Om du vill läsa en extern datauppsättning använder du funktionen spark.read.table() . Du kan inte använda dlt.read() för att läsa externa datauppsättningar. Eftersom spark.read.table() kan användas för att läsa interna datamängder, datauppsättningar som definierats utanför den aktuella pipelinen och gör att du kan ange alternativ för att läsa data rekommenderar Databricks att du använder dem i stället för funktionen dlt.read() .När du definierar en datauppsättning i en pipeline använder den som standard den katalog och det schema som definierats i pipelinekonfigurationen. Du kan använda funktionen spark.read.table() för att läsa från en datauppsättning som definierats i pipelinen utan kvalificering. Om du till exempel vill läsa från en datauppsättning med namnet customers :spark.read.table("customers") Du kan också använda funktionen spark.read.table() för att läsa från en tabell som är registrerad i metaarkivet genom att eventuellt kvalificera tabellnamnet med databasnamnet:spark.read.table("sales.customers") Använd dlt.read_stream() eller spark.readStream.table() för att utföra en direktuppspelning från en datauppsättning som definierats i samma pipeline. Om du vill utföra en direktuppspelningsläsning från en extern datauppsättning använder duspark.readStream.table() funktion Eftersom spark.readStream.table() kan användas för att läsa interna datamängder, datauppsättningar som definierats utanför den aktuella pipelinen och gör att du kan ange alternativ för att läsa data rekommenderar Databricks att du använder dem i stället för funktionen dlt.read_stream() .Om du vill definiera en fråga i en DLT- table -funktion med sql-syntax använder du funktionen spark.sql . Se Exempel: Få åtkomst till en datauppsättning med hjälp av spark.sql . Om du vill definiera en fråga i en DLT table -funktion med python använder du PySpark syntax. |
Förväntningar |
---|
@expect("description", "constraint") Deklarera en datakvalitetsbegränsning som identifieras av description . Om en rad bryter mot förväntningarna inkluderar du raden i måldatauppsättningen. |
@expect_or_drop("description", "constraint") Deklarera en datakvalitetsbegränsning som identifieras av description . Om en rad bryter mot förväntningarna släpper du raden från måldatauppsättningen. |
@expect_or_fail("description", "constraint") Deklarera en datakvalitetsbegränsning som identifieras av description . Om en rad bryter mot förväntningarna, stoppa körningen omedelbart. |
@expect_all(expectations) Deklarera en eller flera datakvalitetsbegränsningar. expectations är en Python-ordlista, där nyckeln är förväntansbeskrivningen och värdet är förväntansbegränsningen. Om en rad bryter mot någon av förväntningarna ska du inkludera raden i måldatauppsättningen. |
@expect_all_or_drop(expectations) Deklarera en eller flera datakvalitetsbegränsningar. expectations är en Python-ordlista, där nyckeln är förväntansbeskrivningen och värdet är förväntansbegränsningen. Om en rad bryter mot någon av förväntningarna släpper du raden från måldatauppsättningen. |
@expect_all_or_fail(expectations) Deklarera en eller flera datakvalitetsbegränsningar. expectations är en Python-ordlista, där nyckeln är förväntansbeskrivningen och värdet är förväntansbegränsningen. Om en rad bryter mot någon av förväntningarna stoppar du omedelbart körningen. |
Ändra datainsamling från en ändringsfeed med Python i DLT
Använd funktionen apply_changes()
i Python-API:et för att använda cdc-funktioner (DLT Change Data Capture) för att bearbeta källdata från en cdf (change data feed).
Viktig
Du måste deklarera en måltabell för strömning för att tillämpa ändringar i. Du kan också ange schemat för måltabellen. När du anger schemat för apply_changes()
måltabellen måste du inkludera kolumnerna __START_AT
och __END_AT
med samma datatyp som sequence_by
fälten.
Om du vill skapa den obligatoriska måltabellen kan du använda funktionen create_streaming_table() i DLT Python-gränssnittet.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Obs
För APPLY CHANGES
bearbetning är standardbeteendet för INSERT
och UPDATE
händelser att uppdatera eller lägga till CDC-händelser från källan: uppdatera alla rader i måltabellen som matchar de angivna nycklarna eller lägga till en ny rad när en matchande post inte finns i måltabellen. Hantering av DELETE
händelser kan anges med villkoret APPLY AS DELETE WHEN
.
Mer information om CDC-bearbetning med ett ändringsflöde finns i APPLY CHANGES-API:erna: Förenkla förändringsdatafångst med DLT. Ett exempel på hur du använder funktionen apply_changes()
finns i Exempel: SCD-typ 1 och SCD typ 2-bearbetning med CDF-källdata.
Viktig
Du måste deklarera en måltabell för strömning för att tillämpa ändringar i. Du kan också ange schemat för måltabellen. När du anger apply_changes
måltabellschema måste du inkludera kolumnerna __START_AT
och __END_AT
med samma datatyp som fältet sequence_by
.
Se API:er för TILLÄMPA ÄNDRINGAR: Förenkla insamling av ändringsdata med DLT.
Argument |
---|
target Typ: str Namnet på tabellen som ska uppdateras. Du kan använda funktionen create_streaming_table() för att skapa måltabellen innan du kör funktionen apply_changes() .Den här parametern krävs. |
source Typ: str Datakällan som innehåller CDC-register. Den här parametern krävs. |
keys Typ: list Kolumnen eller kombinationen av kolumner som unikt identifierar en rad i källdata. Detta används för att identifiera vilka CDC-händelser som gäller för specifika poster i måltabellen. Du kan ange något av följande:
Argumenten till col() -funktioner får inte innehålla kvalificerare. Du kan till exempel använda col(userId) , men du kan inte använda col(source.userId) .Den här parametern krävs. |
sequence_by Typ: str eller col() Kolumnnamnet som anger den logiska ordningen för CDC-händelser i källdata. DLT använder den här sekvenseringen för att hantera ändringshändelser som kommer ur ordning. Du kan ange något av följande:
Argumenten till col() -funktioner får inte innehålla kvalificerare. Du kan till exempel använda col(userId) , men du kan inte använda col(source.userId) .Den angivna kolumnen måste vara en sorterbar datatyp. Den här parametern krävs. |
ignore_null_updates Typ: bool Tillåt inmatning av uppdateringar som innehåller en delmängd av målkolumnerna. När en CDC-händelse matchar en befintlig rad och ignore_null_updates är True behåller kolumner med en null sina befintliga värden i målet. Detta gäller även kapslade kolumner med värdet null . När ignore_null_updates är False skrivs befintliga värden över med null värden.Den här parametern är valfri. Standardvärdet är False . |
apply_as_deletes Typ: str eller expr() Anger när en CDC-händelse ska behandlas som en DELETE istället för en uppdatering. För att hantera oordnade data behålls den borttagna raden tillfälligt som en gravsten i den underliggande Delta-tabellen och en vy skapas i metaarkivet som filtrerar bort dessa gravstenar. Kvarhållningsintervallet kan konfigureras genom systemetpipelines.cdc.tombstoneGCThresholdInSeconds tabelegenskap.Du kan ange något av följande:
Den här parametern är valfri. |
apply_as_truncates Typ: str eller expr() Anger när en CDC-händelse ska behandlas som en fullständig tabell TRUNCATE . Eftersom den här satsen utlöser en fullständig borttagning av innehållet i måltabellen bör den endast användas för specifika situationer där denna funktionalitet är nödvändig.Parametern apply_as_truncates stöds endast för SCD-typ 1. SCD-typ 2 stöder inte trunkeringsåtgärder.Du kan ange något av följande:
Den här parametern är valfri. |
column_list except_column_list Typ: list En delmängd av kolumner som ska inkluderas i måltabellen. Använd column_list för att ange den fullständiga listan med kolumner som ska inkluderas. Använd except_column_list för att ange vilka kolumner som ska undantas. Du kan deklarera antingen värdet som en lista med strängar eller som Spark SQL col() funktioner:
Argumenten till col() -funktioner får inte innehålla kvalificerare. Du kan till exempel använda col(userId) , men du kan inte använda col(source.userId) .Den här parametern är valfri. Standardvärdet är att inkludera alla kolumner i måltabellen när inget column_list eller except_column_list argument skickas till funktionen. |
stored_as_scd_type Typ: str eller int Ska poster lagras som SCD-typ 1 eller SCD-typ 2? Ange till 1 för SCD typ 1 eller 2 för SCD typ 2.Den här satsen är valfri. Standardvärdet är SCD typ 1. |
track_history_column_list track_history_except_column_list Typ: list En delmängd av kolumner i utdata som ska loggas för historik i måltabellen. Använd track_history_column_list för att ange den fullständiga listan över kolumner som ska spåras. Användtrack_history_except_column_list för att ange vilka kolumner som ska undantas från spårning. Du kan deklarera antingen värdet som en lista med strängar eller som Spark SQL col() funktioner:
Argumenten till col() -funktioner får inte innehålla kvalificerare. Du kan till exempel använda col(userId) , men du kan inte använda col(source.userId) .Den här parametern är valfri. Standardvärdet är att inkludera alla kolumner i måltabellen när inga track_history_column_list eller liknande specificeras.track_history_except_column_list -argumentet skickas till funktionen. |
Ändra datainsamling från databasögonblicksbilder med Python i DLT
Viktig
APPLY CHANGES FROM SNAPSHOT
-API:et finns i offentlig förhandsversion.
Använd funktionen apply_changes_from_snapshot()
i Python API för att använda DLT Change Data Capture (CDC)-funktionalitet för att bearbeta källdata från databassnapshots.
Viktig
Du måste deklarera en måltabell för strömning för att tillämpa ändringar i. Du kan också ange schemat för måltabellen. När du anger schemat för apply_changes_from_snapshot()
måltabellen måste du även inkludera kolumnerna __START_AT
och __END_AT
med samma datatyp som fältet sequence_by
.
Om du vill skapa den obligatoriska måltabellen kan du använda funktionen create_streaming_table() i DLT Python-gränssnittet.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Not
För behandling av APPLY CHANGES FROM SNAPSHOT
är det standardbeteende att infoga en ny rad om en matchande post med samma nyckel/nycklar inte finns i måltabellen. Om det finns en matchande post uppdateras den endast om något av värdena på raden har ändrats. Rader med nyckelvärden som finns i måldatabasen men som inte längre finns i källan tas bort.
Mer information om CDC-bearbetning med ögonblicksbilder finns i API:erna APPLY CHANGES: Förenkla förändringsdatainhämtning med DLT. Exempel på hur du använder funktionen apply_changes_from_snapshot()
finns i periodisk inmatning av ögonblicksbilder och historisk inmatning av ögonblicksbilder exempel.
Argument |
---|
target Typ: str Namnet på tabellen som ska uppdateras. Du kan använda funktionen create_streaming_table() för att skapa måltabellen innan du kör funktionen apply_changes() .Den här parametern krävs. |
source Typ: str eller lambda function Antingen namnet på en tabell eller vy som ska ögonblicksbildas regelbundet eller en Python lambda-funktion som returnerar den ögonblicksbild av DataFrame som ska bearbetas och ögonblicksbildversionen. Referera till Implementera argumentet source .Den här parametern krävs. |
keys Typ: list Kolumnen eller kombinationen av kolumner som unikt identifierar en rad i källdata. Detta används för att identifiera vilka CDC-händelser som gäller för specifika poster i måltabellen. Du kan ange något av följande:
Argumenten till col() -funktioner får inte innehålla kvalificerare. Du kan till exempel använda col(userId) , men du kan inte använda col(source.userId) .Den här parametern krävs. |
stored_as_scd_type Typ: str eller int Om poster ska lagras som SCD-typ 1 eller SCD-typ 2. Ange till 1 för SCD typ 1 eller 2 för SCD typ 2.Den här satsen är valfri. Standardvärdet är SCD typ 1. |
track_history_column_list track_history_except_column_list Typ: list En delmängd av kolumner i utdata som ska spåras för att bevara historik i måltabellen. Använd track_history_column_list för att ange den fullständiga listan över kolumner som ska spåras. Användtrack_history_except_column_list för att ange vilka kolumner som ska undantas från spårning. Du kan deklarera antingen värdet som en lista med strängar eller som Spark SQL col() funktioner:
Argumenten till col() -funktioner får inte innehålla kvalificerare. Du kan till exempel använda col(userId) , men du kan inte använda col(source.userId) .Den här parametern är valfri. Standardvärdet är att inkludera alla kolumner i måltabellen när inga track_history_column_list eller liknande specificeras.track_history_except_column_list -argumentet skickas till funktionen. |
Implementera argumentet source
Funktionen apply_changes_from_snapshot()
innehåller argumentet source
. För bearbetning av historiska ögonblicksbilder förväntas argumentet source
vara en Python lambda-funktion som returnerar två värden till funktionen apply_changes_from_snapshot()
: en Python DataFrame som innehåller de ögonblicksbildsdata som ska bearbetas och en ögonblicksbildversion.
Följande är signaturen för lambda-funktionen:
lambda Any => Optional[(DataFrame, Any)]
- Argumentet till lambda-funktionen är den senast bearbetade ögonblicksbildversionen.
- Returvärdet för lambda-funktionen är
None
eller en tuppeln med två värden: Det första värdet för tuppeln är en DataFrame som innehåller ögonblicksbilden som ska bearbetas. Det andra värdet för tuppeln är den ögonblicksbildsversion som representerar den logiska ordningen av ögonblicksbilden.
Ett exempel som implementerar och anropar lambda-funktionen:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
DLT-körningen utför följande steg varje gång pipelinen som innehåller funktionen apply_changes_from_snapshot()
utlöses:
- Kör funktionen
next_snapshot_and_version
för att läsa in nästa DataFrame-ögonblicksbild och den motsvarande versionen av ögonblicksbilden. - Om ingen DataFrame returnerar avslutas körningen och pipelineuppdateringen markeras som slutförd.
- Identifierar ändringarna i den nya ögonblicksbilden och tillämpar dem stegvis på måltabellen.
- Återgår till steg 1 för att läsa in nästa ögonblicksbild och dess version.
begränsningar
DLT Python-gränssnittet har följande begränsning:
Funktionen pivot()
stöds inte. Den pivot
åtgärden i Spark kräver ivrig inläsning av indata för att beräkna utdataschemat. Den här funktionen stöds inte i DLT.