Dela via


Utveckla pipeline-kod med SQL

DLT introducerar flera nya SQL-nyckelord och funktioner för att definiera materialiserade vyer och strömmande tabeller i pipelines. SQL-stöd för utveckling av pipelines bygger på grunderna i Spark SQL och lägger till stöd för structured Streaming-funktioner.

Användare som är bekanta med PySpark DataFrames kanske föredrar att utveckla pipelinekod med Python. Python har stöd för mer omfattande testning och åtgärder som är svåra att implementera med SQL, till exempel metaprogramåtgärder. Se utveckla pipelinekod med Python.

En fullständig referens till DLT SQL-syntax finns i DLT SQL-språkreferens.

Grunderna i SQL för pipelineutveckling

SQL-kod som skapar DLT-datauppsättningar använder CREATE OR REFRESH syntax för att definiera materialiserade vyer och strömmande tabeller mot frågeresultat.

Nyckelordet STREAM anger om datakällan som refereras i en SELECT-sats ska läsas med strömmande semantik.

Läser och skriver som standard den katalog och det schema som angavs under pipelinekonfigurationen. Se Ange målkatalogen och schemat.

DLT-källkod skiljer sig kritiskt från SQL-skript: DLT utvärderar alla datauppsättningsdefinitioner i alla källkodsfiler som konfigurerats i en pipeline och skapar ett dataflödesdiagram innan några frågor körs. Ordningen av frågor som visas i en notebook-fil eller ett skript definierar ordningen för kodutvärderingen, men inte utförandet av frågorna.

Skapa en materialiserad vy med SQL

I följande kodexempel visas den grundläggande syntaxen för att skapa en materialiserad vy med SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Skapa en strömningstabell med SQL

I följande kodexempel visas den grundläggande syntaxen för att skapa en strömningstabell med SQL:

Note

Alla datakällor stöder inte direktuppspelningsläsningar, och vissa datakällor bör alltid bearbetas med strömmande semantik.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Läsa in data från objektlagring

DLT stöder inläsning av data från alla format som stöds av Azure Databricks. Se alternativ för dataformat.

Not

I de här exemplen används data som är tillgängliga under /databricks-datasets och monteras automatiskt på din arbetsyta. Databricks rekommenderar att du använder volymsökvägar eller moln-URI:er för att referera till data som lagras i molnobjektlagring. Se Vad är Unity Catalog-volymer?.

Databricks rekommenderar att du använder tabeller för automatisk inläsning och strömning när du konfigurerar inkrementella inmatningsarbetsbelastningar mot data som lagras i molnobjektlagring. Se Vad är Auto Loader?.

SQL använder funktionen read_files för att anropa funktioner för automatisk inläsning. Du måste också använda nyckelordet STREAM för att konfigurera en direktuppspelningsläsning med read_files.

I följande exempel skapas en strömmande tabell från JSON-filer med autoinläsning:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Funktionen read_files stöder även batchsemantik för att skapa materialiserade vyer. I följande exempel används batchsemantik för att läsa en JSON-katalog och skapa en materialiserad vy:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Verifiera data med förväntningar

Du kan använda förväntningar för att ange och tillämpa datakvalitetsbegränsningar. Se Hantera datakvalitet med pipelineförväntningar.

Följande kod definierar en förväntan med namnet valid_data som tar bort poster som är null under datainmatning:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Fråga efter materialiserade vyer och strömstabeller som är definierade i din pipeline

I följande exempel definieras fyra datauppsättningar:

  • En strömmande tabell med namnet orders som läser in JSON-data.
  • En materialiserad vy med namnet customers som läser in CSV-data.
  • En materialiserad vy med namnet customer_orders som kopplar poster från datauppsättningarna orders och customers, omvandlar ordertidsstämpeln till ett datum och väljer fälten customer_id, order_number, stateoch order_date.
  • En materialiserad vy med namnet daily_orders_by_state som aggregerar det dagliga antalet order för varje tillstånd.

Not

När du kör frågor mot vyer eller tabeller i pipelinen kan du ange katalogen och schemat direkt, eller så kan du använda standardvärdena som konfigurerats i pipelinen. I det här exemplet skrivs tabellerna orders, customersoch customer_orders från standardkatalogen och schemat som konfigurerats för pipelinen.

Publiceringsläget för äldre system använder LIVE-schemat för att hämta data från andra materialiserade vyer och strömmande tabeller som definierats i din pipeline. I nya pipelines ignoreras LIVE schemasyntaxen tyst. Se LIVE-schema (äldre).

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;