Ładowanie i przetwarzanie danych przyrostowo za pomocą przepływów Delta Live Tables
W tym artykule wyjaśniono, czym są przepływy i jak można używać przepływów w potokach Delta Live Tables, aby stopniowo przetwarzać dane ze źródła na docelową tabelę przesyłania strumieniowego. W tabelach delta Live Tables przepływy są definiowane na dwa sposoby:
- Przepływ jest definiowany automatycznie podczas tworzenia zapytania, które aktualizuje tabelę przesyłania strumieniowego.
- Delta Live Tables zapewnia także funkcjonalność do jawnego definiowania przepływów dla bardziej złożonego przetwarzania, takiego jak dołączanie do tabeli strumieniowej z wielu źródeł strumieniowych.
W tym artykule omówiono niejawne przepływy tworzone podczas definiowania zapytania w celu zaktualizowania tabeli przesyłania strumieniowego, a następnie przedstawiono szczegółowe informacje na temat składni definiującej bardziej złożone przepływy.
Co to jest przepływ?
W Delta Live Tables, przepływ to zapytanie strumieniowe, które przetwarza dane źródłowe stopniowo w celu zaktualizowania docelowej tabeli strumieniowej. Większość zestawów danych Delta Live Tables tworzonych w potoku definiuje przepływ jako część zapytania i nie wymaga jawnego definiowania przepływu. Możesz na przykład utworzyć tabelę przesyłania strumieniowego w Delta Live Tables jednym poleceniem DDL, zamiast używać oddzielnych instrukcji dotyczących tabeli i przepływu w celu utworzenia tabeli przesyłania strumieniowego.
Uwaga
Ten przykład CREATE FLOW
jest udostępniany tylko w celach ilustracyjnych i zawiera słowa kluczowe, które nie są prawidłową składnią Delta Live Tables.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
Oprócz domyślnego przepływu zdefiniowanego przez zapytanie interfejsy Delta Live Tables Python i SQL zapewniają funkcji przepływu dołączania. Przepływ dołączania obsługuje przetwarzanie, które wymaga odczytywania danych z wielu źródeł przesyłania strumieniowego w celu zaktualizowania pojedynczej tabeli przesyłania strumieniowego. Możesz na przykład użyć funkcji przepływu dołączania, jeśli masz istniejącą tabelę i przepływ przesyłania strumieniowego i chcesz dodać nowe źródło przesyłania strumieniowego, które zapisuje w istniejącej tabeli przesyłania strumieniowego.
Użyj przepływu dołączania, aby zapisywać do tabeli strumieniowej z wielu źródeł strumieniowych
Użyj dekoratora @append_flow
w interfejsie języka Python lub klauzuli CREATE FLOW
w interfejsie SQL, aby zapisać dane do tabeli strumieniowej z wielu źródeł strumieniowych. Użyj przepływu dołączania do przetwarzania zadań, takich jak:
- Dodaj źródła przesyłania strumieniowego, które dołączają dane do istniejącej tabeli przesyłania strumieniowego bez konieczności pełnego odświeżania. Na przykład możesz mieć tabelę łączącą dane regionalne z każdego regionu, w którym działasz. W miarę wdrażania nowych regionów można dodać nowe dane regionu do tabeli bez przeprowadzania pełnego odświeżania. Zobacz Przykład: zapisywanie w tabeli przesyłania strumieniowego z wielu tematów platformy Kafka.
- Zaktualizuj tabelę strumieniowaną, dołączając brakujące dane historyczne (uzupełnianie brakujących danych). Na przykład masz istniejącą tabelę przesyłania strumieniowego zapisywaną przez temat Apache Kafka. pl-PL: Masz również dane historyczne przechowywane w tabeli, które musisz wstawić dokładnie raz do tabeli przesyłania strumieniowego, i nie możesz przesyłać tych danych strumieniowo, ponieważ przetwarzanie obejmuje wykonanie złożonej agregacji przed wstawieniem danych. Zobacz Przykład: Uruchamianie jednorazowego wypełniania danych.
- Połącz dane z wielu źródeł i zapisz w jednej tabeli przesyłania strumieniowego zamiast używania klauzuli
UNION
w zapytaniu. Użycie przetwarzania przepływu danych przez dołączanie zamiastUNION
umożliwia przyrostowe aktualizowanie tabeli docelowej bez konieczności uruchamiania pełnej aktualizacji odświeżenia. Zobacz Przykład: Użyj przetwarzania przepływu dołączania zamiast funkcji UNION.
Docelowym miejscem dla rekordów wygenerowanych przez przetwarzanie przepływu dołączania może być istniejąca tabela lub nowa tabela. W przypadku zapytań języka Python użyj funkcji create_streaming_table(), aby utworzyć tabelę docelową.
Ważne
- Jeśli musisz zdefiniować ograniczenia dotyczące jakości danych z oczekiwaniami , zdefiniuj oczekiwania w tabeli docelowej w ramach funkcji
create_streaming_table()
lub istniejącej definicji tabeli. Nie można zdefiniować oczekiwań w@append_flow
definicji. - Przepływy są identyfikowane przez nazwę przepływu, a ta nazwa służy do identyfikowania punktów kontrolnych przesyłania strumieniowego. Użycie nazwy przepływu do identyfikowania punktu kontrolnego oznacza następujące kwestie:
- Jeśli nazwa istniejącego przepływu w potoku zostanie zmieniona, punkt kontrolny nie jest przenoszony, a zmieniony przepływ jest w rzeczywistości całkowicie nowym przepływem.
- Nie można ponownie użyć nazwy przepływu w potoku, ponieważ istniejący punkt kontrolny nie będzie zgodny z nową definicją przepływu.
Poniżej przedstawiono składnię dla elementu @append_flow
:
Python
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
target_table BY NAME
SELECT * FROM
source;
Przykład: zapisywanie w tabeli przesyłania strumieniowego z wielu tematów platformy Kafka
W poniższych przykładach tworzy się tabela przesyłania strumieniowego o nazwie kafka_target
i zapisuje dane z dwóch tematów Kafka do tej tabeli.
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Aby dowiedzieć się więcej na temat funkcji tabelarycznej read_kafka()
używanej w zapytaniach SQL, zobacz read_kafka w dokumentacji języka SQL.
W języku Python można programowo utworzyć wiele przepływów przeznaczonych dla jednej tabeli. Poniższy przykład przedstawia ten wzorzec dla listy tematów platformy Kafka.
Uwaga
Ten wzorzec ma te same wymagania co tworzenie tabel przy użyciu pętli for
. Musisz jawnie przekazać wartość języka Python do funkcji definiującej przepływ. Zobacz Create tables in a for loop (Tworzenie tabel w pętli).
import dlt
dlt.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Przykład: uruchamianie jednorazowego wypełniania danych
W poniższych przykładach uruchomiono zapytanie, aby dołączyć dane historyczne do tabeli przesyłania strumieniowego:
Uwaga
Aby zapewnić rzeczywiste pojedyncze wypełnienie, gdy zapytanie wypełniania jest częścią potoku uruchamianego zgodnie z harmonogramem lub stale, usuń zapytanie po jednokrotnym uruchomieniu potoku. Aby dołączyć nowe dane, jeśli pojawią się w katalogu wypełniania, pozostaw to zapytanie w miejscu.
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Przykład: Użyj przetwarzania przepływu dołączania zamiast UNION
Zamiast używać zapytania z klauzulą UNION
, można użyć zapytań dołączających, aby połączyć wiele źródeł i zapisać w jednej tabeli strumieniowej. Używanie zapytań typu 'dołącz' zamiast UNION
pozwala dołączyć do tabeli danych strumieniowych z wielu źródeł bez uruchamiania pełnego odświeżenia.
Poniższy przykład języka Python zawiera zapytanie, które łączy wiele źródeł danych z klauzulą UNION
:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
W poniższych przykładach zapytanie jest zastępowane UNION
zapytaniami przepływu dołączania:
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/us",
"csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/eu",
"csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/apac",
"csv"
);