Dela via


Utveckla pipelinekod med SQL

Delta Live Tables introducerar flera nya SQL-nyckelord och funktioner för att definiera materialiserade vyer och strömmande tabeller i pipelines. SQL-stöd för utveckling av pipelines bygger på grunderna i Spark SQL och lägger till stöd för structured Streaming-funktioner.

Användare som är bekanta med PySpark DataFrames kanske föredrar att utveckla pipelinekod med Python. Python har stöd för mer omfattande testning och åtgärder som är svåra att implementera med SQL, till exempel metaprogramåtgärder. Se Utveckla pipelinekod med Python.

En fullständig referens till SQL-syntaxen för Delta Live Tables finns i Sql-språkreferens för Delta Live Tables.

Grunderna i SQL för pipelineutveckling

SQL-kod som skapar Delta Live Tables-datauppsättningar använder syntaxen CREATE OR REFRESH för att definiera materialiserade vyer och strömmande tabeller mot frågeresultat.

Nyckelordet STREAM anger om datakällan som refereras i en SELECT sats ska läsas med strömmande semantik.

Delta Live Tables-källkoden skiljer sig kritiskt från SQL-skript: Delta Live Tables utvärderar alla datauppsättningsdefinitioner för alla källkodsfiler som konfigurerats i en pipeline och skapar ett dataflödesdiagram innan några frågor körs. Ordningen på frågor som visas i en notebook-fil eller ett skript definierar inte körningsordningen.

Skapa en materialiserad vy med SQL

I följande kodexempel visas den grundläggande syntaxen för att skapa en materialiserad vy med SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Skapa en strömningstabell med SQL

I följande kodexempel visas den grundläggande syntaxen för att skapa en strömningstabell med SQL:

Kommentar

Alla datakällor stöder inte direktuppspelningsläsningar, och vissa datakällor bör alltid bearbetas med strömmande semantik.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Läsa in data från objektlagring

Delta Live Tables stöder inläsning av data från alla format som stöds av Azure Databricks. Se Alternativ för dataformat.

Kommentar

De här exemplen använder data som är tillgängliga under automatiskt /databricks-datasets monterade på din arbetsyta. Databricks rekommenderar att du använder volymsökvägar eller moln-URI:er för att referera till data som lagras i molnobjektlagring. Se Vad är Unity Catalog-volymer?.

Databricks rekommenderar att du använder tabeller för automatisk inläsning och strömning när du konfigurerar inkrementella inmatningsarbetsbelastningar mot data som lagras i molnobjektlagring. Se Vad är automatisk inläsare?.

SQL använder read_files funktionen för att anropa funktioner för automatisk inläsning. Du måste också använda nyckelordet STREAM för att konfigurera en direktuppspelningsläsning med read_files.

I följande exempel skapas en strömmande tabell från JSON-filer med autoinläsning:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Funktionen read_files stöder även batch-semantik för att skapa materialiserade vyer. I följande exempel används batchsemantik för att läsa en JSON-katalog och skapa en materialiserad vy:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Verifiera data med förväntningar

Du kan använda förväntningar för att ange och tillämpa datakvalitetsbegränsningar. Se Hantera datakvalitet med Delta Live Tables.

Följande kod definierar en förväntan med namnet valid_data som tar bort poster som är null under datainmatning:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Fråga materialiserade vyer och strömmande tabeller som definierats i din pipeline

Använd schemat LIVE för att fråga andra materialiserade vyer och strömmande tabeller som definierats i din pipeline.

I följande exempel definieras fyra datauppsättningar:

  • En strömmande tabell med namnet orders som läser in JSON-data.
  • En materialiserad vy med namnet customers som läser in CSV-data.
  • En materialiserad vy med namnet customer_orders som kopplar poster från orders datauppsättningarna och customers genererar tidsstämpeln för ordningen till ett datum och väljer fälten customer_id, order_number, stateoch order_date .
  • En materialiserad vy med namnet daily_orders_by_state som aggregerar det dagliga antalet beställningar för varje tillstånd.
CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;