Udostępnij za pośrednictwem


Ładowanie i przetwarzanie danych przyrostowo za pomocą przepływów delta live tables

W tym artykule wyjaśniono, jakie przepływy są i jak można używać przepływów w potokach delta Live Tables, aby przyrostowo przetwarzać dane ze źródła do docelowej tabeli przesyłania strumieniowego. W tabelach delta Live Tables przepływy są definiowane na dwa sposoby:

  1. Przepływ jest definiowany automatycznie podczas tworzenia zapytania, które aktualizuje tabelę przesyłania strumieniowego.
  2. Delta Live Tables udostępnia również funkcje jawnego definiowania przepływów w celu bardziej złożonego przetwarzania, takiego jak dołączanie do tabeli przesyłania strumieniowego z wielu źródeł przesyłania strumieniowego.

W tym artykule omówiono niejawne przepływy tworzone podczas definiowania zapytania w celu zaktualizowania tabeli przesyłania strumieniowego, a następnie przedstawiono szczegółowe informacje na temat składni definiującej bardziej złożone przepływy.

Co to jest przepływ?

W tabelach delta Live Tables przepływ jest zapytaniem przesyłanym strumieniowo, które przetwarza dane źródłowe przyrostowo w celu zaktualizowania docelowej tabeli przesyłania strumieniowego. Większość zestawów danych delta Live Tables tworzonych w potoku definiuje przepływ jako część zapytania i nie wymaga jawnego definiowania przepływu. Możesz na przykład utworzyć tabelę przesyłania strumieniowego w tabelach delta live w jednym poleceniu DDL zamiast używać oddzielnych instrukcji tabeli i przepływu w celu utworzenia tabeli przesyłania strumieniowego:

Uwaga

Ten CREATE FLOW przykład jest udostępniany tylko do celów ilustracyjnych i zawiera słowa kluczowe, które nie są prawidłową składnią tabel delta live tables.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

Oprócz domyślnego przepływu zdefiniowanego przez zapytanie interfejsy Delta Live Tables Python i SQL zapewniają funkcje przepływu dołączania. Przepływ dołączania obsługuje przetwarzanie, które wymaga odczytywania danych z wielu źródeł przesyłania strumieniowego w celu zaktualizowania pojedynczej tabeli przesyłania strumieniowego. Możesz na przykład użyć funkcji przepływu dołączania, jeśli masz istniejącą tabelę i przepływ przesyłania strumieniowego i chcesz dodać nowe źródło przesyłania strumieniowego, które zapisuje w istniejącej tabeli przesyłania strumieniowego.

Używanie przepływu dołączania do zapisywania w tabeli przesyłania strumieniowego z wielu strumieni źródłowych

Uwaga

Aby można było używać przetwarzania przepływu dołączania, potok musi być skonfigurowany do korzystania z kanału w wersji zapoznawczej.

Użyj dekoratora @append_flow w interfejsie języka Python lub CREATE FLOW klauzuli w interfejsie SQL, aby zapisać w tabeli przesyłania strumieniowego z wielu źródeł przesyłania strumieniowego. Użyj przepływu dołączania do przetwarzania zadań, takich jak:

  • Dodaj źródła przesyłania strumieniowego, które dołączają dane do istniejącej tabeli przesyłania strumieniowego bez konieczności pełnego odświeżania. Na przykład możesz mieć tabelę łączącą dane regionalne z każdego regionu, w którym działasz. W miarę wdrażania nowych regionów można dodać nowe dane regionu do tabeli bez przeprowadzania pełnego odświeżania. Zobacz Przykład: Zapisywanie w tabeli przesyłania strumieniowego z wielu tematów platformy Kafka.
  • Zaktualizuj tabelę przesyłania strumieniowego, dołączając brakujące dane historyczne (wypełnianie). Na przykład masz istniejącą tabelę przesyłania strumieniowego napisaną na podstawie tematu platformy Apache Kafka. Istnieją również dane historyczne przechowywane w tabeli, która jest potrzebna dokładnie raz do tabeli przesyłania strumieniowego i nie można przesyłać strumieniowo danych, ponieważ przetwarzanie obejmuje wykonywanie złożonej agregacji przed wstawieniem danych. Zobacz Przykład: Uruchamianie jednorazowego wypełniania danych.
  • Łączenie danych z wielu źródeł i zapisywanie w jednej tabeli przesyłania strumieniowego zamiast używania UNION klauzuli w zapytaniu. Użycie przetwarzania przepływu dołączania zamiast UNION umożliwia przyrostowe aktualizowanie tabeli docelowej bez konieczności uruchamiania aktualizacji pełnego odświeżania. Zobacz Przykład: Użyj przetwarzania przepływu dołączania zamiast funkcji UNION.

