Pijplijncode ontwikkelen met SQL
DLT introduceert verschillende nieuwe SQL-trefwoorden en -functies voor het definiƫren van gerealiseerde weergaven en streamingtabellen in pijplijnen. SQL-ondersteuning voor het ontwikkelen van pijplijnen bouwt voort op de basisbeginselen van Spark SQL en voegt ondersteuning toe voor structured streaming-functionaliteit.
Gebruikers die bekend zijn met PySpark DataFrames geven mogelijk de voorkeur aan het ontwikkelen van pijplijncode met Python. Python biedt ondersteuning voor uitgebreidere tests en bewerkingen die lastig te implementeren zijn met SQL, zoals metaprogrammeringsbewerkingen. Zie Pijplijncode ontwikkelen met Python.
Zie DLT SQL-taalreferentievoor een volledig overzicht van de DLT SQL-syntaxis.
Basisbeginselen van SQL voor pijplijnontwikkeling
SQL-code waarmee DLT-gegevenssets worden gemaakt, maakt gebruik van de CREATE OR REFRESH
syntaxis om gerealiseerde weergaven en streamingtabellen te definiƫren op basis van queryresultaten.
Het STREAM
trefwoord geeft aan of de gegevensbron waarnaar wordt verwezen in een SELECT
-component moet worden gelezen met semantiek voor streaming.
Lees- en schrijfbewerkingen zijn standaard ingesteld op de catalogus en het schema dat is opgegeven tijdens de pijplijnconfiguratie. Zie De doelcatalogus en het schema instellen.
DLT-broncode verschilt essentieel van SQL-scripts: DLT evalueert alle gegevenssetdefinities voor alle broncodebestanden die zijn geconfigureerd in een pijplijn en bouwt een gegevensstroomgrafiek voordat query's worden uitgevoerd. De volgorde van query's die worden weergegeven in een notebook of script definieert de volgorde van code-evaluatie, maar niet de volgorde van de uitvoering van query's.
Een gerealiseerde weergave maken met SQL
In het volgende codevoorbeeld ziet u de basissyntaxis voor het maken van een gerealiseerde weergave met SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Een streamingtabel maken met SQL
In het volgende codevoorbeeld ziet u de basissyntaxis voor het maken van een streamingtabel met SQL:
Notitie
Niet alle gegevensbronnen ondersteunen streaming-leesbewerkingen en sommige gegevensbronnen moeten altijd worden verwerkt met semantiek voor streaming.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Gegevens laden uit objectopslag
DLT ondersteunt het laden van gegevens uit alle indelingen die worden ondersteund door Azure Databricks. Zie opties voor gegevensindeling.
Notitie
In deze voorbeelden wordt gebruik gemaakt van gegevens die via de automatisch aan uw werkruimte gekoppelde /databricks-datasets
beschikbaar zijn. Databricks raadt aan volumepaden of cloud-URI's te gebruiken om te verwijzen naar gegevens die zijn opgeslagen in cloudobjectopslag. Zie Wat zijn Unity Catalog-volumes?.
Databricks raadt aan om automatisch laden en streamingtabellen te gebruiken bij het configureren van incrementele opnameworkloads voor gegevens die zijn opgeslagen in de opslag van cloudobjecten. Zie Wat is Auto Loader?.
SQL maakt gebruik van de read_files
-functie om de functionaliteit voor automatisch laden aan te roepen. U moet ook het STREAM
trefwoord gebruiken om een streaming-leesbewerking met read_files
te configureren.
In het volgende voorbeeld wordt een streamingtabel gemaakt op basis van JSON-bestanden met behulp van automatisch laden:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
De functie read_files
ondersteunt ook batch-semantiek om gerealiseerde weergaven te maken. In het volgende voorbeeld wordt batch-semantiek gebruikt om een JSON-map te lezen en een gerealiseerde weergave te maken:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
Gegevens valideren met verwachtingen
U kunt verwachtingen gebruiken om beperkingen voor gegevenskwaliteit in te stellen en af te dwingen. Zie Gegevenskwaliteit beheren met de verwachtingen van pijplijnen.
De volgende code definieert een verwachting met de naam valid_data
waarmee records worden verwijderd die null zijn tijdens gegevensopname:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Gematerialiseerde weergaven en streamingtabellen die in uw pijplijn zijn gedefinieerd doorzoeken.
In het volgende voorbeeld worden vier gegevenssets gedefinieerd:
- Een streamingtabel met de naam
orders
waarmee JSON-gegevens worden geladen. - Een gerealiseerde weergave met de naam
customers
waarmee CSV-gegevens worden geladen. - Een gerealiseerde weergave genaamd
customer_orders
die records uit deorders
- encustomers
-gegevenssets koppelt, de order-tijdstempel naar een datum omzet en de veldencustomer_id
,order_number
,state
enorder_date
selecteert. - Een gerealiseerde weergave met de naam
daily_orders_by_state
waarmee het dagelijkse aantal orders voor elke status wordt geaggregeerd.
Notitie
Wanneer u query's uitvoert op weergaven of tabellen in uw pijplijn, kunt u de catalogus en het schema rechtstreeks opgeven of kunt u de standaardinstellingen gebruiken die zijn geconfigureerd in uw pijplijn. In dit voorbeeld worden de orders
, customers
en customer_orders
tabellen geschreven en gelezen uit de standaardcatalogus en het standaardschema dat is geconfigureerd voor uw pijplijn.
Verouderde publicatiemodus maakt gebruik van het LIVE
schema om een query uit te voeren op andere gerealiseerde weergaven en streamingtabellen die zijn gedefinieerd in uw pijplijn. In nieuwe pijplijnen wordt de syntaxis van het LIVE
schema ongemerkt genegeerd. Zie LIVE-schema (verouderd).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;