Opracowywanie kodu potoku przy użyciu języka SQL
Funkcja Delta Live Tables wprowadza kilka nowych słów kluczowych i funkcji SQL do definiowania zmaterializowanych widoków i tabel przesyłania strumieniowego w potokach. Obsługa języka SQL na potrzeby tworzenia potoków opiera się na podstawach usługi Spark SQL i dodaje obsługę funkcji przesyłania strumieniowego ze strukturą.
Użytkownicy zaznajomieni z ramkami danych PySpark mogą preferować tworzenie kodu potoku w języku Python. Język Python obsługuje bardziej rozbudowane testowanie i operacje, które są trudne do zaimplementowania przy użyciu języka SQL, takich jak operacje metaprogramowania. Zobacz Tworzenie kodu potoku przy użyciu języka Python.
Aby zapoznać się z pełną dokumentacją składni JĘZYKA SQL tabel delta Live Tables, zobacz Dokumentacja języka SQL usługi Delta Live Tables.
Podstawy opracowywania potoków w języku SQL
Kod SQL, który tworzy zestawy danych funkcji Delta Live Tables, używa CREATE OR REFRESH
składni do definiowania zmaterializowanych widoków i tabel przesyłania strumieniowego względem wyników zapytań.
Słowo STREAM
kluczowe wskazuje, czy źródło danych, do których odwołuje się klauzula SELECT
, powinno być odczytywane za pomocą semantyki przesyłania strumieniowego.
Kod źródłowy funkcji Delta Live Tables krytycznie różni się od skryptów SQL: tabele delta Live Tables ocenia wszystkie definicje zestawów danych we wszystkich plikach kodu źródłowego skonfigurowanych w potoku i tworzy graf przepływu danych przed uruchomieniem zapytań. Kolejność zapytań wyświetlanych w notesie lub skrycie nie definiuje kolejności wykonywania.
Tworzenie zmaterializowanego widoku przy użyciu języka SQL
Poniższy przykład kodu przedstawia podstawową składnię tworzenia zmaterializowanego widoku przy użyciu języka SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Tworzenie tabeli przesyłania strumieniowego za pomocą języka SQL
Poniższy przykład kodu przedstawia podstawową składnię tworzenia tabeli przesyłania strumieniowego za pomocą języka SQL:
Uwaga
Nie wszystkie źródła danych obsługują odczyty przesyłane strumieniowo, a niektóre źródła danych powinny być zawsze przetwarzane za pomocą semantyki przesyłania strumieniowego.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Ładowanie danych z magazynu obiektów
Usługa Delta Live Tables obsługuje ładowanie danych ze wszystkich formatów obsługiwanych przez usługę Azure Databricks. Zobacz Opcje formatu danych.
Uwaga
W tych przykładach używane są dane dostępne w obszarze /databricks-datasets
automatycznie zainstalowanym w obszarze roboczym. Usługa Databricks zaleca używanie ścieżek woluminów lub identyfikatorów URI w chmurze w celu odwołowania się do danych przechowywanych w magazynie obiektów w chmurze. Zobacz Co to są woluminy wykazu aparatu Unity?.
Usługa Databricks zaleca używanie tabel automatycznego modułu ładującego i przesyłania strumieniowego podczas konfigurowania obciążeń pozyskiwania przyrostowego względem danych przechowywanych w magazynie obiektów w chmurze. Zobacz Co to jest moduł automatycznego ładowania?.
Program SQL używa read_files
funkcji do wywoływania funkcji automatycznego modułu ładującego. Należy również użyć słowa kluczowego STREAM
, aby skonfigurować odczyt przesyłania strumieniowego za pomocą polecenia read_files
.
W poniższym przykładzie tworzona jest tabela przesyłania strumieniowego z plików JSON przy użyciu modułu automatycznego ładującego:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Funkcja read_files
obsługuje również semantyki wsadowe w celu utworzenia zmaterializowanych widoków. W poniższym przykładzie użyto semantyki wsadowej do odczytania katalogu JSON i utworzenia zmaterializowanego widoku:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
Weryfikowanie danych z oczekiwaniami
Możesz użyć oczekiwań, aby ustawić i wymusić ograniczenia dotyczące jakości danych. Zobacz Zarządzanie jakością danych za pomocą tabel delta live.
Poniższy kod definiuje oczekiwaną nazwę, valid_data
która odrzuca rekordy, które mają wartość null podczas pozyskiwania danych:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Wykonywanie zapytań dotyczących zmaterializowanych widoków i tabel przesyłania strumieniowego zdefiniowanych w potoku
Użyj schematu LIVE
, aby wykonywać zapytania dotyczące innych zmaterializowanych widoków i tabel przesyłania strumieniowego zdefiniowanych w potoku.
W poniższym przykładzie zdefiniowano cztery zestawy danych:
- Tabela przesyłania strumieniowego o nazwie
orders
, która ładuje dane JSON. - Zmaterializowany widok o nazwie
customers
, który ładuje dane CSV. - Zmaterializowany widok o nazwie
customer_orders
, który łączy rekordy zorders
zestawów danych icustomers
, rzutuje znacznik czasu zamówienia na datę i wybieracustomer_id
pola ,order_number
,state
iorder_date
. - Zmaterializowany widok o nazwie
daily_orders_by_state
, który agreguje dzienną liczbę zamówień dla każdego stanu.
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;