Delen via


Pijplijncode ontwikkelen met SQL

Delta Live Tables introduceert verschillende nieuwe SQL-trefwoorden en -functies voor het definiƫren van gerealiseerde weergaven en streamingtabellen in pijplijnen. SQL-ondersteuning voor het ontwikkelen van pijplijnen bouwt voort op de basisbeginselen van Spark SQL en voegt ondersteuning toe voor structured streaming-functionaliteit.

Gebruikers die bekend zijn met PySpark DataFrames geven mogelijk de voorkeur aan het ontwikkelen van pijplijncode met Python. Python biedt ondersteuning voor uitgebreidere tests en bewerkingen die lastig te implementeren zijn met SQL, zoals metaprogrammeringsbewerkingen. Zie Pijplijncode ontwikkelen met Python.

Zie de SQL-taalreferentie delta livetabellen voor een volledige verwijzing naar sql-syntaxis van Delta Live Tables.

Basisbeginselen van SQL voor pijplijnontwikkeling

SQL-code waarmee Delta Live Tables-gegevenssets worden gemaakt, maakt gebruik van de CREATE OR REFRESH syntaxis om gerealiseerde weergaven en streamingtabellen te definiƫren op basis van queryresultaten.

Het STREAM trefwoord geeft aan of de gegevensbron waarnaar in een SELECT component wordt verwezen, moet worden gelezen met streaming-semantiek.

De broncode van Delta Live Tables verschilt essentieel van SQL-scripts: Delta Live Tables evalueert alle gegevenssetdefinities in alle broncodebestanden die zijn geconfigureerd in een pijplijn en bouwt een gegevensstroomgrafiek voordat query's worden uitgevoerd. De volgorde van query's die worden weergegeven in een notebook of script definieert niet de uitvoeringsvolgorde.

Een gerealiseerde weergave maken met SQL

In het volgende codevoorbeeld ziet u de basissyntaxis voor het maken van een gerealiseerde weergave met SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Een streamingtabel maken met SQL

In het volgende codevoorbeeld ziet u de basissyntaxis voor het maken van een streamingtabel met SQL:

Notitie

Niet alle gegevensbronnen ondersteunen streaming-leesbewerkingen en sommige gegevensbronnen moeten altijd worden verwerkt met semantiek voor streaming.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Gegevens laden uit objectopslag

Delta Live Tables ondersteunt het laden van gegevens uit alle indelingen die worden ondersteund door Azure Databricks. Zie opties voor gegevensindeling.

Notitie

In deze voorbeelden worden gegevens gebruikt die beschikbaar zijn onder de /databricks-datasets automatisch gekoppelde werkruimte. Databricks raadt aan volumepaden of cloud-URI's te gebruiken om te verwijzen naar gegevens die zijn opgeslagen in cloudobjectopslag. Zie Wat zijn Unity Catalog-volumes?

Databricks raadt aan om automatisch laden en streamingtabellen te gebruiken bij het configureren van incrementele opnameworkloads voor gegevens die zijn opgeslagen in de opslag van cloudobjecten. Zie Wat is automatisch laadprogramma?

SQL gebruikt de read_files functie om de functionaliteit voor automatisch laden aan te roepen. U moet ook het STREAM trefwoord gebruiken om een streaming-leesbewerking met read_fileste configureren.

In het volgende voorbeeld wordt een streamingtabel gemaakt op basis van JSON-bestanden met behulp van automatisch laden:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

De read_files functie ondersteunt ook batch-semantiek om gerealiseerde weergaven te maken. In het volgende voorbeeld wordt batch-semantiek gebruikt om een JSON-map te lezen en een gerealiseerde weergave te maken:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Gegevens valideren met verwachtingen

U kunt verwachtingen gebruiken om beperkingen voor gegevenskwaliteit in te stellen en af te dwingen. Zie Gegevenskwaliteit beheren met Delta Live Tables.

De volgende code definieert een verwachting met de naam valid_data die records verwijdert die null zijn tijdens gegevensopname:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Gerealiseerde weergaven en streamingtabellen doorzoeken die zijn gedefinieerd in uw pijplijn

Gebruik het LIVE schema om een query uit te voeren op andere gerealiseerde weergaven en streamingtabellen die zijn gedefinieerd in uw pijplijn.

In het volgende voorbeeld worden vier gegevenssets gedefinieerd:

  • Een streamingtabel met de naam orders waarmee JSON-gegevens worden geladen.
  • Een gerealiseerde weergave met de naam customers waarmee CSV-gegevens worden geladen.
  • Een gerealiseerde weergave met de naam customer_orders die records uit de orders en customers gegevenssets samenvoegt, de tijdstempel van de order naar een datum cast en de customer_idvelden , order_numberen stateorder_date selecteert.
  • Een gerealiseerde weergave met de naam daily_orders_by_state waarmee het dagelijkse aantal orders voor elke status wordt samengevoegd.
CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;