Vývoj kódu kanálu pomocí SQL
DLT zavádí několik nových klíčových slov a funkcí SQL pro definování materializovaných zobrazení a streamovaných tabulek v kanálech. Podpora SQL pro vývoj kanálů vychází ze základů Spark SQL a přidává podporu funkcí strukturovaného streamování.
Uživatelé, kteří znají datové rámce PySpark, můžou preferovat vývoj kódu kanálu pomocí Pythonu. Python podporuje rozsáhlejší testování a operace, které jsou náročné na implementaci s SQL, jako jsou operace metaprogramování. Viz Vývoj kódu kanálu pomocíPythonu .
Úplný přehled syntaxe DLT SQL viz referenční příručka DLT SQL.
Základy SQL pro vývoj datových kanálů
Kód SQL, který vytváří datové sady DLT, používá syntaxi CREATE OR REFRESH
k definování materializovaných zobrazení a streamovaných tabulek proti výsledkům dotazu.
Klíčové slovo STREAM
označuje, jestli má být zdroj dat odkazovaný v klauzuli SELECT
přečten sémantikou streamování.
Ve výchozím nastavení se čtení a zápisy provádějí do katalogu a schématu zadaného během konfigurace kanálu. Viz Nastavte cílový katalog a schéma.
Zdrojový kód DLT se zásadně liší od skriptů SQL: DLT vyhodnotí všechny definice datových sad ve všech souborech zdrojového kódu nakonfigurovaných v kanálu a vytvoří graf toku dat před spuštěním jakýchkoli dotazů. Pořadí dotazů zobrazených v poznámkovém bloku nebo skriptu definuje pořadí vyhodnocení kódu, ale ne pořadí provádění dotazů.
Vytvoření materializovaného zobrazení pomocí SQL
Následující příklad kódu ukazuje základní syntaxi pro vytvoření materializovaného zobrazení pomocí SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Vytvoření streamované tabulky pomocí SQL
Následující příklad kódu ukazuje základní syntaxi pro vytvoření tabulky streamování pomocí SQL:
Poznámka
Ne všechny zdroje dat podporují čtení streamování a některé zdroje dat by se měly vždy zpracovávat sémantikou streamování.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Načtení dat z úložiště objektů
DLT podporuje načítání dat ze všech formátů podporovaných službou Azure Databricks. Viz Možnosti formátu dat.
Poznámka
Tyto příklady používají data dostupná v /databricks-datasets
automaticky připojená k vašemu pracovnímu prostoru. Databricks doporučuje používat cesty svazků nebo cloudové identifikátory URI k odkazování na data uložená v cloudovém úložišti objektů. Podívejte se na Co jsou svazky katalogu Unity?.
Databricks doporučuje používat Auto Loader a streamované tabulky při konfiguraci úkolů pro přírůstkový příjem dat uložených v objektovém cloudovém úložišti. Podívejte se na Co je automatický načítač?.
SQL používá funkci read_files
k vyvolání funkce automatického zavaděče. Musíte také použít klíčové slovo STREAM
ke konfiguraci streamovaného čtení pomocí read_files
.
Následující příklad vytvoří streamovací tabulku ze souborů JSON pomocí automatického zavaděče:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Funkce read_files
také podporuje dávkovou sémantiku pro vytváření materializovaných zobrazení. Následující příklad používá sémantiku dávky ke čtení adresáře JSON a vytvoření materializovaného zobrazení:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
Ověření dat s očekáváními
Pomocí očekávání můžete nastavit a vynutit omezení kvality dat. Viz Spravujte kvalitu dat pomocí očekávání potrubí.
Následující kód definuje očekávanou pojmenovanou valid_data
, která během příjmu dat zahodí záznamy, které mají hodnotu null:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Provádění dotazů na materializovaná zobrazení a streamované tabulky definované v datovém potrubí
Následující příklad definuje čtyři datové sady:
- Streamovaná tabulka s názvem
orders
, která načítá data JSON. - Materializované zobrazení s názvem
customers
, které načte data CSV. - Materializované zobrazení s názvem
customer_orders
, které spojuje záznamy z datových sadorders
acustomers
, přetypuje časové razítko objednávky na datum a vybere polecustomer_id
,order_number
,state
aorder_date
. - Materializované zobrazení s názvem
daily_orders_by_state
, které agreguje denní počet objednávek pro každý stav.
Poznámka
Při dotazování zobrazení nebo tabulek v kanálu můžete přímo zadat katalog a schéma nebo můžete použít výchozí hodnoty nakonfigurované v kanálu. V tomto příkladu se tabulky orders
, customers
a customer_orders
zapisují a čtou z výchozího katalogu a schématu nakonfigurovaného pro váš kanál.
Starší režim publikování používá schéma LIVE
k dotazování jiných materializovaných zobrazení a streamovaných tabulek definovaných ve vašem potrubí. V nových potrubích se syntaxe schématu LIVE
ignoruje bez oznámení. Viz LIVE schema (starší verze).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;