Inkrementelles Laden und Verarbeiten von Daten mit Delta Live Tables-Flows
In diesem Artikel wird erläutert, was Flows sind und wie Sie Flows in Delta Live Tables-Pipelines verwenden können, um Daten aus einer Quelle inkrementell in einer Zielstreamingtabelle zu verarbeiten. In Delta Live Tables werden Flows auf zwei Arten definiert:
- Ein Flow wird automatisch definiert, wenn Sie eine Abfrage erstellen, die eine Streamingtabelle aktualisiert.
- Delta Live Tables bietet auch Funktionen zum expliziten Definieren von Flows für eine komplexere Verarbeitung, z. B. das Anfügen an eine Streamingtabelle aus mehreren Streamingquellen.
In diesem Artikel werden die impliziten Flows erläutert, die erstellt werden, wenn Sie eine Abfrage zum Aktualisieren einer Streamingtabelle definieren. Anschließend werden Details zur Syntax zum Definieren komplexerer Flüsse bereitgestellt.
Was ist ein Flow?
In Delta Live Tables ist ein Flow eine Streamingabfrage, die Quelldaten inkrementell verarbeitet, um eine Zielstreamingtabelle zu aktualisieren. Die meisten Delta Live Tables-Datasets, die Sie in einer Pipeline erstellen, definieren den Flow als Teil der Abfrage und erfordern keine explizite Definition des Flows. Sie erstellen beispielsweise eine Streamingtabelle in Delta Live Tables in einem einzigen DDL-Befehl, anstatt separate Tabellen- und Flowanweisungen zum Erstellen der Streamingtabelle zu verwenden:
Hinweis
Dieses CREATE FLOW
-Beispiel wird nur zur Veranschaulichung bereitgestellt und enthält Schlüsselwörter, die keine gültige Syntax für Delta Live Tables sind.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
Zusätzlich zum von einer Abfrage definierten Standardflow stellen die Python- und SQL-Schnittstelle für Delta Live Tables eine Funktion für Anfügeflows bereit. Der Anfügeflow unterstützt die Verarbeitung, für die Daten aus mehreren Streamingquellen gelesen werden müssen, um eine Streamingtabelle zu aktualisieren. Sie können z. B. die Anfügeflowfunktion verwenden, wenn Sie über eine vorhandene Streamingtabelle und einen Flow verfügen und eine neue Streamingquelle hinzufügen möchten, die in diese vorhandene Streamingtabelle schreibt.
Schreiben in eine Streamingtabelle aus mehreren Quelldatenströmen mithilfe des Anfügeflows
Verwenden Sie den @append_flow
-Decorator in der Python-Schnittstelle oder die CREATE FLOW
-Klausel in der SQL-Schnittstelle, um aus mehreren Streamingquellen in eine Streamingtabelle zu schreiben. Verwenden Sie den Anfügeflow für Verarbeitungsaufgaben wie die folgenden:
- Hinzufügen von Streamingquellen, die Daten an eine vorhandene Streamingtabelle anfügen, ohne dass eine vollständige Aktualisierung erforderlich ist. Sie können beispielsweise eine Tabelle haben, in der regionale Daten aus jeder Region kombiniert werden, in der Sie arbeiten. Wenn neue Regionen eingeführt werden, können Sie der Tabelle die neuen Regionsdaten hinzufügen, ohne eine vollständige Aktualisierung durchzuführen. Siehe Beispiel: Schreiben in eine Streamingtabelle aus mehreren Kafka-Themen.
- Aktualisieren Sie eine Streamingtabelle, indem Sie fehlende historische Daten (Backfilling) anfügen. Sie haben beispielsweise eine vorhandene Streamingtabelle, in die ein Apache Kafka-Thema geschrieben wird. Sie haben auch historische Daten in einer Tabelle gespeichert, die Sie genau einmal in die Streamingtabelle einfügen müssen, und Sie können die Daten nicht streamen, da Ihre Verarbeitung das Durchführen einer komplexen Aggregation umfasst, bevor Sie die Daten einfügen. Siehe Beispiel: Ausführen eines einmaligen Datenabgleichs.
- Kombinieren von Daten aus mehreren Quellen und Schreiben in eine einzelne Streamingtabelle, anstatt die
UNION
-Klausel in einer Abfrage zu verwenden. Die Verwendung der Anfügeflowverarbeitung anstelle vonUNION
ermöglicht es Ihnen, die Zieltabelle inkrementell zu aktualisieren, ohne ein vollständiges Aktualisierungsupdateauszuführen. Siehe Beispiel: Verwenden der Anfügeflowverarbeitung anstelle von UNION.
Das Ziel für die Datensätze, die von der Anfügeflowverarbeitung ausgegeben werden, kann eine vorhandene Tabelle oder eine neue Tabelle sein. Verwenden Sie für Python-Abfragen die create_streaming_table()-Funktion, um eine Zieltabelle zu erstellen.
Wichtig
- Wenn Sie Datenqualitätseinschränkungen mit Erwartungen definieren müssen, definieren Sie die Erwartungen in der Zieltabelle als Teil der
create_streaming_table()
-Funktion oder in einer vorhandenen Tabellendefinition. Sie können die Erwartungen in der@append_flow
Definition nicht definieren. - Flows werden durch einen Flownamenidentifiziert, und dieser Name wird verwendet, um Streamingprüfpunkte zu identifizieren. Die Verwendung des Flownamens zur Identifizierung des Prüfpunkts bedeutet Folgendes:
- Wenn ein vorhandener Flow in einer Pipeline umbenannt wird, wird der Prüfpunkt nicht übernommen, und der umbenannte Flow ist effektiv ein völlig neuer Flow.
- Sie können einen Flownamen in einer Pipeline nicht wiederverwenden, da der vorhandene Prüfpunkt nicht mit der neuen Flowdefinition übereinstimmt.
Es folgt die Syntax für @append_flow
:
Python
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
target_table BY NAME
SELECT * FROM
source;
Beispiel: Schreiben in eine Streamingtabelle aus mehreren Kafka-Themen
Im folgenden Beispiel wird eine Streamingtabelle mit dem Namen kafka_target
erstellt, und es wird aus zwei Kafka-Themen in die Streamingtabelle geschrieben:
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Weitere Informationen zur in den SQL-Abfragen verwendeten read_kafka()
-Tabellenwertfunktion finden Sie unter read_kafka in der SQL-Sprachreferenz.
Beispiel: Ausführen eines einmaligen Datenabgleichs
Im folgenden Beispiel wird eine Abfrage ausgeführt, um historische Daten an eine Streamingtabelle anzufügen:
Hinweis
Um sicherzustellen, dass ein wahrer einmaliger Rücklauf ausgeführt wird, wenn die Backfill-Abfrage Teil einer Pipeline ist, die auf geplanter oder kontinuierlicher Basis ausgeführt wird, entfernen Sie die Abfrage nach der Ausführung der Pipeline einmal. Um neue Daten anzufügen, wenn sie im Backfill-Verzeichnis eingeht, lassen Sie die Abfrage an Ort und Stelle.
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Beispiel: Verwenden der Anfügeflowverarbeitung anstelle von UNIONUNION
Anstatt eine Abfrage mit einer UNION
-Klausel zu verwenden, können Sie Anfügeflowabfragen verwenden, um mehrere Quellen zu kombinieren und in eine einzelne Streamingtabelle zu schreiben. Mithilfe von Anfügeflowabfragen anstelle von UNION
können Sie an eine Streamingtabelle aus mehreren Quellen anfügen, ohne eine vollständige Aktualisierung auszuführen.
Das folgende Python-Beispiel enthält eine Abfrage, die mehrere Datenquellen mit einer UNION
-Klausel kombiniert:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
In den folgenden Beispielen wird die UNION
-Abfrage durch Anfügeflowabfragen ersetzt:
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/us",
"csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/eu",
"csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/apac",
"csv"
);