Udostępnij za pośrednictwem


Opracowywanie kodu potoku przy użyciu języka SQL

Delta Live Tables wprowadza kilka nowych słów kluczowych i funkcji SQL do definiowania zmaterializowanych widoków i tabel strumieniowych w potokach. Obsługa języka SQL na potrzeby tworzenia potoków opiera się na podstawach usługi Spark SQL i dodaje obsługę funkcji przesyłania strumieniowego ze strukturą.

Użytkownicy zaznajomieni z ramkami danych PySpark mogą preferować tworzenie kodu potoku w języku Python. Język Python obsługuje bardziej rozbudowane testowanie i operacje, które są trudne do zaimplementowania przy użyciu języka SQL, takich jak operacje metaprogramowania. Zobacz Tworzenie kodu potoku przy użyciu języka Python.

Aby zapoznać się z pełnym odniesieniem do składni SQL Delta Live Tables, zapoznaj się z referencją języka SQL Delta Live Tables.

Podstawy opracowywania potoków w języku SQL

Kod SQL, który tworzy zestawy danych Delta Live Tables, używa składni CREATE OR REFRESH do definiowania zmaterializowanych widoków i tabel przesyłania strumieniowego względem wyników zapytania.

Słowo STREAM kluczowe wskazuje, czy źródło danych, do których odwołuje się klauzula SELECT , powinno być odczytywane za pomocą semantyki przesyłania strumieniowego.

Odczyty i zapisy domyślnie dotyczą katalogu i schematu określonych podczas konfiguracji potoku. Zobacz Ustaw katalog docelowy i schemat.

Kod źródłowy Delta Live Tables krytycznie różni się od skryptów SQL: Tabele Delta Live Tables oceniają wszystkie definicje zestawów danych we wszystkich plikach kodu źródłowego skonfigurowanych w potoku i tworzą graf przepływu danych przed uruchomieniem zapytań. Kolejność zapytań wyświetlanych w notesie lub skryscie definiuje kolejność oceny kodu, ale nie kolejność wykonywania zapytań.

Tworzenie zmaterializowanego widoku przy użyciu języka SQL

Poniższy przykład kodu przedstawia podstawową składnię tworzenia zmaterializowanego widoku przy użyciu języka SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Tworzenie tabeli przesyłania strumieniowego za pomocą języka SQL

Poniższy przykład kodu demonstruje podstawową składnię tworzenia tabeli strumieniowej za pomocą SQL:

Uwaga

Nie wszystkie źródła danych obsługują odczyty przesyłane strumieniowo, a niektóre źródła danych powinny być zawsze przetwarzane za pomocą semantyki przesyłania strumieniowego.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Ładowanie danych z magazynu obiektów

Usługa Delta Live Tables obsługuje ładowanie danych ze wszystkich formatów obsługiwanych przez usługę Azure Databricks. Zobacz Opcje formatu danych.

Uwaga

W tych przykładach używane są dane dostępne w obszarze /databricks-datasets automatycznie zainstalowanym w obszarze roboczym. Usługa Databricks zaleca używanie ścieżek woluminów lub identyfikatorów URI w chmurze w celu odwołowania się do danych przechowywanych w magazynie obiektów w chmurze. Zobacz Co to są woluminy katalogu Unity?.

Usługa Databricks zaleca używanie Auto Loader i tabel przesyłania strumieniowego podczas konfigurowania obciążeń pozyskiwania przyrostowego względem danych przechowywanych w usługach przechowywania obiektów w chmurze. Zobacz Co to jest moduł automatycznego ładowania?.

Program SQL używa read_files funkcji do wywoływania funkcji automatycznego modułu ładującego. Należy również użyć słowa kluczowego STREAM , aby skonfigurować odczyt przesyłania strumieniowego za pomocą polecenia read_files.

W poniższym przykładzie tabela strumieniowa z plików JSON jest tworzona przy użyciu Auto Loader.

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Funkcja read_files obsługuje również semantyki wsadowe w celu tworzenia zmaterializowanych widoków. W poniższym przykładzie użyto semantyki wsadowej do odczytania katalogu JSON i utworzenia zmaterializowanego widoku:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Weryfikowanie danych z oczekiwaniami

Możesz użyć oczekiwań, aby ustawić i wymusić ograniczenia dotyczące jakości danych. Zobacz Zarządzanie jakością danych przy użyciu oczekiwań potoku.

Poniższy kod definiuje oczekiwaną nazwę, valid_data która odrzuca rekordy, które mają wartość null podczas pozyskiwania danych:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Wykonywanie zapytań na zmaterializowanych widokach i tabelach strumieniowych zdefiniowanych w ramach potoku

W poniższym przykładzie zdefiniowano cztery zestawy danych:

  • Tabela strumieniowa nazwana orders, która ładuje dane JSON.
  • Zmaterializowany widok o nazwie customers , który ładuje dane CSV.
  • Zmaterializowany widok o nazwie customer_orders , który łączy rekordy z orders zestawów danych i customers , rzutuje znacznik czasu zamówienia na datę i wybiera customer_idpola , order_number, statei order_date .
  • Zmaterializowany widok o nazwie daily_orders_by_state , który agreguje dzienną liczbę zamówień dla każdego stanu.

Uwaga

Podczas wykonywania zapytań dotyczących widoków lub tabel w potoku można bezpośrednio określić wykaz i schemat lub użyć wartości domyślnych skonfigurowanych w potoku. W tym przykładzie, tabele orders, customersi customer_orders są zapisywane i odczytywane z domyślnego katalogu i schematu skonfigurowanego dla twojego potoku.

Tryb dziedzictwa publikowania używa schematu LIVE do wysyłania zapytań dotyczących innych zmaterializowanych widoków i tabel przesyłania strumieniowego określonych w Twoim potoku. W nowych potokach składnia schematu LIVE jest ignorowana w trybie dyskretnym. Zobacz live schema (starsza wersja).

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;