Python-Sprachreferenz zu Delta Live Tables
Dieser Artikel enthält ausführliche Informationen zur Python-Programmierschnittstelle für Delta Live Tables.
Informationen zur SQL-API finden Sie in der Sprachreferenz zu Delta Live Tables.
Ausführliche Informationen zum Konfigurieren des automatischen Ladens finden Sie unter Automatisches Laden.
Vorbemerkungen
Beachten Sie die folgenden wichtigen Punkte, wenn Sie Pipelines mit der Python-Schnittstelle für Delta Live Tables implementieren:
- Die Python-Funktionen
table()
undview()
werden im Rahmen der Planung und Ausführung einer Pipelineaktualisierung mehrmals aufgerufen. Fügen Sie daher in diese Funktionen keinen Code ein, der Nebenwirkungen haben kann (beispielsweise Code, der Daten ändert oder eine E-Mail sendet). Um unerwartetes Verhalten zu vermeiden, sollten Ihre Python-Funktionen, die Datasets definieren, nur den Code enthalten, der zum Definieren der Tabelle oder Sicht erforderlich ist. - Verwenden Sie Ereignishooks, um Vorgänge wie das Senden von E-Mails oder die Integration in einen externen Überwachungsdienst auszuführen. Das gilt insbesondere bei Funktionen, die Datasets definieren. Die Implementierung dieser Vorgänge in den Funktionen, die Ihre Datasets definieren, führt zu unerwartetem Verhalten.
- Die Python-Funktionen
table
undview
müssen einen DataFrame zurückgeben. Einige Funktionen, die mit DataFrames arbeiten, geben keine zurück und sollten deshalb nicht verwendet werden. Diese Vorgänge umfassen Funktionen wiecollect()
,count()
,toPandas()
,save()
undsaveAsTable()
. Weil DataFrame-Transformationen ausgeführt werden, nachdem das vollständige Dataflowdiagramm aufgelöst wurde, könnte die Verwendung solcher Vorgänge unbeabsichtigte Nebenwirkungen haben.
Importieren des Python-Moduls dlt
Python-Funktionen für Delta Live Tables werden im dlt
-Modul definiert. Ihre mit der Python-API implementierten Pipelines müssen dieses Modul importieren:
import dlt
Erstellen einer materialisierten Sicht oder Streamingtabelle für Delta Live Tables
In Python bestimmt Delta Live Tables basierend auf der definierenden Abfrage, ob ein Dataset als materialisierte Sicht oder als Streamingtabelle aktualisiert werden soll. Der @table
-Dekorateur kann verwendet werden, um sowohl materialisierte Ansichten als auch Streamingtabellen zu definieren.
Um eine materialisierte Sicht in Python zu definieren, wenden Sie @table
auf eine Abfrage an, die einen statischen Lesevorgang für eine Datenquelle ausführt. Um eine Streamingtabelle zu definieren, wenden Sie @table
auf eine Abfrage an, die einen Streaming-Lesevorgang für eine Datenquelle durchführt oder die Funktion create_streaming_table() verwendet. Beide Datasettypen haben die gleiche Syntaxspezifikation:
Hinweis
Damit das Argument cluster_by
zum Aktivieren des Liquid Clustering verwenden werden kann, muss Ihre Pipeline für die Verwendung des Vorschaukanals konfiguriert sein.
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Erstellen einer Delta Live Tables-Sicht
Um eine Sicht in Python zu definieren, wenden Sie den @view
-Decorator an. Sichten können in Delta Live Tables genau wie das Decorator-Element @table
entweder für statische Datasets oder für Streamingdatasets verwendet werden. Hier sehen Sie die Syntax zum Definieren von Sichten mit Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Beispiel: Definieren von Tabellen und Sichten
Wenden Sie das Decorator-Element @dlt.view
oder @dlt.table
auf eine Funktion an, um eine Sicht oder Tabelle in Python zu definieren. Sie können den Funktionsnamen oder den name
-Parameter verwenden, um den Tabellen- oder Sichtnamen zuzuweisen. Im folgenden Beispiel werden zwei verschiedene Datasets definiert: eine Sicht namens taxi_raw
, die eine JSON-Datei als Eingabequelle verwendet, und eine Tabelle namens filtered_data
, die die taxi_raw
-Ansicht als Eingabe verwendet:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
Beispiel: Zugreifen auf ein Dataset, das in der gleichen Pipeline definiert ist
Hinweis
Obwohl die dlt.read()
Funktionen und dlt.read_stream()
Funktionen weiterhin verfügbar und vollständig von der Delta Live Tables Python-Schnittstelle unterstützt werden, empfiehlt Databricks die Verwendung der Funktionen spark.readStream.table()
und funktionen spark.read.table()
immer aufgrund der folgenden:
- Die
spark
Funktionen unterstützen das Lesen interner und externer Datasets, einschließlich Datasets im externen Speicher oder definiert in anderen Pipelines. Diedlt
Funktionen unterstützen nur das Lesen interner Datasets. - Die
spark
Funktionen unterstützen das Angeben von Optionen, zskipChangeCommits
. B. zum Lesen von Vorgängen. Das Angeben von Optionen wird von dendlt
Funktionen nicht unterstützt.
Um auf ein dataset zuzugreifen, das in derselben Pipeline definiert ist, verwenden Sie die spark.read.table()
oder spark.readStream.table()
die Funktionen, bevor das LIVE
Schlüsselwort zum Datasetnamen aussteht:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("LIVE.customers_raw").where(...)
Beispiel: Lesen aus einer Tabelle, die in einem Metastore registriert ist
Wenn Sie Daten aus einer Tabelle lesen möchten, die im Hive-Metastore registriert ist, lassen Sie im Funktionsargument das Schlüsselwort LIVE
weg, und qualifizieren Sie optional den Tabellennamen mit dem Datenbanknamen:
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Ein Beispiel für das Lesen aus einer Unity Catalog-Tabelle finden Sie unter Erfassen von Daten in einer Unity Catalog-Pipeline.
Beispiel: Zugreifen auf ein Dataset mithilfe von spark.sql
Sie können ein Dataset auch mithilfe eines spark.sql
-Ausdrucks in einer Abfragefunktion zurückgeben. Um aus einem internen Dataset zu lesen, müssen Sie dem Datasetnamen LIVE.
voranstellen:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
Erstellen einer Tabelle, die als Ziel von Streamingvorgängen verwendet werden soll
Verwenden Sie die create_streaming_table()
-Funktion, um eine Zieltabelle für die Ausgabe von Datensätzen durch Streamingvorgänge zu erstellen, einschließlich apply_changes(), apply_changes_from_snapshot() und @append_flow-Ausgabedatensätze.
Hinweis
Die Funktionen create_target_table()
und create_streaming_live_table()
sind veraltet. Databricks empfiehlt das Aktualisieren des vorhandenen Codes, um die create_streaming_table()
-Funktion zu verwenden.
Hinweis
Damit das Argument cluster_by
zum Aktivieren des Liquid Clustering verwenden werden kann, muss Ihre Pipeline für die Verwendung des Vorschaukanals konfiguriert sein.
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Argumente |
---|
name Typ: str Der Tabellenname. Dieser Parameter ist erforderlich. |
comment Typ: str Dies ist eine optionale Beschreibung für die Tabelle |
spark_conf Typ: dict Eine optionale Liste der Spark-Konfigurationen für die Ausführung dieser Abfrage. |
table_properties Typ: dict Eine optionale Liste der Tabelleneigenschaften für die Tabelle |
partition_cols Typ: array Eine optionale Liste einer oder mehrerer Spalten, die zum Partitionieren der Tabelle verwendet werden sollen |
cluster_by Typ: array Optional können Sie das Liquid Clustering in der Tabelle aktivieren und die Spalten so definieren, die sie als Clusteringschlüssel verwendet werden. Weitere Informationen finden Sie unter Verwenden von Liquid Clustering für Delta-Tabellen. |
path Typ: str Ein optionaler Speicherort für Tabellendaten. Wenn diese Einstellung nicht festgelegt ist, verwendet das System standardmäßig den Speicherort der Pipeline. |
schema Typ: str oder StructType Eine optionale Schemadefinition für die Tabelle. Schemas können als SQL-DDL-Zeichenfolge oder mit Python definiert werden StructType . |
expect_all expect_all_or_drop expect_all_or_fail Typ: dict Optionale Datenqualitätseinschränkungen für die Tabelle. Siehe Mehrere Erwartungen. |
row_filter (Public Preview)Typ: str Eine optionale Zeilenfilterklausel für die Tabelle. Siehe Veröffentlichen von Tabellen mit Zeilenfiltern und Spaltenmasken. |
Steuern der Materialisierung von Tabellen
Tabellen bieten auch zusätzliche Steuerung ihrer Materialisierung:
- Geben Sie an, wie Tabellen mit
partition_cols
partitioniert werden. Sie können die Partitionierung verwenden, um Abfragen zu beschleunigen. - Sie können Tabelleneigenschaften festlegen, wenn Sie eine Sicht oder Tabelle definieren. Weitere Informationen finden Sie unter Delta Live Tables-Tabelleneigenschaften.
- Legen Sie mithilfe der
path
-Einstellung einen Speicherort für Tabellendaten fest. Standardmäßig werden Tabellendaten am Speicherort der Pipeline gespeichert, wennpath
nicht festgelegt ist. - Sie können Generierte Spalten in Ihrer Schemadefinition verwenden. Weitere Informationen finden Sie unter Beispiel: Angeben des Schemas und der Partitionsspalten.
Hinweis
Für Tabellen mit einer Größe von weniger als 1 TB empfiehlt Databricks, Delta Live Tables die Strukturierung der Daten steuern zu lassen. Wenn Sie nicht davon ausgehen, dass die Größe Ihrer Tabelle über ein Terabyte hinausgeht, sollten Sie im Allgemeinen keine Partitionsspalten angeben.
Beispiel: Angeben des Schemas und der Partitionsspalten
Sie können optional ein Tabellenschema mithilfe eines Python-StructType
oder einer SQL-DDL-Zeichenfolge angeben. Wenn eine DDL-Zeichenfolge angegeben wird, kann die Definition generierte Spalten enthalten.
Im folgenden Beispiel wird eine Tabelle namens sales
mit einem Schema erstellt, das mithilfe eines Python-Strukturtyps StructType
angegeben wird:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
Im folgenden Beispiel wird das Schema für eine Tabelle mithilfe einer DDL-Zeichenfolge angegeben, eine generierte Spalte definiert und eine Partitionsspalte definiert:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Standardmäßig leitet Delta Live Tables das Schema aus der table
-Definition ab, wenn Sie kein Schema angeben.
Konfigurieren einer Streamingtabelle, sodass Änderungen in einer Quellstreamingtabelle ignoriert werden
Hinweis
- Das Flag
skipChangeCommits
funktioniert nur, wennspark.readStream
die Funktionoption()
verwendet. Das Flag kann nicht in einerdlt.read_stream()
-Funktion verwendet werden. - Sie können das
skipChangeCommits
-Flag nicht verwenden, wenn die Quellstreamingtabelle als Ziel einer apply_changes()-Funktion definiert ist.
Für Streamingtabellen sind standardmäßig reine Anfügequellen erforderlich. Wenn eine Streamingtabelle eine andere Streamingtabelle als Quelle verwendet und die Quellstreamingtabelle Aktualisierungen oder Löschvorgänge erfordert (etwa aufgrund des Rechts auf Vergessen der DSGVO), kann das Flag skipChangeCommits
beim Lesen der Quellstreamingtabelle festgelegt werden, um diese Änderungen zu ignorieren. Weitere Informationen zu diesem Flag finden Sie unter Ignorieren von Updates und Löschungen.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Beispiel: Definieren von Tabellenconstraints
Wichtig
Tabellenconstraints befinden sich in Public Preview.
Beim Angeben eines Schemas können Sie Primär- und Fremdschlüssel definieren. Die Einschränkungen dienen der Information und werden nicht erzwungen. Weitere Informationen finden Sie unter der CONSTRAINT-Klausel in der SQL-Sprachreferenz.
Im folgenden Beispiel wird eine Tabelle mit einer Primär- und Fremdschlüsseleinschränkung definiert:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Beispiel: Definieren eines Zeilenfilters und einer Spaltenmaske
Wichtig
Zeilenfilter und Spaltenformate befinden sich im Public Preview.
Verwenden Sie die ROW FILTER-Klausel und die MASK-Klausel, um eine materialisierte Ansicht oder Streamingtabelle mit einem Zeilenfilter und spaltenformat zu erstellen. Im folgenden Beispiel wird veranschaulicht, wie sie eine materialisierte Ansicht und eine Streamingtabelle mit einem Zeilenfilter und einem Spaltenformat definieren:
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
Weitere Informationen zu Zeilenfiltern und Spaltenmasken finden Sie unter Veröffentlichen von Tabellen mit Zeilenfiltern und Spaltenmasken.
Eigenschaften von Python-Delta Live Tables
In den folgenden Tabellen werden die Optionen und Eigenschaften beschrieben, die Sie beim Definieren von Tabellen und Sichten mit Delta Live Tables angeben können:
Hinweis
Damit das Argument cluster_by
zum Aktivieren des Liquid Clustering verwenden werden kann, muss Ihre Pipeline für die Verwendung des Vorschaukanals konfiguriert sein.
@table oder @view |
---|
name Typ: str Ein optionaler Name für die Tabelle oder Sicht. Wenn er nicht definiert ist, wird der Funktionsname als Tabellen- oder Sichtname verwendet. |
comment Typ: str Dies ist eine optionale Beschreibung für die Tabelle |
spark_conf Typ: dict Eine optionale Liste der Spark-Konfigurationen für die Ausführung dieser Abfrage. |
table_properties Typ: dict Eine optionale Liste der Tabelleneigenschaften für die Tabelle |
path Typ: str Ein optionaler Speicherort für Tabellendaten. Wenn diese Einstellung nicht festgelegt ist, verwendet das System standardmäßig den Speicherort der Pipeline. |
partition_cols Typ: a collection of str Eine optionale Sammlung – beispielsweise eine Liste ( list ) – mit mindestens einer Spalte zum Partitionieren der Tabelle. |
cluster_by Typ: array Optional können Sie das Liquid Clustering in der Tabelle aktivieren und die Spalten so definieren, die sie als Clusteringschlüssel verwendet werden. Weitere Informationen finden Sie unter Verwenden von Liquid Clustering für Delta-Tabellen. |
schema Typ: str oder StructType Eine optionale Schemadefinition für die Tabelle. Schemas können als SQL-DDL-Zeichenfolge oder mit Python StructType definiert werden |
temporary Typ: bool Erstellen Sie eine Tabelle, veröffentlichen Sie jedoch keine Metadaten für die Tabelle. Das Schlüsselwort temporary weist Delta Live Tables an, eine Tabelle zu erstellen, die für die Pipeline verfügbar ist, auf die aber nicht außerhalb der Pipeline zugegriffen werden sollte. Um die Verarbeitungszeit zu reduzieren, wird eine temporäre Tabelle für die Lebensdauer der Pipeline beibehalten, die sie erstellt, und nicht nur für ein einzelnes Update.Die Standardeinstellung lautet „false“. |
row_filter (Public Preview)Typ: str Eine optionale Zeilenfilterklausel für die Tabelle. Siehe Veröffentlichen von Tabellen mit Zeilenfiltern und Spaltenmasken. |
Tabellen- oder Sichtdefinition |
---|
def <function-name>() Eine Python-Funktion, die das Dataset definiert. Wenn der name -Parameter nicht festgelegt ist, wird <function-name> als Zieldatasetname verwendet. |
query Eine Spark SQL-Anweisung, die ein Spark-Dataset oder einen Koalas-DataFrame zurückgibt. Verwenden Sie dlt.read() oder spark.read.table() für das vollständige Lesen aus einem Dataset, das in derselben Pipeline definiert ist. Verwenden Sie die spark.read.table() Funktion, um ein externes Dataset zu lesen. Sie können externe Datasets nicht dlt.read() lesen. Da spark.read.table() sie zum Lesen interner Datasets, außerhalb der aktuellen Pipeline definierten Datasets verwendet werden können und Sie Optionen zum Lesen von Daten angeben können, empfiehlt Databricks die Verwendung anstelle der dlt.read() Funktion.Wenn Sie die spark.read.table() Funktion verwenden, um aus einem Dataset zu lesen, das in derselben Pipeline definiert ist, stellen Sie das LIVE Schlüsselwort dem Datasetnamen im Funktionsargument voran. So lesen Sie beispielsweise aus einem Dataset mit dem Namen customers :spark.read.table("LIVE.customers") Sie können auch die spark.read.table() -Funktion verwenden, um aus einer im Metastore registrierten Tabelle zu lesen, indem Sie das LIVE -Schlüsselwort weglassen und optional den Tabellennamen mit dem Datenbanknamen qualifizieren:spark.read.table("sales.customers") Verwenden dlt.read_stream() oder spark.readStream.table() durchführen Sie einen Streaming-Lesevorgang aus einem Dataset, das in derselben Pipeline definiert ist. Verwenden Sie zum Ausführen eines Streaming-Lesevorgangs aus einem externen Dataset diespark.readStream.table() Funktion. Da spark.readStream.table() sie zum Lesen interner Datasets, außerhalb der aktuellen Pipeline definierten Datasets verwendet werden können und Sie Optionen zum Lesen von Daten angeben können, empfiehlt Databricks die Verwendung anstelle der dlt.read_stream() Funktion.Verwenden Sie die Funktion, um eine Abfrage in einer Delta Live Tables-Funktion table mithilfe der spark.sql SQL-Syntax zu definieren. Siehe Beispiel: Zugreifen auf ein Dataset mit spark.sql. Verwenden Sie pySpark-Syntax, um eine Abfrage in einer Delta Live Tables-Funktion table mithilfe von Python zu definieren. |
Erwartungen |
---|
@expect("description", "constraint") Deklarieren Sie eine Datenqualitätseinschränkung, die identifiziert wird durch description . Wenn eine Zeile gegen die Erwartung verstößt, schließen Sie die Zeile in das Zieldataset ein. |
@expect_or_drop("description", "constraint") Deklarieren Sie eine Datenqualitätseinschränkung, die identifiziert wird durch description . Wenn eine Zeile gegen die Erwartung verstößt, löschen Sie die Zeile aus dem Zieldataset. |
@expect_or_fail("description", "constraint") Deklarieren Sie eine Datenqualitätseinschränkung, die identifiziert wird durch description . Wenn eine Zeile gegen die Erwartung verstößt, beenden Sie die Ausführung sofort. |
@expect_all(expectations) Deklarieren Sie eine oder mehrere Datenqualitätseinschränkungen. expectations ist ein Python-Wörterbuch, wobei der Schlüssel die Beschreibung der Erwartung und der Wert die Erwartungseinschränkung ist. Wenn eine Zeile gegen eine der Erwartungen verstößt, schließen Sie die Zeile in das Zieldataset ein. |
@expect_all_or_drop(expectations) Deklarieren Sie eine oder mehrere Datenqualitätseinschränkungen. expectations ist ein Python-Wörterbuch, wobei der Schlüssel die Beschreibung der Erwartung und der Wert die Erwartungseinschränkung ist. Wenn eine Zeile gegen eine der Erwartungen verstößt, löschen Sie die Zeile aus dem Zieldataset. |
@expect_all_or_fail(expectations) Deklarieren Sie eine oder mehrere Datenqualitätseinschränkungen. expectations ist ein Python-Wörterbuch, wobei der Schlüssel die Beschreibung der Erwartung und der Wert die Erwartungseinschränkung ist. Wenn eine Zeile gegen eine der Erwartungen verstößt, beenden Sie die Ausführung sofort. |
Ändern der Datenerfassung aus einem Änderungsfeed mit Python in Delta Live Tables
Verwenden Sie die Funktion apply_changes()
in der Python-API, um Delta Live Tables Change Data Capture (CDC)-Funktionen zum Verarbeiten von Quelldaten aus einem Änderungsdatenfeed (CDF) zu verwenden.
Wichtig
Sie müssen eine Zielstreamingtabelle deklarieren, auf die Änderungen angewendet werden sollen. Optional können Sie das Schema für Ihre Zieltabelle angeben. Wenn Sie das Schema der apply_changes()
-Zieltabelle angeben, müssen Sie die Spalten __START_AT
und __END_AT
mit demselben Datentyp wie sequence_by
-Felder angeben.
Zum Erstellen der erforderlichen Zieltabelle können Sie die Funktion create_streaming_table() in der Python-Schnittstelle Delta Live Tables verwenden.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Hinweis
Bei APPLY CHANGES
-Verarbeitung ist das Standardverhalten für INSERT
- und UPDATE
-Ereignisse das upsert CDC-Ereignisse aus der Quelle: Aktualisieren Sie alle Zeilen in der Zieltabelle, die den angegebenen Schlüsseln entsprechen, oder fügen Sie eine neue Zeile ein, wenn ein übereinstimmender Datensatz nicht in der Zieltabelle vorhanden ist. Die Behandlung von DELETE
-Ereignissen kann mit der APPLY AS DELETE WHEN
-Bedingung angegeben werden.
Weitere Informationen zur CDC-Verarbeitung mit einem Änderungsfeed finden Sie unter APPLY CHANGES-APIs: Vereinfachen der Datensatzerfassung von Änderungsdaten mit Delta Live Tables. Ein Beispiel für die Verwendung der apply_changes()
-Funktion finden Sie unter Beispiel: SCD-Typ 1 und SCD-Typ 2-Verarbeitung mit CDF-Quelldaten.
Wichtig
Sie müssen eine Zielstreamingtabelle deklarieren, auf die Änderungen angewendet werden sollen. Optional können Sie das Schema für Ihre Zieltabelle angeben. Wenn Sie das apply_changes
-Zieltabellenschema angeben, müssen Sie auch die Spalten __START_AT
und __END_AT
mit demselben Datentyp wie das Feld sequence_by
angeben.
Siehe Die APPLY CHANGES-APIs: Vereinfachen der Änderungsdatenerfassung mit Delta Live Tables.
Argumente |
---|
target Typ: str Der Name der zu aktualisierenden Tabelle. Sie können die Funktion create_streaming_table() verwenden, um die Zieltabelle zu erstellen, bevor Sie die Funktion apply_changes() ausführen.Dieser Parameter ist erforderlich. |
source Typ: str Die Datenquelle, die CDC-Datensätze enthält. Dieser Parameter ist erforderlich. |
keys Typ: list Die Spalte oder Kombination von Spalten, die eine Zeile in den Quelldaten eindeutig identifiziert. Damit wird ermittelt, welche CDC-Ereignisse für bestimmte Datensätze in der Zieltabelle gelten. Sie können eins der folgenden Elemente angeben: – Eine Liste von Zeichenfolgen: ["userId", "orderId"] – Eine Liste von Spark-SQL col() -Funktionen: [col("userId"), col("orderId"] Argumente für col() -Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId) .Dieser Parameter ist erforderlich. |
sequence_by Typ: str oder col() Der Spaltenname, der die logische Reihenfolge der CDC-Ereignisse in den Quelldaten angibt. Delta Live Tables verwendet diese Sequenzierung, um Änderungsereignisse zu behandeln, die in nicht ordnungsgemäßer Reihenfolge eingehen. Sie können eins der folgenden Elemente angeben: – Eine Zeichenfolge: "sequenceNum" – Eine Spark SQL- col() -Funktion: col("sequenceNum") Argumente für col() -Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId) .Die angegebene Spalte muss ein sortierbarer Datentyp sein. Dieser Parameter ist erforderlich. |
ignore_null_updates Typ: bool Ermöglicht das Erfassen von Updates, die eine Teilmenge der Zielspalten enthalten. Wenn ein CDC-Ereignis mit einer vorhandenen Zeile übereinstimmt und ignore_null_updates True ist, behalten Spalten mit einer null ihre vorhandenen Werte im Ziel bei. Dies gilt auch für geschachtelte Spalten mit dem Wert null . Wenn ignore_null_updates ist False , werden vorhandene Werte mit null -Werten überschrieben.Dieser Parameter ist optional. Der Standardwert ist False . |
apply_as_deletes Typ: str oder expr() Gibt an, wann ein CDC-Ereignis als DELETE und nicht als Upsert behandelt werden soll. Um nicht sortierte Daten zu verarbeiten, wird die gelöschte Zeile vorübergehend als Tombstone in der zugrunde liegenden Delta-Tabelle beibehalten, und im Metastore wird eine Sicht erstellt, die diese Tombstones herausfiltert. Das Aufbewahrungsintervall kann konfiguriert werden mit:pipelines.cdc.tombstoneGCThresholdInSeconds Table-Eigenschaft.Sie können eins der folgenden Elemente angeben: – Eine Zeichenfolge: "Operation = 'DELETE'" – Eine Spark SQL- expr() -Funktion: expr("Operation = 'DELETE'") Dieser Parameter ist optional. |
apply_as_truncates Typ: str oder expr() Gibt an, wann ein CDC-Ereignis als TRUNCATE der gesamten Tabelle behandelt werden sollte. Da diese Klausel die vollständige Abschneidung der Zieltabelle auslöst, sollte sie nur in bestimmten Anwendungsfälle verwendet werden, die die Nutzung dieser Funktion erfordern.Der Parameter apply_as_truncates wird nur für den SCD-Typ 1 unterstützt. Der SCD-Typ 2 unterstützt keine Abkürzungsvorgänge.Sie können eins der folgenden Elemente angeben: – Eine Zeichenfolge: "Operation = 'TRUNCATE'" – Eine Spark SQL- expr() -Funktion: expr("Operation = 'TRUNCATE'") Dieser Parameter ist optional. |
column_list except_column_list Typ: list Gibt eine Teilmenge der Spalten an, die in die Zieltabelle eingeschlossen werden sollen. Verwenden Sie column_list , um die vollständige Liste der einzuschließenden Spalten anzugeben. Verwenden Sie except_column_list , um die auszuschließende Spalten anzugeben. Sie können einen Wert entweder als Liste von Zeichenfolgen oder als Spark SQL-col() -Funktionen deklarieren:- column_list = ["userId", "name", "city"] .- column_list = [col("userId"), col("name"), col("city")] - except_column_list = ["operation", "sequenceNum"] - except_column_list = [col("operation"), col("sequenceNum") Argumente für col() -Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId) .Dieser Parameter ist optional. Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn kein column_list - oder except_column_list -Argument an die Funktion übergeben wird. |
stored_as_scd_type Typ: str oder int Gibt an, ob Datensätze als SCD-Typ 1 oder SCD-Typ 2 gespeichert werden sollen. Für SCD-Typ 1 auf 1 oder für SCD-Typ 2 auf 2 festgelegt.Diese Klausel ist optional. Der Standardwert ist SCD-Typ 1. |
track_history_column_list track_history_except_column_list Typ: list Eine Teilmenge der Ausgabespalten, die im Hinblick auf den Verlauf in der Zieltabelle nachverfolgt werden sollen. Verwenden Sie track_history_column_list , um die vollständige Liste der Spalten anzugeben, die nachverfolgt werden sollen. Verwendungtrack_history_except_column_list zum Angeben der Spalten, die von der Nachverfolgung ausgeschlossen werden sollen. Sie können einen Wert entweder als Liste von Zeichenfolgen oder als Spark SQL-col() -Funktionen deklarieren:- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Argumente für col() -Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId) .Dieser Parameter ist optional. Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn kein track_history_column_list - odertrack_history_except_column_list -Argument an die Funktion übergeben wird. |
Ändern der Datenerfassung aus Datenbankmomentaufnahmen mit Python in Delta Live Tables
Wichtig
Die APPLY CHANGES FROM SNAPSHOT
-API befindet sich in der öffentlichen Vorschau.
Verwenden Sie die apply_changes_from_snapshot()
-Funktion in der Python-API, um Delta Live Tables Change Data Capture (CDC)-Funktionalität zu verwenden, um Quelldaten aus Datenbankmomentaufnahmen zu verarbeiten.
Wichtig
Sie müssen eine Zielstreamingtabelle deklarieren, auf die Änderungen angewendet werden sollen. Optional können Sie das Schema für Ihre Zieltabelle angeben. Wenn Sie das Schema der apply_changes_from_snapshot()
-Zieltabelle angeben, müssen Sie auch die Spalten __START_AT
und __END_AT
mit demselben Datentyp wie das Feld sequence_by
angeben.
Zum Erstellen der erforderlichen Zieltabelle können Sie die Funktion create_streaming_table() in der Python-Schnittstelle Delta Live Tables verwenden.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Hinweis
Bei APPLY CHANGES FROM SNAPSHOT
-Verarbeitung besteht das Standardverhalten darin, eine neue Zeile einzufügen, wenn im Ziel kein übereinstimmender Datensatz mit demselben Schlüssel vorhanden ist. Wenn ein übereinstimmender Datensatz vorhanden ist, wird er nur aktualisiert, wenn sich einer der Werte in der Zeile geändert hat. Zeilen mit Schlüsseln, die im Ziel vorhanden sind, aber nicht mehr in der Quelle vorhanden sind, werden gelöscht.
Weitere Informationen zur CDC-Verarbeitung mit Momentaufnahmen finden Sie unter APPLY CHANGES-APIs: Vereinfachen der Datensatzerfassung von Änderungsdaten mit Delta Live Tables. Beispiele für die Verwendung der apply_changes_from_snapshot()
-Funktion finden Sie in den Beispielen für die regelmäßige Aufnahme von Momentaufnahmen und historischen Momentaufnahmen.
Argumente |
---|
target Typ: str Der Name der zu aktualisierenden Tabelle. Sie können die Funktion create_streaming_table() verwenden, um die Zieltabelle zu erstellen, bevor Sie die Funktion apply_changes() ausführen.Dieser Parameter ist erforderlich. |
source Typ: str oder lambda function Entweder der Name einer Tabelle oder Ansicht, von der in regelmäßigen Abständen eine Momentaufnahme zu nehmen ist, oder eine Python-Lambda-Funktion, die den zu verarbeitenden Snapshot DataFrame und die Momentaufnahmeversion zurückgibt. Siehe Implementieren des Quellarguments. Dieser Parameter ist erforderlich. |
keys Typ: list Die Spalte oder Kombination von Spalten, die eine Zeile in den Quelldaten eindeutig identifiziert. Damit wird ermittelt, welche CDC-Ereignisse für bestimmte Datensätze in der Zieltabelle gelten. Sie können eins der folgenden Elemente angeben: – Eine Liste von Zeichenfolgen: ["userId", "orderId"] – Eine Liste von Spark-SQL col() -Funktionen: [col("userId"), col("orderId"] Argumente für col() -Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId) .Dieser Parameter ist erforderlich. |
stored_as_scd_type Typ: str oder int Gibt an, ob Datensätze als SCD-Typ 1 oder SCD-Typ 2 gespeichert werden sollen. Für SCD-Typ 1 auf 1 oder für SCD-Typ 2 auf 2 festgelegt.Diese Klausel ist optional. Der Standardwert ist SCD-Typ 1. |
track_history_column_list track_history_except_column_list Typ: list Eine Teilmenge der Ausgabespalten, die im Hinblick auf den Verlauf in der Zieltabelle nachverfolgt werden sollen. Verwenden Sie track_history_column_list , um die vollständige Liste der Spalten anzugeben, die nachverfolgt werden sollen. Verwendungtrack_history_except_column_list zum Angeben der Spalten, die von der Nachverfolgung ausgeschlossen werden sollen. Sie können einen Wert entweder als Liste von Zeichenfolgen oder als Spark SQL-col() -Funktionen deklarieren:- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Argumente für col() -Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId) .Dieser Parameter ist optional. Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn kein track_history_column_list - odertrack_history_except_column_list -Argument an die Funktion übergeben wird. |
Implementieren des source
-Arguments
Die apply_changes_from_snapshot()
-Funktion enthält das source
-Argument. Für die Verarbeitung historischer Momentaufnahmen wird erwartet, dass das source
-Argument eine Python-Lambda-Funktion ist, die zwei Werte an die apply_changes_from_snapshot()
-Funktion zurückgibt: Ein Python DataFrame, der die zu verarbeitenden Momentaufnahmen und eine Momentaufnahmeversion enthält.
Im Folgenden sehen Sie die Signatur der Lambda-Funktion:
lambda Any => Optional[(DataFrame, Any)]
- Das Argument für die Lambda-Funktion ist die zuletzt verarbeitete Momentaufnahmeversion.
- Der Rückgabewert der Lambda-Funktion ist
None
oder ein Tupel von zwei Werten: Der erste Wert des Tupels ist ein DataFrame, der die zu verarbeitende Momentaufnahme enthält. Der zweite Wert des Tupels ist die Momentaufnahmeversion, welche die logische Reihenfolge der Momentaufnahme darstellt.
Ein Beispiel, das die Lambda-Funktion implementiert und aufruft:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
Die Delta Live Tables-Laufzeit führt jedes Mal die folgenden Schritte aus, wenn die Pipeline, welche die apply_changes_from_snapshot()
-Funktion enthält, ausgelöst wird:
- Führt die
next_snapshot_and_version
-Funktion aus, um den nächsten Snapshot DataFrame und die entsprechende Momentaufnahmeversion zu laden. - Wenn kein DataFrame zurückgegeben wird, wird die Ausführung beendet, und das Pipelineupdate ist als abgeschlossen markiert.
- Erkennt die Änderungen in der neuen Momentaufnahme und wendet sie inkrementell auf die Zieltabelle an.
- Kehrt zu Schritt 1 zurück, um die nächste Momentaufnahme und die zugehörige Version zu laden.
Einschränkungen
Die Python-Schnittstelle von Delta Live Tables unterliegt folgender Einschränkung:
Die pivot()
-Funktion wird nicht unterstützt. Der pivot
-Vorgang in Spark erfordert Eager Loading von Eingabedaten, um das Ausgabeschema zu berechnen. Diese Funktion wird in Delta Live Tables nicht unterstützt.