Ładowanie i przetwarzanie danych przyrostowo za pomocą przepływów delta live tables
W tym artykule wyjaśniono, jakie przepływy są i jak można używać przepływów w potokach delta Live Tables, aby przyrostowo przetwarzać dane ze źródła do docelowej tabeli przesyłania strumieniowego. W tabelach delta Live Tables przepływy są definiowane na dwa sposoby:
- Przepływ jest definiowany automatycznie podczas tworzenia zapytania, które aktualizuje tabelę przesyłania strumieniowego.
- Delta Live Tables udostępnia również funkcje jawnego definiowania przepływów w celu bardziej złożonego przetwarzania, takiego jak dołączanie do tabeli przesyłania strumieniowego z wielu źródeł przesyłania strumieniowego.
W tym artykule omówiono niejawne przepływy tworzone podczas definiowania zapytania w celu zaktualizowania tabeli przesyłania strumieniowego, a następnie przedstawiono szczegółowe informacje na temat składni definiującej bardziej złożone przepływy.
Co to jest przepływ?
W tabelach delta Live Tables przepływ jest zapytaniem przesyłanym strumieniowo, które przetwarza dane źródłowe przyrostowo w celu zaktualizowania docelowej tabeli przesyłania strumieniowego. Większość zestawów danych delta Live Tables tworzonych w potoku definiuje przepływ jako część zapytania i nie wymaga jawnego definiowania przepływu. Możesz na przykład utworzyć tabelę przesyłania strumieniowego w tabelach delta live w jednym poleceniu DDL zamiast używać oddzielnych instrukcji tabeli i przepływu w celu utworzenia tabeli przesyłania strumieniowego:
Uwaga
Ten CREATE FLOW
przykład jest udostępniany tylko do celów ilustracyjnych i zawiera słowa kluczowe, które nie są prawidłową składnią tabel delta live tables.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
Oprócz domyślnego przepływu zdefiniowanego przez zapytanie interfejsy Delta Live Tables Python i SQL zapewniają funkcje przepływu dołączania. Przepływ dołączania obsługuje przetwarzanie, które wymaga odczytywania danych z wielu źródeł przesyłania strumieniowego w celu zaktualizowania pojedynczej tabeli przesyłania strumieniowego. Możesz na przykład użyć funkcji przepływu dołączania, jeśli masz istniejącą tabelę i przepływ przesyłania strumieniowego i chcesz dodać nowe źródło przesyłania strumieniowego, które zapisuje w istniejącej tabeli przesyłania strumieniowego.
Używanie przepływu dołączania do zapisywania w tabeli przesyłania strumieniowego z wielu strumieni źródłowych
Uwaga
Aby można było używać przetwarzania przepływu dołączania, potok musi być skonfigurowany do korzystania z kanału w wersji zapoznawczej.
Użyj dekoratora @append_flow
w interfejsie języka Python lub CREATE FLOW
klauzuli w interfejsie SQL, aby zapisać w tabeli przesyłania strumieniowego z wielu źródeł przesyłania strumieniowego. Użyj przepływu dołączania do przetwarzania zadań, takich jak:
- Dodaj źródła przesyłania strumieniowego, które dołączają dane do istniejącej tabeli przesyłania strumieniowego bez konieczności pełnego odświeżania. Na przykład możesz mieć tabelę łączącą dane regionalne z każdego regionu, w którym działasz. W miarę wdrażania nowych regionów można dodać nowe dane regionu do tabeli bez przeprowadzania pełnego odświeżania. Zobacz Przykład: Zapisywanie w tabeli przesyłania strumieniowego z wielu tematów platformy Kafka.
- Zaktualizuj tabelę przesyłania strumieniowego, dołączając brakujące dane historyczne (wypełnianie). Na przykład masz istniejącą tabelę przesyłania strumieniowego napisaną na podstawie tematu platformy Apache Kafka. Istnieją również dane historyczne przechowywane w tabeli, która jest potrzebna dokładnie raz do tabeli przesyłania strumieniowego i nie można przesyłać strumieniowo danych, ponieważ przetwarzanie obejmuje wykonywanie złożonej agregacji przed wstawieniem danych. Zobacz Przykład: Uruchamianie jednorazowego wypełniania danych.
- Łączenie danych z wielu źródeł i zapisywanie w jednej tabeli przesyłania strumieniowego zamiast używania
UNION
klauzuli w zapytaniu. Użycie przetwarzania przepływu dołączania zamiastUNION
umożliwia przyrostowe aktualizowanie tabeli docelowej bez konieczności uruchamiania aktualizacji pełnego odświeżania. Zobacz Przykład: Użyj przetwarzania przepływu dołączania zamiast funkcji UNION.
Elementem docelowym dla rekordów wyjściowych przez przetwarzanie przepływu dołączania może być istniejąca tabela lub nowa tabela. W przypadku zapytań języka Python użyj funkcji create_streaming_table(), aby utworzyć tabelę docelową.
Ważne
- Jeśli musisz zdefiniować ograniczenia dotyczące jakości danych z oczekiwaniami, zdefiniuj oczekiwania w tabeli docelowej w ramach
create_streaming_table()
funkcji lub istniejącej definicji tabeli. Nie można zdefiniować oczekiwań w@append_flow
definicji. - Przepływy są identyfikowane przez nazwę przepływu, a ta nazwa służy do identyfikowania punktów kontrolnych przesyłania strumieniowego. Użycie nazwy przepływu do identyfikowania punktu kontrolnego oznacza następujące kwestie:
- Jeśli nazwa istniejącego przepływu w potoku zostanie zmieniona, punkt kontrolny nie jest przenoszony, a zmieniony przepływ jest w rzeczywistości całkowicie nowym przepływem.
- Nie można ponownie użyć nazwy przepływu w potoku, ponieważ istniejący punkt kontrolny nie będzie zgodny z nową definicją przepływu.
Poniżej przedstawiono składnię dla elementu @append_flow
:
Python
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
target_table BY NAME
SELECT * FROM
source;
Przykład: zapisywanie w tabeli przesyłania strumieniowego z wielu tematów platformy Kafka
W poniższych przykładach tworzona jest tabela przesyłania strumieniowego o nazwie kafka_target
i zapisuje w tej tabeli przesyłania strumieniowego z dwóch tematów platformy Kafka:
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Aby dowiedzieć się więcej na temat read_kafka()
funkcji wartości tabeli używanej w zapytaniach SQL, zobacz read_kafka w dokumentacji języka SQL.
Przykład: uruchamianie jednorazowego wypełniania danych
W poniższych przykładach uruchomiono zapytanie, aby dołączyć dane historyczne do tabeli przesyłania strumieniowego:
Uwaga
Aby zapewnić rzeczywiste jednorazowe wypełnienie, gdy zapytanie wypełniania jest częścią potoku, który jest uruchamiany zgodnie z harmonogramem lub stale, usuń zapytanie po uruchomieniu potoku raz. Aby dołączyć nowe dane, jeśli pojawią się w katalogu wypełniania, pozostaw to zapytanie w miejscu.
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Przykład: Użyj przetwarzania przepływu dołączania zamiast UNION
Zamiast używać zapytania z klauzulą UNION
, możesz użyć zapytań przepływu dołączania, aby połączyć wiele źródeł i zapisać w jednej tabeli przesyłania strumieniowego. Używanie zapytań przepływu dołączania zamiast UNION
umożliwia dołączanie do tabeli przesyłania strumieniowego z wielu źródeł bez uruchamiania pełnego odświeżania.
Poniższy przykład języka Python zawiera zapytanie, które łączy wiele źródeł danych z klauzulą UNION
:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
W poniższych przykładach zapytanie jest zastępowane UNION
zapytaniami przepływu dołączania:
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/us",
"csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/eu",
"csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/apac",
"csv"
);