Vývoj kódu kanálu pomocí SQL
Delta Live Tables představuje několik nových klíčových slov a funkcí SQL pro definování materializovaných zobrazení a streamovaných tabulek v datových tocích. Podpora SQL pro vývoj kanálů vychází ze základů Spark SQL a přidává podporu funkcí strukturovaného streamování.
Uživatelé, kteří znají datové rámce PySpark, můžou preferovat vývoj kódu kanálu pomocí Pythonu. Python podporuje rozsáhlejší testování a operace, které jsou náročné na implementaci s SQL, jako jsou operace metaprogramování. Viz Vývoj kódu kanálu pomocí Pythonu.
Chcete-li získat úplnou referenci syntaxe SQL pro Delta Live Tables, podívejte se na Referenční informace pro jazyk SQL Delta Live Tables.
Základy SQL pro vývoj kanálů
Kód SQL, který vytváří datové sady Delta Live Tables, používá syntaxi CREATE OR REFRESH
k definování materializovaných zobrazení a streamovaných tabulek proti výsledkům dotazu.
Klíčové STREAM
slovo označuje, jestli má být zdroj dat odkazovaný v SELECT
klauzuli přečten pomocí sémantiky streamování.
Čtení a zápisy se ve výchozím nastavení provádějí v katalogu a schématu zadaném během konfigurace kanálu. Viz Nastavení cílového katalogu aschématu .
Zdrojový kód Delta Live Tables se zásadně liší od skriptů SQL: Delta Live Tables vyhodnocuje všechny definice datových sad ve všech souborech zdrojového kódu nakonfigurovaných v pipeline a vytvoří graf toku dat před spuštěním jakýchkoli dotazů. Pořadí dotazů zobrazených v poznámkovém bloku nebo skriptu definuje pořadí vyhodnocení kódu, ale ne pořadí provádění dotazů.
Vytvoření materializovaného zobrazení pomocí SQL
Následující příklad kódu ukazuje základní syntaxi pro vytvoření materializovaného zobrazení pomocí SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Vytvoření streamované tabulky pomocí SQL
Následující příklad kódu ukazuje základní syntaxi pro vytvoření tabulky streamování pomocí SQL:
Poznámka:
Ne všechny zdroje dat podporují čtení streamování a některé zdroje dat by se měly vždy zpracovávat sémantikou streamování.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Načtení dat z úložiště objektů
Delta Live Tables podporuje načítání dat ze všech formátů podporovaných službou Azure Databricks. Viz Možnosti formátu dat.
Poznámka:
Tyto příklady používají data dostupná v rámci automaticky připojeného k vašemu /databricks-datasets
pracovnímu prostoru. Databricks doporučuje používat cesty svazků nebo cloudové identifikátory URI k odkazování na data uložená v cloudovém úložišti objektů. Podívejte se na Co jsou to svazky katalogu Unity?.
Databricks doporučuje používat Auto Loader a streamované tabulky při konfiguraci úloh přírůstkového příjmu pro data uložená v cloudovém úložišti objektů. Podívejte se, co je automatický zavaděč?
SQL používá read_files
funkci k vyvolání funkce automatického zavaděče. Musíte také použít STREAM
klíčové slovo ke konfiguraci streamování čtení pomocí read_files
.
Následující příklad vytvoří streamovací tabulku ze souborů JSON pomocí Auto Loaderu:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Funkce read_files
také podporuje dávkovou sémantiku pro vytváření materializovaných zobrazení. Následující příklad používá sémantiku dávky ke čtení adresáře JSON a vytvoření materializovaného zobrazení:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
Ověření dat s očekáváními
Pomocí očekávání můžete nastavit a vynutit omezení kvality dat. Podívejte se na Správa kvality dat s očekáváními datového kanálu.
Následující kód definuje očekávanou hodnotu, která zahodí valid_data
záznamy, které mají hodnotu null během příjmu dat:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Dotazování materializovaných zobrazení a streamovaných tabulek definovaných ve vašem potrubí
Následující příklad definuje čtyři datové sady:
- Streamovaná tabulka s názvem
orders
, která načítá data JSON. - Materializované zobrazení s názvem
customers
, které načte data CSV. - Materializované zobrazení,
customer_orders
které spojuje záznamy zorders
datových sad acustomers
datových sad, přetypuje časové razítko objednávky na datum a vyberecustomer_id
pole , ,order_number
state
aorder_date
pole. - Materializované zobrazení s názvem
daily_orders_by_state
agreguje denní počet objednávek pro každý stav.
Poznámka:
Při dotazování zobrazení nebo tabulek v kanálu můžete přímo zadat katalog a schéma nebo můžete použít výchozí hodnoty nakonfigurované v kanálu. V tomto příkladu se tabulky orders
, customers
a customer_orders
zapisují a čtou z výchozího katalogu a schématu nakonfigurovaného pro váš kanál.
Starší režim publikování používá schéma LIVE
k dotazování jiných materializovaných pohledů a streamovaných tabulek definovaných ve vašem potrubí. V nových potrubích je syntaxe schématu LIVE
tiše ignorována. Viz LIVE schema (starší verze).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;