Läsa in och bearbeta data stegvis med DLT-flöden
Den här artikeln förklarar vad flöden är och hur du kan använda flöden i DLT-pipelines för att stegvis bearbeta data från en källa till en målströmningstabell. I DLT definieras flöden på två sätt:
- Ett flöde definieras automatiskt när du skapar en fråga som uppdaterar en strömmande tabell.
- DLT tillhandahåller också funktioner för att explicit definiera flöden för mer komplex bearbetning, till exempel att lägga till en strömmande tabell från flera strömningskällor.
I den här artikeln beskrivs de implicita flöden som skapas när du definierar en fråga för att uppdatera en strömmande tabell och innehåller sedan information om syntaxen för att definiera mer komplexa flöden.
Vad är ett flöde?
I DLT är ett flöde en strömmande fråga som bearbetar källdata stegvis för att uppdatera en målströmningstabell. De flesta DLT-datauppsättningar som du skapar i en pipeline definierar flödet som en del av frågan och kräver inte att flödet uttryckligen definieras. Du kan till exempel skapa en strömmande tabell i DLT i ett enda DDL-kommando i stället för att använda separata tabell- och flödesinstruktioner för att skapa strömningstabellen:
Anmärkning
Det här CREATE FLOW
exemplet tillhandahålls endast i illustrativt syfte och innehåller nyckelord som inte är giltiga DLT-syntax.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
Förutom det standardflöde som definieras av en fråga tillhandahåller DLT Python- och SQL-gränssnitten tilläggsflöde funktioner. Tilläggsflödet stöder bearbetning som kräver läsning av data från flera strömmande datakällor för att uppdatera en enda strömmande tabell. Du kan till exempel använda funktionen lägg till flöde när du har en befintlig strömmande tabell och ett befintligt flöde och vill lägga till en ny strömmande källa som skriver till den befintliga strömningstabellen.
Använd tilläggsflöde för att skriva till en strömmande tabell från flera källströmmar
Använd @append_flow
-dekoratören i Python-gränssnittet eller CREATE FLOW
-satsen i SQL-gränssnittet för att skriva till en strömmande tabell från flera strömmande källor. Använd tilläggsflödet för bearbetning av uppgifter, till exempel följande:
- Lägg till strömmande källor som lägger till data i en befintlig strömmande tabell utan att kräva en fullständig uppdatering. Du kan till exempel ha en tabell som kombinerar regionala data från varje region som du arbetar i. När nya regioner distribueras kan du lägga till nya regiondata i tabellen utan att utföra en fullständig uppdatering. Se Exempel: Att skriva till en strömmande tabell från flera Kafka-ämnen.
- Uppdatera en strömmande tabell genom att lägga till saknade historiska data (återfyllnad). Du har till exempel en befintlig strömningstabell som skrivs till av en Apache Kafka topic. Du har också historiska data lagrade i en tabell som du behöver infoga exakt en gång i strömningstabellen, och du kan inte strömma data eftersom bearbetningen innefattar att utföra en komplex aggregering innan du infogar data. Se Exempel: Kör en engångsdatapåfyllning.
- Kombinera data från flera källor och skriv till en enda strömmande tabell i stället för att använda
UNION
-satsen i en fråga. Med tilläggsflödesbearbetning i stället förUNION
kan du stegvis uppdatera måltabellen utan att köra en fullständig uppdatering. Se Exempel: Använd flödesbearbetning för tillägg i stället förUNION
.
Målet för posterna som matas ut av tilläggsflödesbearbetningen kan vara en befintlig tabell eller en ny tabell. För Python-frågor använder du funktionen create_streaming_table() för att skapa en måltabell.
Viktig
- Om du behöver definiera datakvalitetsbegränsningar med förväntningardefinierar du förväntningarna på måltabellen som en del av funktionen
create_streaming_table()
eller en befintlig tabelldefinition. Du kan inte definiera förväntningar i definitionen@append_flow
. - Flöden identifieras med ett flödesnamnoch det här namnet används för att identifiera kontrollpunkter för direktuppspelning. Användning av flödesnamnet för att identifiera kontrollpunkten innebär följande:
- Om ett befintligt flöde i en pipeline byter namn följer inte kontrollpunkten med, och det omdöpta flödet är i praktiken ett helt nytt flöde.
- Du kan inte återanvända ett flödesnamn i en pipeline eftersom den befintliga kontrollpunkten inte matchar den nya flödesdefinitionen.
Följande är syntaxen för @append_flow
:
Python
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
target_table BY NAME
SELECT * FROM
source;
Exempel: Skriva till en strömmande tabell från flera Kafka-topiker
I följande exempel skapas en strömningstabell med namnet kafka_target
och skriver till den strömmande tabellen från två Kafka-ämnen:
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Mer information om den read_kafka()
tabellvärdesfunktion som används i SQL-frågorna finns i read_kafka i SQL-språkreferensen.
I Python kan du programmatiskt skapa flera flöden som riktar sig mot en enda tabell. I följande exempel visas det här mönstret för en lista över Kafka-ämnen.
Not
Det här mönstret har samma krav som att använda en for
-loop för att skapa tabeller. Du måste uttryckligen skicka ett Python-värde till funktionen som definierar flödet. Se Skapa tabeller i en for
-loop.
import dlt
dlt.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Exempel: Kör en engångsdatapåfyllning
I följande exempel körs en fråga för att lägga till historiska data i en strömmande tabell:
Notera
Ta bort frågan när du har kört pipelinen en gång för att säkerställa en sann engångsefterfyllnad när återfyllnadsfrågan är en del av en pipeline som körs enligt schemat eller kontinuerligt. Om du vill lägga till nya data om de kommer till backfill-mappen, behåll frågan på plats.
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Exempel: Använd tilläggsflödesbearbetning i stället för UNION
I stället för att använda en fråga med en UNION
-sats kan du använda tilläggsflödesfrågor för att kombinera flera källor och skriva till en enda direktuppspelningstabell. Med hjälp av tilläggsflödesfrågor i stället för UNION
kan du lägga till i en strömmande tabell från flera källor utan att köra en fullständig uppdatering.
Följande Python-exempel innehåller en fråga som kombinerar flera datakällor med en UNION
-sats:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
Följande exempel ersätter den UNION
frågan med tilläggsflödesfrågor:
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/us",
"csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/eu",
"csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/apac",
"csv"
);