Sdílet prostřednictvím


Vývoj kódu kanálu pomocí SQL

Delta Live Tables představuje několik nových klíčových slov a funkcí SQL pro definování materializovaných zobrazení a streamovaných tabulek v kanálech. Podpora SQL pro vývoj kanálů vychází ze základů Spark SQL a přidává podporu funkcí strukturovaného streamování.

Uživatelé, kteří znají datové rámce PySpark, můžou preferovat vývoj kódu kanálu pomocí Pythonu. Python podporuje rozsáhlejší testování a operace, které jsou náročné na implementaci s SQL, jako jsou operace metaprogramování. Viz Vývoj kódu kanálu pomocí Pythonu.

Úplný odkaz na syntaxi SQL rozdílových živých tabulek naleznete v tématu Referenční informace jazyka SQL delta live tables.

Základy SQL pro vývoj kanálů

Kód SQL, který vytváří datové sady Delta Live Tables, používá CREATE OR REFRESH syntaxi k definování materializovaných zobrazení a streamovaných tabulek proti výsledkům dotazu.

Klíčové STREAM slovo označuje, jestli má být zdroj dat odkazovaný v SELECT klauzuli přečten pomocí sémantiky streamování.

Zdrojový kód Delta Live Tables se zásadně liší od skriptů SQL: Rozdílové živé tabulky vyhodnocují všechny definice datových sad ve všech souborech zdrojového kódu nakonfigurovaných v kanálu a vytvoří graf toku dat před spuštěním jakýchkoli dotazů. Pořadí dotazů zobrazených v poznámkovém bloku nebo skriptu nedefinuje pořadí provádění.

Vytvoření materializovaného zobrazení pomocí SQL

Následující příklad kódu ukazuje základní syntaxi pro vytvoření materializovaného zobrazení pomocí SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Vytvoření streamované tabulky pomocí SQL

Následující příklad kódu ukazuje základní syntaxi pro vytvoření tabulky streamování pomocí SQL:

Poznámka:

Ne všechny zdroje dat podporují čtení streamování a některé zdroje dat by se měly vždy zpracovávat sémantikou streamování.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Načtení dat z úložiště objektů

Delta Live Tables podporuje načítání dat ze všech formátů podporovaných službou Azure Databricks. Viz Možnosti formátu dat.

Poznámka:

Tyto příklady používají data dostupná v rámci automaticky připojeného k vašemu /databricks-datasets pracovnímu prostoru. Databricks doporučuje používat cesty svazků nebo cloudové identifikátory URI k odkazování na data uložená v cloudovém úložišti objektů. Podívejte se, co jsou svazky katalogu Unity?

Databricks doporučuje používat automatické zavaděče a streamované tabulky při konfiguraci úloh přírůstkového příjmu dat na data uložená v cloudovém úložišti objektů. Podívejte se, co je automatický zavaděč?

SQL používá read_files funkci k vyvolání funkce automatického zavaděče. Musíte také použít STREAM klíčové slovo ke konfiguraci streamování čtení pomocí read_files.

Následující příklad vytvoří streamovací tabulku ze souborů JSON pomocí automatického zavaděče:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Funkce read_files také podporuje sémantiku dávky pro vytváření materializovaných zobrazení. Následující příklad používá sémantiku dávky ke čtení adresáře JSON a vytvoření materializovaného zobrazení:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Ověření dat s očekáváními

Pomocí očekávání můžete nastavit a vynutit omezení kvality dat. Viz Správa kvality dat pomocí rozdílových živých tabulek.

Následující kód definuje očekávanou hodnotu, která zahodí valid_data záznamy, které mají hodnotu null během příjmu dat:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Dotazování materializovaných zobrazení a streamovaných tabulek definovaných v kanálu

Pomocí schématu LIVE můžete dotazovat další materializovaná zobrazení a streamované tabulky definované v kanálu.

Následující příklad definuje čtyři datové sady:

  • Streamovaná tabulka s názvem orders , která načítá data JSON.
  • Materializované zobrazení s názvem customers , které načte data CSV.
  • Materializované zobrazení, customer_orders které spojuje záznamy z orders datových sad a customers datových sad, přetypuje časové razítko objednávky na datum a vybere customer_idpole , , order_numberstatea order_date pole.
  • Materializované zobrazení s názvem daily_orders_by_state agreguje denní počet objednávek pro každý stav.
CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;