Carica ed elabora i dati in modo incrementale con i flussi delle Delta Live Tables
Questo articolo illustra cosa sono i flussi e come utilizzare i flussi nelle pipeline di Delta Live Tables per elaborare in modo incrementale i dati da un'origine verso una tabella di streaming di destinazione. In Tabelle live Delta i flussi vengono definiti in due modi:
- Un flusso viene definito automaticamente quando si crea una query che aggiorna una tabella di streaming.
- Le tabelle live delta forniscono anche funzionalità per definire in modo esplicito i flussi per un'elaborazione più complessa, ad esempio l'aggiunta a una tabella di streaming da più origini di streaming.
Questo articolo illustra i flussi impliciti creati quando si definisce una query per aggiornare una tabella di streaming e vengono quindi forniti dettagli sulla sintassi per definire flussi più complessi.
Che cos'è un flusso?
In Delta Live Tables, un flusso è una query di streaming che elabora i dati di origine in modo incrementale per aggiornare una tabella di streaming di destinazione. La maggior parte degli insiemi di dati Delta Live Tables creati in una pipeline definisce il flusso come parte della query e non richiede di definire esplicitamente il flusso. Ad esempio, si crea una tabella di streaming in Tabelle Live Delta in un singolo comando DDL anziché usare istruzioni di tabella e flusso separate per creare la tabella di streaming:
Nota
Questo esempio CREATE FLOW
viene fornito solo a scopo illustrativo e include parole chiave che non sono una sintassi delta live tables valida.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
Oltre al flusso predefinito definito da una query, le interfacce Python e SQL di Delta Live Tables forniscono funzionalità di accodamento. Il flusso di aggiunta supporta l'elaborazione che richiede la lettura di dati da più fonti di streaming per aggiornare una tabella di streaming. Ad esempio, è possibile usare la funzionalità di aggiunta di flussi quando si dispone di una tabella di streaming e un flusso esistenti e si vuole aggiungere una nuova origine di streaming che aggiorna questa tabella di streaming esistente.
Usare il flusso di accodamento per scrivere in una tabella di streaming da più flussi di origine
Usare il decoratore @append_flow
nell'interfaccia Python o la clausola CREATE FLOW
nell'interfaccia SQL per scrivere in una tabella di streaming attingendo da più origini di streaming. Usare il flusso di accodamento per l'elaborazione di attività come le seguenti:
- Aggiungere origini di streaming che aggiungono dati a una tabella di streaming esistente senza richiedere un aggiornamento completo. Ad esempio, potrebbe esserci una tabella che combina i dati regionali di ogni regione in cui si opera. Man mano che vengono implementate nuove aree, è possibile aggiungere i dati della nuova area alla tabella senza eseguire un aggiornamento completo. Consultare esempio: Scrivere su una tabella di streaming da diversi temi Kafka.
- Aggiornare una tabella di streaming aggiungendo dati cronologici mancanti (riempimento). Ad esempio, si dispone di una tabella di streaming esistente scritta in da un argomento Apache Kafka. I dati cronologici sono archiviati anche in una tabella che è necessario inserire esattamente una volta nella tabella di streaming e non è possibile trasmettere i dati perché l'elaborazione include l'esecuzione di un'aggregazione complessa prima di inserire i dati. Vedere Esempio: Eseguire un backfill di dati monouso.
- Combinare dati da più origini e scrivere in una singola tabella di streaming anziché usare la clausola
UNION
in una query. L'elaborazione del flusso di accodamento anzichéUNION
consente di aggiornare la tabella di destinazione in modo incrementale senza eseguire un aggiornamento completo . Vedere Esempio: Usare l'elaborazione del flusso di accodamento anziché UNION.
La destinazione per i record restituiti dall'elaborazione del flusso di accodamento può essere una tabella esistente o una nuova tabella. Per le query Python, usare la funzione create_streaming_table() per creare una tabella di destinazione.
Importante
- Se devi definire vincoli di qualità dei dati con aspettative , definisci le aspettative sulla tabella di destinazione come parte della funzione
create_streaming_table()
o su una definizione di tabella esistente. Non è possibile definire le aspettative nella@append_flow
definizione. - I flussi vengono identificati da un nome di flusso e questo nome viene usato per identificare i checkpoint di streaming. L'uso del nome del flusso per identificare il checkpoint indica quanto segue:
- Se un flusso esistente in una pipeline viene rinominato, il checkpoint non viene portato avanti e il flusso rinominato è effettivamente un flusso completamente nuovo.
- Non è possibile riutilizzare un nome di flusso in una pipeline, perché il checkpoint esistente non corrisponde alla nuova definizione del flusso.
Di seguito è riportata la sintassi per @append_flow
:
Python
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
target_table BY NAME
SELECT * FROM
source;
esempio : Scrivere in una tabella di streaming da più argomenti Kafka
Gli esempi seguenti creano una tabella di streaming denominata kafka_target
e scrive in tale tabella di streaming da due argomenti Kafka:
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Per altre informazioni sulla funzione con valori di tabella read_kafka()
usata nelle query SQL, vedere read_kafka nelle informazioni di riferimento sul linguaggio SQL.
In Python è possibile creare più flussi a livello di codice destinati a una singola tabella. Nell'esempio seguente viene illustrato questo modello per un elenco di argomenti Kafka.
Nota
Questo modello ha gli stessi requisiti dell'uso di un ciclo for
per creare tabelle. È necessario passare in modo esplicito un valore Python alla funzione che definisce il flusso. Vedere Creare tabelle in un ciclo for.
import dlt
dlt.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Esempio: Eseguire un riempimento dati monouso
Gli esempi seguenti eseguono una query per aggiungere dati cronologici a una tabella di streaming:
Nota
Per garantire un backfill unico quando la query di backfill fa parte di una pipeline che viene eseguita su base pianificata o continuamente, rimuovere la query dopo aver eseguito la pipeline una sola volta. Per aggiungere nuovi dati se arrivano nella directory backfill, lasciare la query sul posto.
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Esempio: usare l'elaborazione del flusso di accodamento anziché UNION
Anziché utilizzare una query con una clausola UNION
, è possibile utilizzare query di accodamento per combinare più origini e scrivere in una singola tabella di streaming. L'uso di query di appendimento al posto di UNION
consente di aggiungere dati a una tabella di streaming da più origini senza eseguire un aggiornamento completo .
L'esempio python seguente include una query che combina più origini dati con una UNION
clausola :
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
Gli esempi seguenti sostituiscono la UNION
query con le query del flusso di accodamento:
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/us",
"csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/eu",
"csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/apac",
"csv"
);