Udostępnij za pośrednictwem


Opracowywanie kodu potoku przy użyciu języka SQL

Funkcja Delta Live Tables wprowadza kilka nowych słów kluczowych i funkcji SQL do definiowania zmaterializowanych widoków i tabel przesyłania strumieniowego w potokach. Obsługa języka SQL na potrzeby tworzenia potoków opiera się na podstawach usługi Spark SQL i dodaje obsługę funkcji przesyłania strumieniowego ze strukturą.

Użytkownicy zaznajomieni z ramkami danych PySpark mogą preferować tworzenie kodu potoku w języku Python. Język Python obsługuje bardziej rozbudowane testowanie i operacje, które są trudne do zaimplementowania przy użyciu języka SQL, takich jak operacje metaprogramowania. Zobacz Tworzenie kodu potoku przy użyciu języka Python.

Aby zapoznać się z pełną dokumentacją składni JĘZYKA SQL tabel delta Live Tables, zobacz Dokumentacja języka SQL usługi Delta Live Tables.

Podstawy opracowywania potoków w języku SQL

Kod SQL, który tworzy zestawy danych funkcji Delta Live Tables, używa CREATE OR REFRESH składni do definiowania zmaterializowanych widoków i tabel przesyłania strumieniowego względem wyników zapytań.

Słowo STREAM kluczowe wskazuje, czy źródło danych, do których odwołuje się klauzula SELECT , powinno być odczytywane za pomocą semantyki przesyłania strumieniowego.

Kod źródłowy funkcji Delta Live Tables krytycznie różni się od skryptów SQL: tabele delta Live Tables ocenia wszystkie definicje zestawów danych we wszystkich plikach kodu źródłowego skonfigurowanych w potoku i tworzy graf przepływu danych przed uruchomieniem zapytań. Kolejność zapytań wyświetlanych w notesie lub skrycie nie definiuje kolejności wykonywania.

Tworzenie zmaterializowanego widoku przy użyciu języka SQL

Poniższy przykład kodu przedstawia podstawową składnię tworzenia zmaterializowanego widoku przy użyciu języka SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Tworzenie tabeli przesyłania strumieniowego za pomocą języka SQL

Poniższy przykład kodu przedstawia podstawową składnię tworzenia tabeli przesyłania strumieniowego za pomocą języka SQL:

Uwaga

Nie wszystkie źródła danych obsługują odczyty przesyłane strumieniowo, a niektóre źródła danych powinny być zawsze przetwarzane za pomocą semantyki przesyłania strumieniowego.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Ładowanie danych z magazynu obiektów

Usługa Delta Live Tables obsługuje ładowanie danych ze wszystkich formatów obsługiwanych przez usługę Azure Databricks. Zobacz Opcje formatu danych.

Uwaga

W tych przykładach używane są dane dostępne w obszarze /databricks-datasets automatycznie zainstalowanym w obszarze roboczym. Usługa Databricks zaleca używanie ścieżek woluminów lub identyfikatorów URI w chmurze w celu odwołowania się do danych przechowywanych w magazynie obiektów w chmurze. Zobacz Co to są woluminy wykazu aparatu Unity?.

Usługa Databricks zaleca używanie tabel automatycznego modułu ładującego i przesyłania strumieniowego podczas konfigurowania obciążeń pozyskiwania przyrostowego względem danych przechowywanych w magazynie obiektów w chmurze. Zobacz Co to jest moduł automatycznego ładowania?.

Program SQL używa read_files funkcji do wywoływania funkcji automatycznego modułu ładującego. Należy również użyć słowa kluczowego STREAM , aby skonfigurować odczyt przesyłania strumieniowego za pomocą polecenia read_files.

W poniższym przykładzie tworzona jest tabela przesyłania strumieniowego z plików JSON przy użyciu modułu automatycznego ładującego:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Funkcja read_files obsługuje również semantyki wsadowe w celu utworzenia zmaterializowanych widoków. W poniższym przykładzie użyto semantyki wsadowej do odczytania katalogu JSON i utworzenia zmaterializowanego widoku:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Weryfikowanie danych z oczekiwaniami

Możesz użyć oczekiwań, aby ustawić i wymusić ograniczenia dotyczące jakości danych. Zobacz Zarządzanie jakością danych za pomocą tabel delta live.

Poniższy kod definiuje oczekiwaną nazwę, valid_data która odrzuca rekordy, które mają wartość null podczas pozyskiwania danych:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Wykonywanie zapytań dotyczących zmaterializowanych widoków i tabel przesyłania strumieniowego zdefiniowanych w potoku

Użyj schematu LIVE , aby wykonywać zapytania dotyczące innych zmaterializowanych widoków i tabel przesyłania strumieniowego zdefiniowanych w potoku.

W poniższym przykładzie zdefiniowano cztery zestawy danych:

  • Tabela przesyłania strumieniowego o nazwie orders , która ładuje dane JSON.
  • Zmaterializowany widok o nazwie customers , który ładuje dane CSV.
  • Zmaterializowany widok o nazwie customer_orders , który łączy rekordy z orders zestawów danych i customers , rzutuje znacznik czasu zamówienia na datę i wybiera customer_idpola , order_number, statei order_date .
  • Zmaterializowany widok o nazwie daily_orders_by_state , który agreguje dzienną liczbę zamówień dla każdego stanu.
CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;