Pijplijncode ontwikkelen met Python
Delta Live Tables introduceert verschillende nieuwe Python-codeconstructies voor het definiëren van gerealiseerde weergaven en streamingtabellen in pijplijnen. Python-ondersteuning voor het ontwikkelen van pijplijnen is gebaseerd op de basisbeginselen van PySpark DataFrame en Structured Streaming-API's.
Voor gebruikers die niet bekend zijn met Python en DataFrames, raadt Databricks het gebruik van de SQL-interface aan. Zie Pijplijncode ontwikkelen met SQL.
Zie de Python-taalreferentie voor Delta Live Tables voor een volledige verwijzing naar de Python-syntaxis van Delta Live Tables.
Basisbeginselen van Python voor pijplijnontwikkeling
Python-code waarmee Gegevenssets van Delta Live Tables worden gemaakt, moeten DataFrames retourneren.
Alle Python-API's voor Delta Live Tables worden geïmplementeerd in de dlt
module. Uw Delta Live Tables-pijplijncode die is geïmplementeerd met Python, moet de dlt
module expliciet boven aan Python-notebooks en -bestanden importeren.
Delta Live Tables-specifieke Python-code verschilt van andere typen Python-code op één kritieke manier: Python-pijplijncode roept niet rechtstreeks de functies aan die gegevensopname en transformatie uitvoeren om Delta Live Tables-gegevenssets te maken. In plaats daarvan interpreteert Delta Live Tables de decoratorfuncties van de dlt
module in alle broncodebestanden die zijn geconfigureerd in een pijplijn en bouwt een gegevensstroomgrafiek.
Belangrijk
Als u onverwacht gedrag wilt voorkomen wanneer uw pijplijn wordt uitgevoerd, moet u geen code opnemen die mogelijk neveneffecten heeft in uw functies die gegevenssets definiëren. Zie de Python-verwijzing voor meer informatie.
Een gerealiseerde weergave of streamingtabel maken met Python
De @dlt.table
decorator vertelt Delta Live Tables om een gerealiseerde weergave of streamingtabel te maken op basis van de resultaten die door een functie worden geretourneerd. De resultaten van een batchleesbewerking maken een gerealiseerde weergave, terwijl de resultaten van een streaming-leesbewerking een streamingtabel maken.
Standaard worden gerealiseerde weergave- en streamingtabelnamen afgeleid van functienamen. In het volgende codevoorbeeld ziet u de basissyntaxis voor het maken van een gerealiseerde weergave en streamingtabel:
Notitie
Beide functies verwijzen naar dezelfde tabel in de samples
catalogus en gebruiken dezelfde decoratorfunctie. In deze voorbeelden wordt benadrukt dat het enige verschil in de basissyntaxis voor gerealiseerde weergaven en streamingtabellen wordt gebruikt spark.read
versus spark.readStream
.
Niet alle gegevensbronnen ondersteunen streaming-leesbewerkingen. Sommige gegevensbronnen moeten altijd worden verwerkt met semantiek voor streaming.
import dlt
@dlt.table()
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table()
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
U kunt desgewenst de tabelnaam opgeven met behulp van het name
argument in de @dlt.table
decorator. In het volgende voorbeeld ziet u dit patroon voor een gerealiseerde weergave en streamingtabel:
import dlt
@dlt.table(name = "trips_mv")
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table(name = "trips_st")
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Gegevens laden uit objectopslag
Delta Live Tables ondersteunt het laden van gegevens uit alle indelingen die worden ondersteund door Azure Databricks. Zie opties voor gegevensindeling.
Notitie
In deze voorbeelden worden gegevens gebruikt die beschikbaar zijn onder de /databricks-datasets
automatisch gekoppelde werkruimte. Databricks raadt aan volumepaden of cloud-URI's te gebruiken om te verwijzen naar gegevens die zijn opgeslagen in cloudobjectopslag. Zie Wat zijn Unity Catalog-volumes?
Databricks raadt aan om automatisch laden en streamingtabellen te gebruiken bij het configureren van incrementele opnameworkloads voor gegevens die zijn opgeslagen in de opslag van cloudobjecten. Zie Wat is automatisch laadprogramma?
In het volgende voorbeeld wordt een streamingtabel gemaakt op basis van JSON-bestanden met behulp van automatisch laden:
import dlt
@dlt.table()
def ingestion_st():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
In het volgende voorbeeld wordt batch-semantiek gebruikt om een JSON-map te lezen en een gerealiseerde weergave te maken:
import dlt
@dlt.table()
def batch_mv():
return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")
Gegevens valideren met verwachtingen
U kunt verwachtingen gebruiken om beperkingen voor gegevenskwaliteit in te stellen en af te dwingen. Zie Gegevenskwaliteit beheren met Delta Live Tables.
De volgende code gebruikt @dlt.expect_or_drop
om een verwachting te definiëren die records valid_data
verwijdert die null zijn tijdens gegevensopname:
import dlt
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
Gerealiseerde weergaven en streamingtabellen doorzoeken die zijn gedefinieerd in uw pijplijn
Gebruik het LIVE
schema om een query uit te voeren op andere gerealiseerde weergaven en streamingtabellen die zijn gedefinieerd in uw pijplijn.
In het volgende voorbeeld worden vier gegevenssets gedefinieerd:
- Een streamingtabel met de naam
orders
waarmee JSON-gegevens worden geladen. - Een gerealiseerde weergave met de naam
customers
waarmee CSV-gegevens worden geladen. - Een gerealiseerde weergave met de naam
customer_orders
die records uit deorders
encustomers
gegevenssets samenvoegt, de tijdstempel van de order naar een datum cast en decustomer_id
velden ,order_number
enstate
order_date
selecteert. - Een gerealiseerde weergave met de naam
daily_orders_by_state
waarmee het dagelijkse aantal orders voor elke status wordt samengevoegd.
import dlt
from pyspark.sql.functions import col
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
@dlt.table()
def customers():
return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")
@dlt.table()
def customer_orders():
return (spark.read.table("LIVE.orders")
.join(spark.read.table("LIVE.customers"), "customer_id")
.select("customer_id",
"order_number",
"state",
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
)
)
@dlt.table()
def daily_orders_by_state():
return (spark.read.table("LIVE.customer_orders")
.groupBy("state", "order_date")
.count().withColumnRenamed("count", "order_count")
)
Tabellen in een for
lus maken
U kunt Python-lussen for
gebruiken om programmatisch meerdere tabellen te maken. Dit kan handig zijn wanneer u veel gegevensbronnen of doelgegevenssets hebt die variëren met slechts een paar parameters, wat resulteert in minder totale code om code te onderhouden en minder coderedundantie.
De for
lus evalueert logica in seriële volgorde, maar zodra de planning voor de gegevenssets is voltooid, wordt de logica van de pijplijn parallel uitgevoerd.
Belangrijk
Wanneer u dit patroon gebruikt om gegevenssets te definiëren, moet u ervoor zorgen dat de lijst met waarden die aan de for
lus worden doorgegeven, altijd additief is. Als een gegevensset die eerder in een pijplijn is gedefinieerd, wordt weggelaten uit een toekomstige pijplijnuitvoering, wordt die gegevensset automatisch verwijderd uit het doelschema.
In het volgende voorbeeld worden vijf tabellen gemaakt waarmee klantorders per regio worden gefilterd. Hier wordt de regionaam gebruikt om de naam van de gerealiseerde doelweergaven in te stellen en de brongegevens te filteren. Tijdelijke weergaven worden gebruikt om joins te definiëren uit de brontabellen die worden gebruikt bij het samenstellen van de uiteindelijke gerealiseerde weergaven.
import dlt
from pyspark.sql.functions import collect_list, col
@dlt.view()
def customer_orders():
orders = spark.read.table("samples.tpch.orders")
customer = spark.read.table("samples.tpch.customer")
return (orders.join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
col("c_nationkey").alias("nationkey"),
col("c_phone").alias("phone"),
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
col("o_orderdate").alias("orderdate"))
)
@dlt.view()
def nation_region():
nation = spark.read.table("samples.tpch.nation")
region = spark.read.table("samples.tpch.region")
return (nation.join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
col("n_nationkey").alias("nationkey")
)
)
# Extract region names from region table
region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]
# Iterate through region names to create new region-specific materialized views
for region in region_list:
@dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
def regional_customer_orders(region_filter=region):
customer_orders = spark.read.table("LIVE.customer_orders")
nation_region = spark.read.table("LIVE.nation_region")
return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
.select(
col("custkey"),
col("name"),
col("phone"),
col("nation"),
col("region"),
col("orderkey"),
col("orderstatus"),
col("totalprice"),
col("orderdate")
).filter(f"region = '{region_filter}'")
)
Hier volgt een voorbeeld van de gegevensstroomgrafiek voor deze pijplijn:
Problemen oplossen: for
lus maakt veel tabellen met dezelfde waarden
Het luie uitvoeringsmodel dat pijplijnen gebruiken om Python-code te evalueren, vereist dat uw logica rechtstreeks verwijst naar afzonderlijke waarden wanneer de functie @dlt.table()
wordt aangeroepen.
In het volgende voorbeeld ziet u twee juiste benaderingen voor het definiëren van tabellen met een for
lus. In beide voorbeelden wordt expliciet naar elke tabelnaam uit de tables
lijst verwezen binnen de functie die is ingericht door @dlt.table()
.
import dlt
# Create a parent function to set local variables
def create_table(table_name):
@dlt.table(name=table_name)
def t():
return spark.read.table(table_name)
tables = ["t1", "t2", "t3"]
for t_name in tables:
create_table(t_name)
# Call `@dlt.table()` within a for loop and pass values as variables
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table(table_name=t_name):
return spark.read.table(table_name)
In het volgende voorbeeld wordt niet correct verwezen naar waarden. In dit voorbeeld worden tabellen met verschillende namen gemaakt, maar alle tabellen laden gegevens van de laatste waarde in de lus for
:
import dlt
# Don't do this!
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table():
return spark.read.table(t_name)