Pijplijncode ontwikkelen met Python
Delta Live Tables introduceert verschillende nieuwe Python-codeconstructies voor het definiëren van gerealiseerde weergaven en streamingtabellen in pijplijnen. Python-ondersteuning voor het ontwikkelen van pijplijnen is gebaseerd op de basisbeginselen van PySpark DataFrame en Structured Streaming-API's.
Voor gebruikers die niet bekend zijn met Python en DataFrames, raadt Databricks het gebruik van de SQL-interface aan. Zie Pijplijncode ontwikkelen met SQL.
Zie Python-taalreferentie voor Delta Live Tablesvoor een volledig overzicht van de Python-syntaxis van Delta Live Tables.
Basisbeginselen van Python voor pijplijnontwikkeling
Python-code waarmee gegevenssets van Delta Live Tables worden gemaakt, moet DataFrames retourneren.
Alle Python-API's voor Delta Live Tables worden geïmplementeerd in de dlt
-module. Uw Delta Live Tables-pijplijncode die met Python is geïmplementeerd, moet de dlt
-module expliciet bovenaan in de Python-notebooks en -bestanden importeren.
Lees- en schrijfbewerkingen zijn standaard ingesteld op de catalogus en het schema dat is opgegeven tijdens de pijplijnconfiguratie. Zie De doelcatalogus en het schema instellen.
Delta Live Tables-specifieke Python-code verschilt van andere typen Python-code op één kritieke manier: Python-pijplijncode roept niet rechtstreeks de functies aan die gegevensopname en transformatie uitvoeren om Delta Live Tables-gegevenssets te maken. In plaats daarvan interpreteert Delta Live Tables de decoratorfuncties uit de dlt
module in alle broncodebestanden die zijn geconfigureerd in een pijplijn en bouwt een gegevensstroomgrafiek.
Belangrijk
Als u onverwacht gedrag wilt voorkomen wanneer uw pijplijn wordt uitgevoerd, moet u geen code opnemen die mogelijk neveneffecten heeft in uw functies die gegevenssets definiëren. Zie de Python-verwijzing voor meer informatie.
Een gerealiseerde weergave of streamingtabel maken met Python
De @dlt.table
decorator vertelt Delta Live Tables om een gematerialiseerde weergave of streamingtabel te maken op basis van de resultaten die door een functie worden geretourneerd. De resultaten van een batchleesbewerking creëren een gematerialiseerde weergave, terwijl de resultaten van een streamingleesbewerking een streamingtabel creëren.
Standaard worden gerealiseerde weergave- en streamingtabelnamen afgeleid van functienamen. In het volgende codevoorbeeld ziet u de basissyntaxis voor het maken van een gerealiseerde weergave en streamingtabel:
Notitie
Beide functies verwijzen naar dezelfde tabel in de samples
catalogus en gebruiken dezelfde decoratorfunctie. In deze voorbeelden wordt benadrukt dat het enige verschil in de basissyntaxis voor gematerialiseerde weergaven en streamingtabellen het gebruik van spark.read
tegenover spark.readStream
is.
Niet alle gegevensbronnen ondersteunen streaming-leesbewerkingen. Sommige gegevensbronnen moeten altijd worden verwerkt met semantiek voor streaming.
import dlt
@dlt.table()
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table()
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
U kunt desgewenst de tabelnaam opgeven met behulp van het argument name
in de @dlt.table
decorator. In het volgende voorbeeld ziet u dit patroon voor een gematerialiseerde weergave en streamingtabel:
import dlt
@dlt.table(name = "trips_mv")
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table(name = "trips_st")
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Gegevens laden uit objectopslag
Delta Live Tables ondersteunt het laden van gegevens uit alle indelingen die worden ondersteund door Azure Databricks. Zie opties voor gegevensindeling.
Notitie
In deze voorbeelden worden gegevens gebruikt die beschikbaar zijn onder de /databricks-datasets
automatisch gekoppelde werkruimte. Databricks raadt aan volumepaden of cloud-URI's te gebruiken om te verwijzen naar gegevens die zijn opgeslagen in cloudobjectopslag. Zie Wat zijn Unity Catalog-volumes?.
Databricks raadt aan om automatisch laden en streamingtabellen te gebruiken bij het configureren van incrementele opnameworkloads voor gegevens die zijn opgeslagen in de opslag van cloudobjecten. Zie Wat is automatisch laadprogramma?
In het volgende voorbeeld wordt een streamingtabel gemaakt op basis van JSON-bestanden met behulp van automatisch laden:
import dlt
@dlt.table()
def ingestion_st():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
In het volgende voorbeeld wordt batch-semantiek gebruikt om een JSON-map te lezen en een gerealiseerde weergave te maken:
import dlt
@dlt.table()
def batch_mv():
return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")
Gegevens valideren met verwachtingen
U kunt verwachtingen gebruiken om beperkingen voor gegevenskwaliteit in te stellen en af te dwingen. Zie Gegevenskwaliteit beheren met de verwachtingen van pijplijnen.
De volgende code gebruikt @dlt.expect_or_drop
om een verwachting te definiëren die records valid_data
verwijdert die null zijn tijdens gegevensopname:
import dlt
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
Doorzoek de geïnstantieerde weergaven en streamingtabellen die in uw pijplijn zijn gedefinieerd.
In het volgende voorbeeld worden vier gegevenssets gedefinieerd:
- Een streamingtabel met de naam
orders
waarmee JSON-gegevens worden geladen. - Een gerealiseerde weergave met de naam
customers
waarmee CSV-gegevens worden geladen. - Een gerealiseerde weergave met de naam
customer_orders
die records uit deorders
encustomers
gegevenssets samenvoegt, de tijdstempel van de order naar een datum cast en decustomer_id
velden ,order_number
enstate
order_date
selecteert. - Een gerealiseerde weergave met de naam
daily_orders_by_state
waarmee het dagelijkse aantal orders voor elke status wordt samengevoegd.
Notitie
Wanneer u query's uitvoert op weergaven of tabellen in uw pijplijn, kunt u de catalogus en het schema rechtstreeks opgeven of kunt u de standaardinstellingen gebruiken die zijn geconfigureerd in uw pijplijn. In dit voorbeeld worden de orders
, customers
en customer_orders
tabellen geschreven en gelezen uit de standaardcatalogus en het standaardschema dat is geconfigureerd voor uw pijplijn.
Verouderde publicatiemodus maakt gebruik van het LIVE
schema om een query uit te voeren op andere gematerialiseerde weergaven en streamingtabellen die zijn gedefinieerd in uw pijplijn. In nieuwe pijplijnen wordt de syntaxis van het LIVE
schema stilletjes genegeerd. Zie LIVE-schema (verouderd).
import dlt
from pyspark.sql.functions import col
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
@dlt.table()
def customers():
return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")
@dlt.table()
def customer_orders():
return (spark.read.table("orders")
.join(spark.read.table("customers"), "customer_id")
.select("customer_id",
"order_number",
"state",
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
)
)
@dlt.table()
def daily_orders_by_state():
return (spark.read.table("customer_orders")
.groupBy("state", "order_date")
.count().withColumnRenamed("count", "order_count")
)
Tabellen maken in een for
lus
U kunt Python for
-lussen gebruiken om programmatisch meerdere tabellen te maken. Dit kan handig zijn wanneer u veel gegevensbronnen of doel-datasets hebt die slechts met een paar parameters variëren, wat resulteert in minder totale code om te onderhouden en minder coderedundantie.
De for
lus evalueert logica in seriële volgorde, maar zodra de planning voor de gegevenssets is voltooid, wordt de logica van de pijplijn parallel uitgevoerd.
Belangrijk
Wanneer u dit patroon gebruikt om gegevenssets te definiëren, moet u ervoor zorgen dat de lijst met waarden die worden doorgegeven aan de for
lus altijd additief is. Als een gegevensset die eerder in een pijplijn is gedefinieerd, wordt weggelaten uit een toekomstige pijplijnuitvoering, wordt die gegevensset automatisch verwijderd uit het doelschema.
In het volgende voorbeeld worden vijf tabellen gemaakt waarmee klantorders per regio worden gefilterd. Hier wordt de regionaam gebruikt om de naam van de doelmaterialized views in te stellen en de brongegevens te filteren. Tijdelijke weergaven worden gebruikt om koppelingen te definiëren uit de brontabellen die worden gebruikt bij het samenstellen van de uiteindelijke gematerialiseerde weergaven.
import dlt
from pyspark.sql.functions import collect_list, col
@dlt.view()
def customer_orders():
orders = spark.read.table("samples.tpch.orders")
customer = spark.read.table("samples.tpch.customer")
return (orders.join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
col("c_nationkey").alias("nationkey"),
col("c_phone").alias("phone"),
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
col("o_orderdate").alias("orderdate"))
)
@dlt.view()
def nation_region():
nation = spark.read.table("samples.tpch.nation")
region = spark.read.table("samples.tpch.region")
return (nation.join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
col("n_nationkey").alias("nationkey")
)
)
# Extract region names from region table
region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]
# Iterate through region names to create new region-specific materialized views
for region in region_list:
@dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
def regional_customer_orders(region_filter=region):
customer_orders = spark.read.table("customer_orders")
nation_region = spark.read.table("nation_region")
return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
.select(
col("custkey"),
col("name"),
col("phone"),
col("nation"),
col("region"),
col("orderkey"),
col("orderstatus"),
col("totalprice"),
col("orderdate")
).filter(f"region = '{region_filter}'")
)
Hier volgt een voorbeeld van de gegevensstroomgrafiek voor deze pijplijn:
Problemen oplossen: for
lus maakt veel tabellen met dezelfde waarden
Het luie uitvoeringsmodel dat pijplijnen gebruiken om Python-code te evalueren, vereist dat je logica direct naar individuele waarden verwijst wanneer de functie, die door @dlt.table()
is versierd, wordt aangeroepen.
In het volgende voorbeeld ziet u twee juiste benaderingen voor het definiëren van tabellen met een for
lus. In beide voorbeelden wordt expliciet naar elke tabelnaam uit de tables
lijst verwezen binnen de functie die is gedecoreerd door @dlt.table()
.
import dlt
# Create a parent function to set local variables
def create_table(table_name):
@dlt.table(name=table_name)
def t():
return spark.read.table(table_name)
tables = ["t1", "t2", "t3"]
for t_name in tables:
create_table(t_name)
# Call `@dlt.table()` within a for loop and pass values as variables
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table(table_name=t_name):
return spark.read.table(table_name)
Het volgende voorbeeld for
lus:
import dlt
# Don't do this!
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table():
return spark.read.table(t_name)