Opracowywanie kodu potoku przy użyciu języka SQL
Delta Live Tables wprowadza kilka nowych słów kluczowych i funkcji SQL do definiowania zmaterializowanych widoków i tabel strumieniowych w potokach. Obsługa języka SQL na potrzeby tworzenia potoków opiera się na podstawach usługi Spark SQL i dodaje obsługę funkcji przesyłania strumieniowego ze strukturą.
Użytkownicy zaznajomieni z ramkami danych PySpark mogą preferować tworzenie kodu potoku w języku Python. Język Python obsługuje bardziej rozbudowane testowanie i operacje, które są trudne do zaimplementowania przy użyciu języka SQL, takich jak operacje metaprogramowania. Zobacz Tworzenie kodu potoku przy użyciu języka Python.
Aby zapoznać się z pełnym odniesieniem do składni SQL Delta Live Tables, zapoznaj się z referencją języka SQL Delta Live Tables.
Podstawy opracowywania potoków w języku SQL
Kod SQL, który tworzy zestawy danych Delta Live Tables, używa składni CREATE OR REFRESH
do definiowania zmaterializowanych widoków i tabel przesyłania strumieniowego względem wyników zapytania.
Słowo STREAM
kluczowe wskazuje, czy źródło danych, do których odwołuje się klauzula SELECT
, powinno być odczytywane za pomocą semantyki przesyłania strumieniowego.
Odczyty i zapisy domyślnie dotyczą katalogu i schematu określonych podczas konfiguracji potoku. Zobacz Ustaw katalog docelowy i schemat.
Kod źródłowy Delta Live Tables krytycznie różni się od skryptów SQL: Tabele Delta Live Tables oceniają wszystkie definicje zestawów danych we wszystkich plikach kodu źródłowego skonfigurowanych w potoku i tworzą graf przepływu danych przed uruchomieniem zapytań. Kolejność zapytań wyświetlanych w notesie lub skryscie definiuje kolejność oceny kodu, ale nie kolejność wykonywania zapytań.
Tworzenie zmaterializowanego widoku przy użyciu języka SQL
Poniższy przykład kodu przedstawia podstawową składnię tworzenia zmaterializowanego widoku przy użyciu języka SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Tworzenie tabeli przesyłania strumieniowego za pomocą języka SQL
Poniższy przykład kodu demonstruje podstawową składnię tworzenia tabeli strumieniowej za pomocą SQL:
Uwaga
Nie wszystkie źródła danych obsługują odczyty przesyłane strumieniowo, a niektóre źródła danych powinny być zawsze przetwarzane za pomocą semantyki przesyłania strumieniowego.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Ładowanie danych z magazynu obiektów
Usługa Delta Live Tables obsługuje ładowanie danych ze wszystkich formatów obsługiwanych przez usługę Azure Databricks. Zobacz Opcje formatu danych.
Uwaga
W tych przykładach używane są dane dostępne w obszarze /databricks-datasets
automatycznie zainstalowanym w obszarze roboczym. Usługa Databricks zaleca używanie ścieżek woluminów lub identyfikatorów URI w chmurze w celu odwołowania się do danych przechowywanych w magazynie obiektów w chmurze. Zobacz Co to są woluminy katalogu Unity?.
Usługa Databricks zaleca używanie Auto Loader i tabel przesyłania strumieniowego podczas konfigurowania obciążeń pozyskiwania przyrostowego względem danych przechowywanych w usługach przechowywania obiektów w chmurze. Zobacz Co to jest moduł automatycznego ładowania?.
Program SQL używa read_files
funkcji do wywoływania funkcji automatycznego modułu ładującego. Należy również użyć słowa kluczowego STREAM
, aby skonfigurować odczyt przesyłania strumieniowego za pomocą polecenia read_files
.
W poniższym przykładzie tabela strumieniowa z plików JSON jest tworzona przy użyciu Auto Loader.
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Funkcja read_files
obsługuje również semantyki wsadowe w celu tworzenia zmaterializowanych widoków. W poniższym przykładzie użyto semantyki wsadowej do odczytania katalogu JSON i utworzenia zmaterializowanego widoku:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
Weryfikowanie danych z oczekiwaniami
Możesz użyć oczekiwań, aby ustawić i wymusić ograniczenia dotyczące jakości danych. Zobacz Zarządzanie jakością danych przy użyciu oczekiwań potoku.
Poniższy kod definiuje oczekiwaną nazwę, valid_data
która odrzuca rekordy, które mają wartość null podczas pozyskiwania danych:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Wykonywanie zapytań na zmaterializowanych widokach i tabelach strumieniowych zdefiniowanych w ramach potoku
W poniższym przykładzie zdefiniowano cztery zestawy danych:
- Tabela strumieniowa nazwana
orders
, która ładuje dane JSON. - Zmaterializowany widok o nazwie
customers
, który ładuje dane CSV. - Zmaterializowany widok o nazwie
customer_orders
, który łączy rekordy zorders
zestawów danych icustomers
, rzutuje znacznik czasu zamówienia na datę i wybieracustomer_id
pola ,order_number
,state
iorder_date
. - Zmaterializowany widok o nazwie
daily_orders_by_state
, który agreguje dzienną liczbę zamówień dla każdego stanu.
Uwaga
Podczas wykonywania zapytań dotyczących widoków lub tabel w potoku można bezpośrednio określić wykaz i schemat lub użyć wartości domyślnych skonfigurowanych w potoku. W tym przykładzie, tabele orders
, customers
i customer_orders
są zapisywane i odczytywane z domyślnego katalogu i schematu skonfigurowanego dla twojego potoku.
Tryb dziedzictwa publikowania używa schematu LIVE
do wysyłania zapytań dotyczących innych zmaterializowanych widoków i tabel przesyłania strumieniowego określonych w Twoim potoku. W nowych potokach składnia schematu LIVE
jest ignorowana w trybie dyskretnym. Zobacz live schema (starsza wersja).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;