Sdílet prostřednictvím


Vývoj kódu kanálu pomocí SQL

DLT zavádí několik nových klíčových slov a funkcí SQL pro definování materializovaných zobrazení a streamovaných tabulek v kanálech. Podpora SQL pro vývoj kanálů vychází ze základů Spark SQL a přidává podporu funkcí strukturovaného streamování.

Uživatelé, kteří znají datové rámce PySpark, můžou preferovat vývoj kódu kanálu pomocí Pythonu. Python podporuje rozsáhlejší testování a operace, které jsou náročné na implementaci s SQL, jako jsou operace metaprogramování. Viz Vývoj kódu kanálu pomocíPythonu .

Úplný přehled syntaxe DLT SQL viz referenční příručka DLT SQL.

Základy SQL pro vývoj datových kanálů

Kód SQL, který vytváří datové sady DLT, používá syntaxi CREATE OR REFRESH k definování materializovaných zobrazení a streamovaných tabulek proti výsledkům dotazu.

Klíčové slovo STREAM označuje, jestli má být zdroj dat odkazovaný v klauzuli SELECT přečten sémantikou streamování.

Ve výchozím nastavení se čtení a zápisy provádějí do katalogu a schématu zadaného během konfigurace kanálu. Viz Nastavte cílový katalog a schéma.

Zdrojový kód DLT se zásadně liší od skriptů SQL: DLT vyhodnotí všechny definice datových sad ve všech souborech zdrojového kódu nakonfigurovaných v kanálu a vytvoří graf toku dat před spuštěním jakýchkoli dotazů. Pořadí dotazů zobrazených v poznámkovém bloku nebo skriptu definuje pořadí vyhodnocení kódu, ale ne pořadí provádění dotazů.

Vytvoření materializovaného zobrazení pomocí SQL

Následující příklad kódu ukazuje základní syntaxi pro vytvoření materializovaného zobrazení pomocí SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Vytvoření streamované tabulky pomocí SQL

Následující příklad kódu ukazuje základní syntaxi pro vytvoření tabulky streamování pomocí SQL:

Poznámka

Ne všechny zdroje dat podporují čtení streamování a některé zdroje dat by se měly vždy zpracovávat sémantikou streamování.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Načtení dat z úložiště objektů

DLT podporuje načítání dat ze všech formátů podporovaných službou Azure Databricks. Viz Možnosti formátu dat.

Poznámka

Tyto příklady používají data dostupná v /databricks-datasets automaticky připojená k vašemu pracovnímu prostoru. Databricks doporučuje používat cesty svazků nebo cloudové identifikátory URI k odkazování na data uložená v cloudovém úložišti objektů. Podívejte se na Co jsou svazky katalogu Unity?.

Databricks doporučuje používat Auto Loader a streamované tabulky při konfiguraci úkolů pro přírůstkový příjem dat uložených v objektovém cloudovém úložišti. Podívejte se na Co je automatický načítač?.

SQL používá funkci read_files k vyvolání funkce automatického zavaděče. Musíte také použít klíčové slovo STREAM ke konfiguraci streamovaného čtení pomocí read_files.

Následující příklad vytvoří streamovací tabulku ze souborů JSON pomocí automatického zavaděče:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Funkce read_files také podporuje dávkovou sémantiku pro vytváření materializovaných zobrazení. Následující příklad používá sémantiku dávky ke čtení adresáře JSON a vytvoření materializovaného zobrazení:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Ověření dat s očekáváními

Pomocí očekávání můžete nastavit a vynutit omezení kvality dat. Viz Spravujte kvalitu dat pomocí očekávání potrubí.

Následující kód definuje očekávanou pojmenovanou valid_data, která během příjmu dat zahodí záznamy, které mají hodnotu null:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Provádění dotazů na materializovaná zobrazení a streamované tabulky definované v datovém potrubí

Následující příklad definuje čtyři datové sady:

  • Streamovaná tabulka s názvem orders, která načítá data JSON.
  • Materializované zobrazení s názvem customers, které načte data CSV.
  • Materializované zobrazení s názvem customer_orders, které spojuje záznamy z datových sad orders a customers, přetypuje časové razítko objednávky na datum a vybere pole customer_id, order_number, statea order_date.
  • Materializované zobrazení s názvem daily_orders_by_state, které agreguje denní počet objednávek pro každý stav.

Poznámka

Při dotazování zobrazení nebo tabulek v kanálu můžete přímo zadat katalog a schéma nebo můžete použít výchozí hodnoty nakonfigurované v kanálu. V tomto příkladu se tabulky orders, customersa customer_orders zapisují a čtou z výchozího katalogu a schématu nakonfigurovaného pro váš kanál.

Starší režim publikování používá schéma LIVE k dotazování jiných materializovaných zobrazení a streamovaných tabulek definovaných ve vašem potrubí. V nových potrubích se syntaxe schématu LIVE ignoruje bez oznámení. Viz LIVE schema (starší verze).

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;