DLT Python-taal referentie
Dit artikel bevat informatie over de DLT Python-programmeerinterface.
Zie de DLT SQL-taalreferentievoor meer informatie over de SQL-API.
Voor details specifiek over het configureren van Auto Loader, zie Wat is Auto Loader?.
voordat u begint
Hier volgen belangrijke overwegingen bij het implementeren van pijplijnen met de DLT Python-interface:
- Omdat de Python-
table()
- enview()
-functies meerdere keren worden aangeroepen tijdens de planning en uitvoering van een pijplijnupdate, moet u geen code opnemen in een van deze functies die bijwerkingen kunnen hebben (bijvoorbeeld code die gegevens wijzigt of een e-mailbericht verzendt). Om onverwacht gedrag te voorkomen, moeten uw Python-functies die gegevenssets definiëren alleen de code bevatten die nodig is om de tabel of weergave te definiëren. - Als u bewerkingen wilt uitvoeren zoals het verzenden van e-mailberichten of het integreren met een externe bewakingsservice, met name in functies die gegevenssets definiëren, gebruikt u gebeurtenishook. Het implementeren van deze bewerkingen in de functies die uw gegevenssets definiëren, veroorzaakt onverwacht gedrag.
- De Python-
table
- enview
-functies moeten een DataFrame retourneren. Sommige functies die op DataFrames werken, retourneren geen DataFrames en mogen niet worden gebruikt. Deze bewerkingen omvatten functies zoalscollect()
,count()
,toPandas()
,save()
ensaveAsTable()
. Omdat DataFrame-transformaties worden uitgevoerd nadat de volledige gegevensstroomgrafiek is opgelost, kan het gebruik van dergelijke bewerkingen onbedoelde bijwerkingen hebben.
De dlt
Python-module importeren
DLT Python-functies worden gedefinieerd in de module dlt
. Uw pijplijnen die zijn geïmplementeerd met de Python-API, moeten deze module importeren:
import dlt
Een gerealiseerde DLT-weergave of streamingtabel maken
In Python bepaalt DLT of een gegevensset moet worden bijgewerkt als een gerealiseerde weergave of streamingtabel op basis van de definitiequery. De @table
decorator kan worden gebruikt om zowel materiële weergaven als streamingtabellen te definiëren.
Als u een gerealiseerde weergave in Python wilt definiëren, past u @table
toe op een query die een statische leesbewerking uitvoert op een gegevensbron. Als u een streamingtabel wilt definiëren, past u @table
toe op een query die een streamingbewerking uitvoert op basis van een gegevensbron of gebruikt u de functie create_streaming_table(). Beide typen gegevenssets hebben dezelfde syntaxisspecificatie als volgt:
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
een DLT-weergave maken
Als u een weergave in Python wilt definiëren, past u de @view
decorator toe. Net als de @table
decorator kunt u weergaven in DLT gebruiken voor statische of streaminggegevenssets. Hier volgt de syntaxis voor het definiëren van weergaven met Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Voorbeeld: Tabellen en weergaven definiëren
Als u een tabel of weergave in Python wilt definiëren, past u de @dlt.view
of @dlt.table
decorator toe op een functie. U kunt de functienaam of de parameter name
gebruiken om de tabel- of weergavenaam toe te wijzen. In het volgende voorbeeld worden twee verschillende gegevenssets gedefinieerd: een weergave met de naam taxi_raw
die een JSON-bestand als invoerbron gebruikt en een tabel met de naam filtered_data
die de taxi_raw
weergave als invoer gebruikt:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("taxi_raw").where(...)
Voorbeeld: Toegang krijgen tot een gegevensset die is gedefinieerd in dezelfde pijplijn
Notitie
Hoewel de dlt.read()
- en dlt.read_stream()
-functies nog steeds beschikbaar zijn en volledig worden ondersteund door de DLT Python-interface, raadt Databricks aan altijd de spark.read.table()
- en spark.readStream.table()
-functies te gebruiken vanwege het volgende:
- De
spark
-functies ondersteunen het lezen van interne en externe gegevenssets, waaronder gegevenssets in externe opslag of gedefinieerd in andere pijplijnen. Dedlt
-functies ondersteunen alleen het lezen van interne gegevenssets. - De
spark
-functies ondersteunen het opgeven van opties, zoalsskipChangeCommits
, om bewerkingen te lezen. Het opgeven van opties wordt niet ondersteund door dedlt
-functies.
Als u toegang wilt krijgen tot een gegevensset die in dezelfde pijplijn is gedefinieerd, gebruikt u de spark.read.table()
- of spark.readStream.table()
-functies:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("customers_raw").where(...)
Notitie
Wanneer u query's uitvoert op weergaven of tabellen in uw pijplijn, kunt u de catalogus en het schema rechtstreeks opgeven of kunt u de standaardinstellingen gebruiken die zijn geconfigureerd in uw pijplijn. In dit voorbeeld wordt de customers
tabel geschreven en gelezen uit de standaardcatalogus en het standaardschema dat is geconfigureerd voor uw pijplijn.
Voorbeeld: Lezen uit een tabel die is geregistreerd in een metastore
Als u gegevens wilt lezen uit een tabel die is geregistreerd in de Hive-metastore, kunt u in het functieargument de tabelnaam kwalificeren met de databasenaam:
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Zie Gegevens opnemen in een Unity Catalog-pijplijnvoor een voorbeeld van het lezen uit een Unity Catalog-tabel.
voorbeeld: een gegevensset openen met behulp van spark.sql
U kunt ook een gegevensset retourneren met behulp van een spark.sql
-expressie in een queryfunctie. Als u wilt lezen uit een interne gegevensset, kunt u de naam niet gekwalificeerd laten om de standaardcatalogus en het standaardschema te gebruiken, of u kunt deze vooraf instellen:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")
Records definitief verwijderen uit een gerealiseerde weergave of streamingtabel
Als u records definitief wilt verwijderen uit een gerealiseerde weergave of streamingtabel waarvoor verwijderingsvectoren zijn ingeschakeld, zoals voor AVG-naleving, moeten extra bewerkingen worden uitgevoerd op de onderliggende Delta-tabellen van het object. Zie om records permanent te verwijderen uit een gematerialiseerde weergave met ingeschakelde verwijderingsvectoren, zodat zeker is dat ze uit de weergave zijn verwijderd. Zie Records definitief verwijderen uit een streamingtabelom ervoor te zorgen dat records uit een streamingtabel worden verwijderd.
schrijven naar externe gebeurtenisstreamingservices of Delta-tabellen met de DLT sink
-API
Belangrijk
De DLT sink
-API bevindt zich in Openbare Preview.
Notitie
- Bij het uitvoeren van een volledige vernieuwingsupdate, worden geen gegevens uit de sinks gewist. Eventuele opnieuw verwerkte gegevens worden toegevoegd aan de sink en bestaande gegevens worden niet gewijzigd.
- DLT-verwachtingen worden niet ondersteund met de
sink
-API.
Als u wilt schrijven naar een gebeurtenisstreamingservice zoals Apache Kafka of Azure Event Hubs of naar een Delta-tabel vanuit een DLT-pijplijn, gebruikt u de create_sink()
functie die is opgenomen in de dlt
Python-module. Nadat u een sink hebt gemaakt met de functie create_sink()
, gebruikt u de sink in een toevoegstroom om gegevens naar de sink te schrijven. toevoegstroom is het enige stroomtype dat wordt ondersteund met de functie create_sink()
. Andere stroomtypen, zoals apply_changes
, worden niet ondersteund.
Hier volgt de syntaxis voor het maken van een sink met de functie create_sink()
:
create_sink(<sink_name>, <format>, <options>)
Argumenten |
---|
name Typ: str Een reeks die de sink identificeert en gebruikt wordt om naar de sink te verwijzen en deze te beheren. Sinknamen moeten uniek zijn binnen de gehele pijplijn, inclusief in alle broncode zoals notebooks of modules die onderdeel zijn van de pijplijn. Deze parameter is vereist. |
format Type: str Een tekenreeks die de uitvoerindeling definieert, kafka of delta .Deze parameter is vereist. |
options Typ: dict Een optionele lijst met 'sink'-opties, opgemaakt als {"key": "value"} , waarbij de sleutel en waarde beide tekenreeksen zijn. Alle Databricks Runtime-opties die door de Kafka- en Delta-sinks worden ondersteund, zijn beschikbaar. Voor Kafka-opties, zie De Kafka Structured Streaming Writer configureren. Voor Delta-opties, zie Delta-tabel als een afvoer. |
voorbeeld: Een Kafka-sink maken met de functie create_sink()
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
voorbeeld: een Delta-sink maken met de functie create_sink()
en een bestandssysteempad
In het volgende voorbeeld wordt een sink gemaakt die naar een Delta-tabel schrijft door het bestandssysteempad door te geven aan de tabel:
create_sink(
"my_delta_sink",
"delta",
{ "path": "//path/to/my/delta/table" }
)
voorbeeld: een Delta-sink maken met de create_sink()
-functie en de naam van een Unity Catalog-tabel
Notitie
De Delta-sink ondersteunt externe en beheerde tabellen van Unity Catalog en beheerde Hive-metastore-tabellen. Tabelnamen moeten volledig gespecificeerd zijn. Unity Catalog-tabellen moeten bijvoorbeeld een id met drie lagen gebruiken: <catalog>.<schema>.<table>
. Hive-metastore-tabellen moeten <schema>.<table>
gebruiken.
In het volgende voorbeeld wordt een 'sink' gecreëerd die naar een Delta-tabel schrijft door de naam van een tabel op te geven in Unity Catalog.
create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)
Voorbeeld: Een appendflow gebruiken om naar een Delta-sink te schrijven
In het volgende voorbeeld wordt een sink gemaakt die naar een Delta-tabel schrijft en vervolgens wordt een toevoegstroming gemaakt om naar die sink te schrijven.
create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
@append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
Voorbeeld: Een append flow gebruiken om naar een Kafka-sink te schrijven
In het volgende voorbeeld wordt een sink gemaakt die naar een Kafka-onderwerp schrijft en vervolgens een toevoegstroom maakt om naar die sink te schrijven:
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))
Het schema van het DataFrame dat naar Kafka is geschreven, moet de kolommen bevatten die zijn opgegeven in De kafka Structured Streaming Writer configureren.
Een tabel maken die moet worden gebruikt als doel van streamingbewerkingen
Gebruik de functie create_streaming_table()
om een doeltabel te maken voor recorduitvoer door streamingbewerkingen, waaronder apply_changes(), apply_changes_from_snapshot()en @append_flow uitvoerrecords.
Notitie
De functies create_target_table()
en create_streaming_live_table()
zijn afgeschaft. Databricks raadt aan om bestaande code bij te werken om de create_streaming_table()
-functie te gebruiken.
create_streaming_table(
name = "<table-name>",
comment = "<comment>",
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Argumenten |
---|
name Type: str De tabelnaam. Deze parameter is vereist. |
comment Type: str Een optionele beschrijving voor de tabel. |
spark_conf Typ: dict Een optionele lijst met Spark-configuraties voor de uitvoering van deze query. |
table_properties Typ: dict Een optionele lijst met tabeleigenschappen voor de tabel. |
partition_cols Typ: array Een optionele lijst met een of meer kolommen die moeten worden gebruikt voor het partitioneren van de tabel. |
cluster_by Type: array Schakel eventueel liquide clustering in de tabel in en definieer de kolommen die moeten worden gebruikt als clusteringsleutels. Zie Liquid Clustering gebruiken voor Delta-tabellen. |
path Type: str Een optionele opslaglocatie voor tabelgegevens. Als dit niet is ingesteld, wordt het systeem standaard ingesteld op de opslaglocatie van de pijplijn. |
schema Type: str of StructType Een optionele schemadefinitie voor de tabel. Schema's kunnen worden gedefinieerd als een SQL DDL-tekenreeks of met een Python StructType . |
expect_all expect_all_or_drop expect_all_or_fail Typ: dict Optionele beperkingen voor gegevenskwaliteit voor de tabel. Zie meerdere verwachtingen. |
row_filter (publieke preview)Type: str Een optionele rijfilterverklaring voor de tabel. Zie Tabellen publiceren met rijfilters en kolommaskers. |
Beheersen hoe tabellen worden gematerialiseerd
Tabellen bieden ook extra controle over hun materialisatie:
- Specificeer hoe u -tabellen met behulp van
cluster_by
kunt clusteren. U kunt liquide clustering gebruiken om query's te versnellen. Zie Liquid Clustering gebruiken voor Delta-tabellen. - Geef op hoe tabellen worden gepartitioneerd met behulp van
partition_cols
. - U kunt tabeleigenschappen instellen wanneer u een weergave of tabel definieert. Zie DLT-tabeleigenschappen.
- Stel een opslaglocatie in voor tabelgegevens met behulp van de instelling
path
. Tabelgegevens worden standaard opgeslagen in de opslaglocatie voor pijplijnen alspath
niet is ingesteld. - U kunt gegenereerde kolommen in uw schemadefinitie gebruiken. Zie voorbeeld: Een schema en clusterkolommen opgeven.
Notitie
Voor tabellen die kleiner zijn dan 1 TB, raadt Databricks aan om DLT de organisatie van gegevens te laten beheren. U moet geen partitiekolommen opgeven, tenzij u verwacht dat de tabel groter wordt dan een terabyte.
voorbeeld: een schema en clusterkolommen opgeven
U kunt desgewenst een tabelschema opgeven met behulp van een Python-StructType
of een SQL DDL-tekenreeks. Wanneer deze is opgegeven met een DDL-tekenreeks, kan de definitie gegenereerde kolommenbevatten.
In het volgende voorbeeld wordt een tabel met de naam sales
gemaakt met een schema dat is opgegeven met behulp van een Python-StructType
:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
In het volgende voorbeeld wordt het schema voor een tabel opgegeven met behulp van een DDL-tekenreeks, wordt een gegenereerde kolom gedefinieerd en worden clusterkolommen gedefinieerd:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
Standaard wordt het schema afgeleid van de table
definitie als u geen schema opgeeft.
voorbeeld: partitiekolommen opgeven
In het volgende voorbeeld wordt het schema voor een tabel opgegeven met behulp van een DDL-tekenreeks, wordt een gegenereerde kolom gedefinieerd en wordt een partitiekolom gedefinieerd:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Voorbeeld: Tabelbeperkingen definiëren
Belangrijk
Tabelbeperkingen zitten in Publieke Preview.
Wanneer u een schema opgeeft, kunt u primaire en vreemde sleutels definiëren. De beperkingen zijn informatief en worden niet afgedwongen. Zie de CONSTRAINT clausule in de SQL-taalreferentie.
In het volgende voorbeeld wordt een tabel met een primaire en vreemde-sleutelbeperking gedefinieerd:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Voorbeeld: Een rijfilter en kolommasker definiëren
Belangrijk
Rijfilters en kolommaskers zijn beschikbaar in Openbare Preview.
Als u een gerealiseerde weergave of streamingtabel wilt maken met een rijfilter en kolommasker, gebruikt u de ROW FILTER component en de MASK-component. In het volgende voorbeeld ziet u hoe u een gerealiseerde weergave en een streamingtabel definieert met zowel een rijfilter als een kolommasker:
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
Zie Tabellen publiceren met rijfilters en kolommaskersvoor meer informatie over rijfilters en kolommaskers.
Een streamingtabel configureren om wijzigingen in een bronstreamingtabel te negeren
Notitie
- De vlag
skipChangeCommits
werkt alleen metspark.readStream
met behulp van de functieoption()
. U kunt deze vlag niet gebruiken in eendlt.read_stream()
functie. - U kunt de vlag
skipChangeCommits
niet gebruiken wanneer de bronstreamingtabel is gedefinieerd als het doel van een apply_changes()-functie.
Voor streamingtabellen zijn standaard append-only bronnen vereist. Wanneer een streamingtabel een andere streamingtabel als bron gebruikt en de bronstreamingtabel updates of verwijderingen vereist, bijvoorbeeld AVG 'recht om te vergeten' verwerking, kan de skipChangeCommits
vlag worden ingesteld bij het lezen van de bronstreamingtabel om deze wijzigingen te negeren. Zie Updates negeren en verwijderenvoor meer informatie over deze vlag.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Python DLT-eigenschappen
In de volgende tabellen worden de opties en eigenschappen beschreven die u kunt opgeven tijdens het definiëren van tabellen en weergaven met DLT:
@table of @view |
---|
name Type: str Een optionele naam voor de tabel of weergave. Als deze niet is gedefinieerd, wordt de functienaam gebruikt als tabel- of weergavenaam. |
comment Type: str Een optionele beschrijving voor de tabel. |
spark_conf Soort: dict Een optionele lijst met Spark-configuraties voor de uitvoering van deze query. |
table_properties Type: dict Een optionele lijst met tabeleigenschappen voor de tabel. |
path Typ: str Een optionele opslaglocatie voor tabelgegevens. Als dit niet is ingesteld, wordt het systeem standaard ingesteld op de opslaglocatie van de pijplijn. |
partition_cols Type: a collection of str Een optionele verzameling, bijvoorbeeld een list van een of meer kolommen die moeten worden gebruikt voor het partitioneren van de tabel. |
cluster_by Type: array Schakel eventueel liquide clustering in de tabel in en definieer de kolommen die moeten worden gebruikt als clusteringsleutels. Zie Liquid Clustering gebruiken voor Delta-tabellen. |
schema Type: str of StructType Een optionele schemadefinitie voor de tabel. Schema's kunnen worden gedefinieerd als een SQL DDL-tekenreeks of met een Python- StructType . |
temporary Typ: bool Maak een tabel, maar publiceer geen metagegevens voor de tabel. Het temporary trefwoord geeft DLT opdracht om een tabel te maken die beschikbaar is voor de pijplijn, maar die niet buiten de pijplijn mag worden geopend. Om de verwerkingstijd te verminderen, blijft een tijdelijke tabel behouden gedurende de levensduur van de pijplijn die deze maakt, en niet slechts één update.De standaardwaarde is 'Onwaar'. |
row_filter (openbare preview)Type: str Een optionele rijfilterclausule voor de tabel. Zie Tabellen publiceren met rijfilters en kolommaskers. |
Tabel- of weergavedefinitie |
---|
def <function-name>() Een Python-functie die de gegevensset definieert. Als de parameter name niet is ingesteld, wordt <function-name> gebruikt als de naam van de doelgegevensset. |
query Een Spark SQL-instructie die een Spark-gegevensset of Koalas DataFrame retourneert. Gebruik dlt.read() of spark.read.table() om een volledige leesbewerking uit te voeren van een gegevensset die in dezelfde pijplijn is gedefinieerd. Als u een externe gegevensset wilt lezen, gebruikt u de functie spark.read.table() . U kunt dlt.read() niet gebruiken om externe gegevenssets te lezen. Omdat spark.read.table() kan worden gebruikt voor het lezen van interne gegevenssets, gegevenssets die buiten de huidige pijplijn zijn gedefinieerd en waarmee u opties voor het lezen van gegevens kunt opgeven, raadt Databricks aan om deze te gebruiken in plaats van de dlt.read() functie.Wanneer u een gegevensset in een pijplijn definieert, wordt standaard de catalogus en het schema gebruikt dat is gedefinieerd in de pijplijnconfiguratie. U kunt de functie spark.read.table() gebruiken om te lezen uit een gegevensset die in de pijplijn is gedefinieerd zonder kwalificatie. Als u bijvoorbeeld wilt lezen uit een gegevensset met de naam customers :spark.read.table("customers") U kunt de functie spark.read.table() ook gebruiken om te lezen uit een tabel die is geregistreerd in de metastore door de tabelnaam optioneel te kwalificeren met de databasenaam:spark.read.table("sales.customers") Gebruik dlt.read_stream() of spark.readStream.table() om een streaming-leesbewerking uit te voeren van een gegevensset die in dezelfde pijplijn is gedefinieerd. Als u een streaming-leesbewerking wilt uitvoeren vanuit een externe gegevensset, gebruikt u despark.readStream.table() functie. Omdat spark.readStream.table() kan worden gebruikt voor het lezen van interne gegevenssets, gegevenssets die buiten de huidige pijplijn zijn gedefinieerd en waarmee u opties voor het lezen van gegevens kunt opgeven, raadt Databricks aan om deze te gebruiken in plaats van de dlt.read_stream() functie.Als u een query in een DLT- table -functie wilt definiëren met behulp van de SQL-syntaxis, gebruikt u de functie spark.sql . Zie Voorbeeld: Een gegevensset openen met behulp van spark.sql . Als u een query wilt definiëren in een DLT-table -functie met behulp van Python, gebruikt u PySpark syntaxis. |
Verwachtingen |
---|
@expect("description", "constraint") Een beperking voor gegevenskwaliteit declareren die is geïdentificeerd door description . Als een rij in strijd is met de verwachting, neemt u de rij op in de doelgegevensset. |
@expect_or_drop("description", "constraint") Een beperking voor gegevenskwaliteit declareren die is geïdentificeerd door description . Als een rij in strijd is met de verwachting, verwijdert u de rij uit de doelgegevensset. |
@expect_or_fail("description", "constraint") Een beperking voor gegevenskwaliteit declareren die is geïdentificeerd door description . Als een rij in strijd is met de verwachtingen, stop onmiddellijk de uitvoering. |
@expect_all(expectations) Declareer een of meer beperkingen voor gegevenskwaliteit. expectations is een Python-woordenlijst, waarbij de sleutel de beschrijving van de verwachting is en de waarde de verwachtingsbeperking is. Als een rij in strijd is met een van de verwachtingen, neemt u de rij op in de doelgegevensset. |
@expect_all_or_drop(expectations) Declareer een of meer beperkingen voor gegevenskwaliteit. expectations is een Python-woordenlijst, waarbij de sleutel de beschrijving van de verwachting is en de waarde de verwachtingsbeperking is. Als een rij een van de verwachtingen schendt, verwijdert u de rij uit de doelgegevensset. |
@expect_all_or_fail(expectations) Declareer een of meer beperkingen voor gegevenskwaliteit. expectations is een Python-woordenlijst, waarbij de sleutel de beschrijving van de verwachting is en de waarde de verwachtingsbeperking is. Als een rij een van de verwachtingen schendt, stopt u onmiddellijk met de uitvoering. |
Gegevenswijzigingen vastleggen vanuit een veranderstroom met Python in DLT
Gebruik de apply_changes()
-functie in de Python-API om CDC-functionaliteit (DLT change data capture) te gebruiken om brongegevens te verwerken vanuit een CDF (Change Data Feed).
Belangrijk
U moet een streamingtabel als doel opgeven om wijzigingen toe te passen. U kunt desgewenst het schema voor uw doeltabel opgeven. Wanneer u het schema van de apply_changes()
doeltabel opgeeft, moet u de __START_AT
en __END_AT
kolommen met hetzelfde gegevenstype opnemen als de sequence_by
velden.
Als u de vereiste doeltabel wilt maken, kunt u de functie create_streaming_table() gebruiken in de DLT Python-interface.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Notitie
Voor APPLY CHANGES
verwerking is het standaardgedrag voor INSERT
en UPDATE
gebeurtenissen om upsert CDC-gebeurtenissen vanuit de bron te doen: werk alle rijen in de doeltabel bij die overeenkomen met de opgegeven sleutel(s), of voeg een nieuwe rij in wanneer er geen overeenkomend record in de doeltabel aanwezig is. De verwerking van DELETE
-gebeurtenissen kan worden gespecificeerd met de voorwaarde APPLY AS DELETE WHEN
.
Zie De TOEPASSEN VAN WIJZIGINGEN-API's: CDC-verwerking vereenvoudigen met een wijzigingsfeed en DLT voor meer informatie. Zie apply_changes()
voor een voorbeeld van het gebruik van de functie .
Belangrijk
U moet een streamingtabel als doel opgeven om wijzigingen toe te passen. U kunt desgewenst het schema voor uw doeltabel opgeven. Wanneer u het apply_changes
doeltabelschema opgeeft, moet u de __START_AT
en __END_AT
kolommen met hetzelfde gegevenstype opnemen als het sequence_by
veld.
Zie De APPLY CHANGES-API's: Het vastleggen van wijzigingen vereenvoudigen met DLT-.
Argumenten |
---|
target Type: str De naam van de tabel die moet worden bijgewerkt. U kunt de create_streaming_table() functie gebruiken om de doeltabel te maken voordat u de apply_changes() -functie uitvoert.Deze parameter is vereist. |
source Soort: str De gegevensbron met CDC-records. Deze parameter is vereist. |
keys Type: list De kolom of combinatie van kolommen waarmee een rij in de brongegevens uniek wordt geïdentificeerd. Dit wordt gebruikt om te bepalen welke CDC-gebeurtenissen van toepassing zijn op specifieke records in de doeltabel. U kunt een van de volgende opties opgeven:
Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld col(userId) gebruiken, maar u kunt col(source.userId) niet gebruiken.Deze parameter is vereist. |
sequence_by Type: str of col() De kolomnaam waarmee de logische volgorde van CDC-gebeurtenissen in de brongegevens wordt opgegeven. DLT maakt gebruik van deze sequentiëring voor het afhandelen van wijzigingen die in verkeerde volgorde aankomen. U kunt een van de volgende opties opgeven:
Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld col(userId) gebruiken, maar u kunt col(source.userId) niet gebruiken.De opgegeven kolom moet een sorteerbaar gegevenstype zijn. Deze parameter is vereist. |
ignore_null_updates Type: bool Toestaan dat updates worden opgenomen die een subset van de doelkolommen bevatten. Wanneer een CDC-gebeurtenis overeenkomt met een bestaande rij en ignore_null_updates wordt True , behouden kolommen met een null hun bestaande waarden in het doel. Dit geldt ook voor kolommen binnen kolommen die een waarde van null hebben. Wanneer ignore_null_updates is False , worden bestaande waarden overschreven met null waarden.Deze parameter is optioneel. De standaardwaarde is False . |
apply_as_deletes Type: str of expr() Dit specificeert wanneer een CDC-gebeurtenis als een DELETE in plaats van een upsert behandeld moet worden. Om ongesorteerde gegevens te verwerken, wordt de verwijderde rij tijdelijk bewaard als een zogenoemde "tombstone" in de onderliggende Delta-tabel, en wordt er een weergave in de metastore gemaakt die deze tombstones wegfiltert. Het bewaarinterval kan worden geconfigureerd met depipelines.cdc.tombstoneGCThresholdInSeconds tabeleigenschap.U kunt een van de volgende opties opgeven:
Deze parameter is optioneel. |
apply_as_truncates Type: str of expr() Hiermee geeft u op wanneer een CDC-gebeurtenis moet worden behandeld als een volledige tabel TRUNCATE . Omdat deze clausule een volledige trunctie van de doeltabel activeert, moet deze alleen worden gebruikt voor specifieke gevallen waarvoor deze functionaliteit nodig is.De parameter apply_as_truncates wordt alleen ondersteund voor SCD-type 1. SCD-type 2 ondersteunt geen truncate-bewerkingen.U kunt een van de volgende opties opgeven:
Deze parameter is optioneel. |
column_list except_column_list Type: list Een subset van kolommen om op te nemen in de doeltabel. Gebruik column_list om de volledige lijst met kolommen op te geven die u wilt opnemen. Gebruik except_column_list om de kolommen op te geven die moeten worden uitgesloten. U kunt waarde declareren als een lijst met tekenreeksen of als Spark SQL-col() -functies:
Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld col(userId) gebruiken, maar u kunt col(source.userId) niet gebruiken.Deze parameter is optioneel. De standaardinstelling is dat alle kolommen in de doeltabel worden opgenomen wanneer er geen column_list of except_column_list argument wordt doorgegeven aan de functie. |
stored_as_scd_type Type: str of int Of de records moeten worden opgeslagen als SCD-type 1 of SCD-type 2. Ingesteld op 1 voor SCD-type 1 of 2 voor SCD-type 2.Deze component is optioneel. De standaardwaarde is SCD type 1. |
track_history_column_list track_history_except_column_list Typ: list Een subset van uitvoerkolommen die voor historische doeleinden moeten worden bijgehouden in de doeltabel. Gebruik track_history_column_list om de volledige lijst met kolommen op te geven die moeten worden bijgehouden. Gebruiktrack_history_except_column_list om de kolommen op te geven die moeten worden uitgesloten van het bijhouden. U kunt waarde declareren als een lijst met tekenreeksen of als Spark SQL-col() -functies:
Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld col(userId) gebruiken, maar u kunt col(source.userId) niet gebruiken.Deze parameter is optioneel. De standaardinstelling is om alle kolommen in de doeltabel op te nemen wanneer er geen track_history_column_list of een andere beperking is.track_history_except_column_list argument wordt doorgegeven aan de functie. |
Gegevenswijzigingen vastleggen vanuit momentopnamen van databases met Python in DLT
Belangrijk
De APPLY CHANGES FROM SNAPSHOT
-API bevindt zich in openbare preview-versie.
Gebruik de functie apply_changes_from_snapshot()
in de Python-API om CDC-functionaliteit (DLT change data capture) te gebruiken om brongegevens uit momentopnamen van databases te verwerken.
Belangrijk
U moet een streamingtabel als doel opgeven om wijzigingen toe te passen. U kunt desgewenst het schema voor uw doeltabel opgeven. Wanneer u het schema van de apply_changes_from_snapshot()
doeltabel opgeeft, moet u ook de kolommen __START_AT
en __END_AT
met hetzelfde gegevenstype opnemen als het sequence_by
veld.
Als u de vereiste doeltabel wilt maken, kunt u de functie create_streaming_table() gebruiken in de DLT Python-interface.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Notitie
Voor APPLY CHANGES FROM SNAPSHOT
verwerking is het standaardgedrag om een nieuwe rij in te voegen wanneer een overeenkomende record met dezelfde sleutel(en) niet in het doel bestaat. Als er wel een overeenkomende record bestaat, wordt deze alleen bijgewerkt als een van de waarden in de rij is gewijzigd. Rijen met sleutels die in het doel aanwezig zijn, maar niet meer in de bron, worden verwijderd.
Zie de API'S WIJZIGINGEN TOEPASSEN: Eenvoudiger veranderingen vastleggen met DLTvoor meer informatie over CDC-verwerking met momentopnamen. Zie de voorbeelden van de apply_changes_from_snapshot()
en historische momentopnameverwerking voor voorbeelden van het gebruik van de -functie.
Argumenten |
---|
target Type: str De naam van de tabel die moet worden bijgewerkt. U kunt de functie create_streaming_table() gebruiken om de doeltabel te maken voordat u de apply_changes() -functie uitvoert.Deze parameter is vereist. |
source Type: str of lambda function Ofwel de naam van een tabel of weergave om periodiek een momentopname te maken of een Python-lambda-functie die de dataframe voor momentopnamen retourneert die moet worden verwerkt en de versie van de momentopname. Zie Het argument source implementeren.Deze parameter is vereist. |
keys Type: list De kolom of combinatie van kolommen waarmee een rij in de brongegevens uniek wordt geïdentificeerd. Dit wordt gebruikt om te bepalen welke CDC-gebeurtenissen van toepassing zijn op specifieke records in de doeltabel. U kunt een van de volgende opties opgeven:
Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld col(userId) gebruiken, maar u kunt col(source.userId) niet gebruiken.Deze parameter is vereist. |
stored_as_scd_type Type: str of int Of de records moeten worden opgeslagen als SCD-type 1 of SCD-type 2. Ingesteld op 1 voor SCD-type 1 of 2 voor SCD-type 2.Deze component is optioneel. De standaardwaarde is SCD type 1. |
track_history_column_list track_history_except_column_list Type: list Een subset van uitvoerkolommen die voor historische doeleinden moeten worden bijgehouden in de doeltabel. Gebruik track_history_column_list om de volledige lijst met kolommen op te geven die moeten worden bijgehouden. Gebruikentrack_history_except_column_list om de kolommen op te geven die moeten worden uitgesloten van het bijhouden. U kunt waarde declareren als een lijst met tekenreeksen of als Spark SQL-col() -functies:
Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld col(userId) gebruiken, maar u kunt col(source.userId) niet gebruiken.Deze parameter is optioneel. De standaardinstelling is om alle kolommen in de doeltabel op te nemen wanneer er geen track_history_column_list of een andere beperking is.track_history_except_column_list argument wordt doorgegeven aan de functie. |
het argument source
implementeren
De functie apply_changes_from_snapshot()
bevat het argument source
. Voor het verwerken van historische momentopnamen is het source
argument naar verwachting een Python-lambda-functie die twee waarden retourneert naar de apply_changes_from_snapshot()
-functie: een Python DataFrame met de momentopnamegegevens die moeten worden verwerkt en een momentopnameversie.
Hier volgt de handtekening van de lambda-functie:
lambda Any => Optional[(DataFrame, Any)]
- Het argument voor de lambda-functie is de laatst verwerkte momentopnameversie.
- De retourwaarde van de lambda-functie is
None
of een tuple van twee waarden: de eerste waarde van de tuple is een DataFrame dat de momentopname bevat die moet worden verwerkt. De tweede waarde van het tuple is de versie van de momentopname die de logische volgorde van de momentopname vertegenwoordigt.
Een voorbeeld waarmee de lambda-functie wordt geïmplementeerd en aangeroepen:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
De DLT-runtime voert de volgende stappen uit telkens wanneer de pijplijn met de apply_changes_from_snapshot()
functie wordt geactiveerd:
- Hiermee wordt de
next_snapshot_and_version
-functie uitgevoerd om het volgende dataframe voor momentopnamen en de bijbehorende momentopnameversie te laden. - Als er geen DataFrame wordt geretourneerd, wordt de uitvoering beëindigd en wordt de pijplijnupdate gemarkeerd als voltooid.
- Detecteert de wijzigingen in de nieuwe momentopname en past deze incrementeel toe op de doeltabel.
- Keert terug naar stap 1 om de volgende momentopname en de bijbehorende versie te laden.
beperkingen voor
De DLT Python-interface heeft de volgende beperking:
De functie pivot()
wordt niet ondersteund. De pivot
-bewerking in Spark vereist het gretige laden van invoergegevens om het uitvoerschema te berekenen. Deze mogelijkheid wordt niet ondersteund in DLT.