Gegevens incrementeel laden en verwerken met Delta Live Tables-stromen
In dit artikel wordt uitgelegd wat stromen zijn en hoe u stromen in Delta Live Tables-pijplijnen kunt gebruiken om gegevens van een bron naar een doelstreamingtabel incrementeel te verwerken. In Delta Live Tables worden stromen op twee manieren gedefinieerd:
- Een stroom wordt automatisch gedefinieerd wanneer u een query maakt waarmee een streamingtabel wordt bijgewerkt.
- Delta Live Tables biedt ook functionaliteit om stromen expliciet te definiëren voor complexere verwerking, zoals toevoegen aan een streamingtabel vanuit meerdere streamingbronnen.
In dit artikel worden de impliciete stromen besproken die worden gemaakt wanneer u een query definieert voor het bijwerken van een streamingtabel en vindt u vervolgens details over de syntaxis om complexere stromen te definiëren.
Wat is een stroom?
In Delta Live Tables is een flow een streamingquery die brongegevens incrementeel verwerkt om een doelstreamingtabel bij te werken. De meeste Delta Live Tables-gegevenssets die u in een pijplijn maakt, definiëren de stroom als onderdeel van de query en hoeven de stroom niet expliciet te definiëren. U maakt bijvoorbeeld een streamingtabel in Delta Live Tables in één DDL-opdracht in plaats van afzonderlijke tabel- en stroominstructies te gebruiken om de streamingtabel te maken:
Notitie
Dit CREATE FLOW
voorbeeld wordt alleen ter illustratie gegeven en bevat trefwoorden die geen geldige syntaxis van Delta Live Tables zijn.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
Naast de standaardstroom die door een query is gedefinieerd, bieden de Python- en SQL-interfaces van Delta Live Tables toevoegstroom functionaliteit. Append-stroom ondersteunt dataverwerking die het lezen van gegevens uit meerdere streamingbronnen vereist, om een enkele streamingtabel bij te werken. U kunt bijvoorbeeld toevoegstroomfunctionaliteit gebruiken wanneer u een bestaande streamingtabel en stroom hebt en een nieuwe streamingbron wilt toevoegen die naar deze bestaande streamingtabel schrijft.
Toevoegstroom gebruiken om vanuit meerdere bronstreams naar een streamingtabel te schrijven
Gebruik de @append_flow
-decorator in de Python-interface of de CREATE FLOW
-clausule in de SQL-interface om vanuit meerdere streamingbronnen naar een streamingtabel te schrijven. Gebruik toevoegstroom voor het verwerken van taken, zoals de volgende:
- Voeg streamingbronnen toe die gegevens toevoegen aan een bestaande streamingtabel zonder dat u een volledige vernieuwing nodig hebt. U hebt bijvoorbeeld een tabel waarin regionale gegevens worden gecombineerd van elke regio waarin u werkt. Wanneer er nieuwe regio's worden geïmplementeerd, kunt u de nieuwe regiogegevens toevoegen aan de tabel zonder dat u een volledige vernieuwing hoeft uit te voeren. Zie Voorbeeld: Naar een streamingtabel schrijven vanuit meerdere Kafka-topics.
- Werk een streamingtabel bij door ontbrekende historische gegevens toe te voegen (backfilling). U hebt bijvoorbeeld een bestaande streamingtabel die door een Apache Kafka-onderwerp wordt beschreven. U hebt ook historische gegevens opgeslagen in een tabel die u precies eenmaal in de streamingtabel hebt ingevoegd en u kunt de gegevens niet streamen omdat uw verwerking een complexe aggregatie omvat voordat u de gegevens invoegt. Zie voorbeeld: Een eenmalige gegevensbackfill uitvoeren.
- Combineer gegevens uit meerdere bronnen en schrijf naar één streamingtabel in plaats van de
UNION
component in een query te gebruiken. Als u toevoegstroomverwerking gebruikt in plaats vanUNION
kunt u de doeltabel incrementeel bijwerken zonder een volledige vernieuwingsupdate uit te voeren. Zie voorbeeld: De verwerking van toevoegstromen gebruiken in plaats van UNION.
Het doel voor de recorduitvoer door de verwerking van de toevoegstroom kan een bestaande tabel of een nieuwe tabel zijn. Gebruik voor Python-query's de create_streaming_table() functie om een doeltabel te maken.
Belangrijk
- Als u beperkingen voor gegevenskwaliteit wilt definiëren met verwachtingen, definieert u de verwachtingen voor de doeltabel als onderdeel van de
create_streaming_table()
functie of in een bestaande tabeldefinitie. U kunt geen verwachtingen definiëren in de@append_flow
definitie. - Stromen worden geïdentificeerd door een stroomnaam en deze naam wordt gebruikt om streamingcontrolepunten te identificeren. Het gebruik van de stroomnaam om het controlepunt te identificeren betekent het volgende:
- Als de naam van een bestaande stroom in een pijplijn wordt gewijzigd, wordt het controlepunt niet overgedragen en is de hernoemde stroom een volledig nieuwe stroom.
- U kunt een stroomnaam niet opnieuw gebruiken in een pijplijn, omdat het bestaande controlepunt niet overeenkomt met de nieuwe stroomdefinitie.
Hier volgt de syntaxis voor @append_flow
:
Python
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
target_table BY NAME
SELECT * FROM
source;
voorbeeld: schrijven naar een streamingtabel vanuit meerdere Kafka-onderwerpen
In de volgende voorbeelden wordt een streamingtabel met de naam kafka_target
gemaakt en naar die streamingtabel geschreven vanuit twee Kafka-onderwerpen:
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Zie read_kafka in de sql-taalverwijzing voor meer informatie over de read_kafka()
tabelwaardefunctie die wordt gebruikt in de SQL-query's.
Voorbeeld: Een eenmalige gegevensbackfill uitvoeren
In de volgende voorbeelden wordt een query uitgevoerd om historische gegevens toe te voegen aan een streamingtabel:
Notitie
Als u een echte eenmalige backfill wilt garanderen wanneer de backfillquery deel uitmaakt van een pijplijn die op geplande basis of continu wordt uitgevoerd, verwijdert u de query nadat de pijplijn eenmaal is uitgevoerd. Als u nieuwe gegevens wilt toevoegen als deze binnenkomt in de map backfill, laat u de query op zijn plaats.
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Voorbeeld: Toevoegstroomverwerking gebruiken in plaats van UNION
In plaats van een query met een UNION
-component te gebruiken, kunt u toevoegstroomquery's gebruiken om meerdere bronnen te combineren en naar één streamingtabel te schrijven. Door append-queries te gebruiken in plaats van UNION
kunt u vanuit meerdere bronnen gegevens toevoegen aan een streaming-tabel zonder een volledige vernieuwingsoperatieuit te voeren.
Het volgende Python-voorbeeld bevat een query die meerdere gegevensbronnen combineert met een UNION
component:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
De volgende voorbeelden vervangen de UNION
query door toevoegstroomquery's:
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/us",
"csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/eu",
"csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/apac",
"csv"
);