Pijplijncode ontwikkelen met SQL
Delta Live Tables introduceert verschillende nieuwe SQL-trefwoorden en -functies voor het definiƫren van gerealiseerde weergaven en streamingtabellen in pijplijnen. SQL-ondersteuning voor het ontwikkelen van pijplijnen bouwt voort op de basisbeginselen van Spark SQL en voegt ondersteuning toe voor structured streaming-functionaliteit.
Gebruikers die bekend zijn met PySpark DataFrames geven mogelijk de voorkeur aan het ontwikkelen van pijplijncode met Python. Python biedt ondersteuning voor uitgebreidere tests en bewerkingen die lastig te implementeren zijn met SQL, zoals metaprogrammeringsbewerkingen. Zie Pijplijncode ontwikkelen met Python.
Zie de SQL-taalreferentie delta livetabellen voor een volledige verwijzing naar sql-syntaxis van Delta Live Tables.
Basisbeginselen van SQL voor pijplijnontwikkeling
SQL-code waarmee Delta Live Tables-gegevenssets worden gemaakt, maakt gebruik van de CREATE OR REFRESH
syntaxis om gerealiseerde weergaven en streamingtabellen te definiƫren op basis van queryresultaten.
Het STREAM
trefwoord geeft aan of de gegevensbron waarnaar in een SELECT
component wordt verwezen, moet worden gelezen met streaming-semantiek.
De broncode van Delta Live Tables verschilt essentieel van SQL-scripts: Delta Live Tables evalueert alle gegevenssetdefinities in alle broncodebestanden die zijn geconfigureerd in een pijplijn en bouwt een gegevensstroomgrafiek voordat query's worden uitgevoerd. De volgorde van query's die worden weergegeven in een notebook of script definieert niet de uitvoeringsvolgorde.
Een gerealiseerde weergave maken met SQL
In het volgende codevoorbeeld ziet u de basissyntaxis voor het maken van een gerealiseerde weergave met SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Een streamingtabel maken met SQL
In het volgende codevoorbeeld ziet u de basissyntaxis voor het maken van een streamingtabel met SQL:
Notitie
Niet alle gegevensbronnen ondersteunen streaming-leesbewerkingen en sommige gegevensbronnen moeten altijd worden verwerkt met semantiek voor streaming.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Gegevens laden uit objectopslag
Delta Live Tables ondersteunt het laden van gegevens uit alle indelingen die worden ondersteund door Azure Databricks. Zie opties voor gegevensindeling.
Notitie
In deze voorbeelden worden gegevens gebruikt die beschikbaar zijn onder de /databricks-datasets
automatisch gekoppelde werkruimte. Databricks raadt aan volumepaden of cloud-URI's te gebruiken om te verwijzen naar gegevens die zijn opgeslagen in cloudobjectopslag. Zie Wat zijn Unity Catalog-volumes?
Databricks raadt aan om automatisch laden en streamingtabellen te gebruiken bij het configureren van incrementele opnameworkloads voor gegevens die zijn opgeslagen in de opslag van cloudobjecten. Zie Wat is automatisch laadprogramma?
SQL gebruikt de read_files
functie om de functionaliteit voor automatisch laden aan te roepen. U moet ook het STREAM
trefwoord gebruiken om een streaming-leesbewerking met read_files
te configureren.
In het volgende voorbeeld wordt een streamingtabel gemaakt op basis van JSON-bestanden met behulp van automatisch laden:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
De read_files
functie ondersteunt ook batch-semantiek om gerealiseerde weergaven te maken. In het volgende voorbeeld wordt batch-semantiek gebruikt om een JSON-map te lezen en een gerealiseerde weergave te maken:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
Gegevens valideren met verwachtingen
U kunt verwachtingen gebruiken om beperkingen voor gegevenskwaliteit in te stellen en af te dwingen. Zie Gegevenskwaliteit beheren met Delta Live Tables.
De volgende code definieert een verwachting met de naam valid_data
die records verwijdert die null zijn tijdens gegevensopname:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Gerealiseerde weergaven en streamingtabellen doorzoeken die zijn gedefinieerd in uw pijplijn
Gebruik het LIVE
schema om een query uit te voeren op andere gerealiseerde weergaven en streamingtabellen die zijn gedefinieerd in uw pijplijn.
In het volgende voorbeeld worden vier gegevenssets gedefinieerd:
- Een streamingtabel met de naam
orders
waarmee JSON-gegevens worden geladen. - Een gerealiseerde weergave met de naam
customers
waarmee CSV-gegevens worden geladen. - Een gerealiseerde weergave met de naam
customer_orders
die records uit deorders
encustomers
gegevenssets samenvoegt, de tijdstempel van de order naar een datum cast en decustomer_id
velden ,order_number
enstate
order_date
selecteert. - Een gerealiseerde weergave met de naam
daily_orders_by_state
waarmee het dagelijkse aantal orders voor elke status wordt samengevoegd.
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;