Elementem docelowym dla rekordów wyjściowych przez przetwarzanie przepływu dołączania może być istniejąca tabela lub nowa tabela. W przypadku zapytań języka Python użyj funkcji create_streaming_table(), aby utworzyć tabelę docelową.

Ważne

  • Jeśli musisz zdefiniować ograniczenia dotyczące jakości danych z oczekiwaniami, zdefiniuj oczekiwania w tabeli docelowej w ramach create_streaming_table() funkcji lub istniejącej definicji tabeli. Nie można zdefiniować oczekiwań w @append_flow definicji.
  • Przepływy są identyfikowane przez nazwę przepływu, a ta nazwa służy do identyfikowania punktów kontrolnych przesyłania strumieniowego. Użycie nazwy przepływu do identyfikowania punktu kontrolnego oznacza następujące kwestie:
    • Jeśli nazwa istniejącego przepływu w potoku zostanie zmieniona, punkt kontrolny nie jest przenoszony, a zmieniony przepływ jest w rzeczywistości całkowicie nowym przepływem.
    • Nie można ponownie użyć nazwy przepływu w potoku, ponieważ istniejący punkt kontrolny nie będzie zgodny z nową definicją przepływu.

Poniżej przedstawiono składnię dla elementu @append_flow:

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

Przykład: zapisywanie w tabeli przesyłania strumieniowego z wielu tematów platformy Kafka

W poniższych przykładach tworzona jest tabela przesyłania strumieniowego o nazwie kafka_target i zapisuje w tej tabeli przesyłania strumieniowego z dwóch tematów platformy Kafka:

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Aby dowiedzieć się więcej na temat read_kafka() funkcji wartości tabeli używanej w zapytaniach SQL, zobacz read_kafka w dokumentacji języka SQL.

Przykład: uruchamianie jednorazowego wypełniania danych

W poniższych przykładach uruchomiono zapytanie, aby dołączyć dane historyczne do tabeli przesyłania strumieniowego:

Uwaga

Aby zapewnić rzeczywiste jednorazowe wypełnienie, gdy zapytanie wypełniania jest częścią potoku, który jest uruchamiany zgodnie z harmonogramem lub stale, usuń zapytanie po uruchomieniu potoku raz. Aby dołączyć nowe dane, jeśli pojawią się w katalogu wypełniania, pozostaw to zapytanie w miejscu.

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Przykład: Użyj przetwarzania przepływu dołączania zamiast UNION

Zamiast używać zapytania z klauzulą UNION , możesz użyć zapytań przepływu dołączania, aby połączyć wiele źródeł i zapisać w jednej tabeli przesyłania strumieniowego. Używanie zapytań przepływu dołączania zamiast UNION umożliwia dołączanie do tabeli przesyłania strumieniowego z wielu źródeł bez uruchamiania pełnego odświeżania.

Poniższy przykład języka Python zawiera zapytanie, które łączy wiele źródeł danych z klauzulą UNION :

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

W poniższych przykładach zapytanie jest zastępowane UNION zapytaniami przepływu dołączania:

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/apac",
    "csv"
  );