Utveckla pipelinekod med Python
Delta Live Tables introducerar flera nya Python-kodkonstruktioner för att definiera materialiserade vyer och strömmande tabeller i pipelines. Python-stöd för att utveckla pipelines bygger på grunderna i PySpark DataFrame och API:er för strukturerad direktuppspelning.
För användare som inte är bekanta med Python och DataFrames rekommenderar Databricks att du använder SQL-gränssnittet. Se Utveckla pipelinekod med SQL.
En fullständig referens till Python-syntaxen för Delta Live Tables finns i Delta Live Tables Python language reference (Delta Live Tables Python language reference).
Grunderna i Python för pipelineutveckling
Python-kod som skapar Delta Live Tables-datauppsättningar måste returnera DataFrames.
Alla Delta Live Tables Python-API:er implementeras i modulen dlt
. Din Delta Live Tables-pipelinekod som implementeras med Python måste uttryckligen importera modulen dlt
överst i Python-notebook-filer och -filer.
Delta Live Tables-specifik Python-kod skiljer sig från andra typer av Python-kod på ett kritiskt sätt: Python-pipelinekoden anropar inte direkt de funktioner som utför datainmatning och transformering för att skapa Delta Live Tables-datauppsättningar. I stället tolkar Delta Live Tables dekoratörsfunktionerna från modulen dlt
i alla källkodsfiler som konfigurerats i en pipeline och skapar ett dataflödesdiagram.
Viktigt!
Om du vill undvika oväntat beteende när pipelinen körs ska du inte inkludera kod som kan ha biverkningar i dina funktioner som definierar datauppsättningar. Mer information finns i Python-referensen.
Skapa en materialiserad vy eller strömningstabell med Python
Dekoratören @dlt.table
uppmanar Delta Live Tables att skapa en materialiserad vy eller en strömmande tabell baserat på resultatet som returneras av en funktion. Resultatet av en batchläsning skapar en materialiserad vy, medan resultatet av en direktuppspelningsläsning skapar en strömmande tabell.
Som standard härleds materialiserade vy- och strömningstabellnamn från funktionsnamn. I följande kodexempel visas den grundläggande syntaxen för att skapa en materialiserad vy och en strömmande tabell:
Kommentar
Båda funktionerna refererar till samma tabell i samples
katalogen och använder samma dekoratörsfunktion. De här exemplen visar att den enda skillnaden i den grundläggande syntaxen för materialiserade vyer och strömmande tabeller är att använda jämfört med spark.read
spark.readStream
.
Alla datakällor stöder inte direktuppspelningsläsningar. Vissa datakällor bör alltid bearbetas med strömmande semantik.
import dlt
@dlt.table()
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table()
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Du kan också ange tabellnamnet med argumentet name
i dekoratören @dlt.table
. I följande exempel visas det här mönstret för en materialiserad vy och en strömmande tabell:
import dlt
@dlt.table(name = "trips_mv")
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table(name = "trips_st")
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Läsa in data från objektlagring
Delta Live Tables stöder inläsning av data från alla format som stöds av Azure Databricks. Se Alternativ för dataformat.
Kommentar
De här exemplen använder data som är tillgängliga under automatiskt /databricks-datasets
monterade på din arbetsyta. Databricks rekommenderar att du använder volymsökvägar eller moln-URI:er för att referera till data som lagras i molnobjektlagring. Se Vad är Unity Catalog-volymer?.
Databricks rekommenderar att du använder tabeller för automatisk inläsning och strömning när du konfigurerar inkrementella inmatningsarbetsbelastningar mot data som lagras i molnobjektlagring. Se Vad är automatisk inläsare?.
I följande exempel skapas en strömmande tabell från JSON-filer med autoinläsning:
import dlt
@dlt.table()
def ingestion_st():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
I följande exempel används batchsemantik för att läsa en JSON-katalog och skapa en materialiserad vy:
import dlt
@dlt.table()
def batch_mv():
return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")
Verifiera data med förväntningar
Du kan använda förväntningar för att ange och tillämpa datakvalitetsbegränsningar. Se Hantera datakvalitet med Delta Live Tables.
Följande kod använder @dlt.expect_or_drop
för att definiera en förväntan med namnet valid_data
som tar bort poster som är null under datainmatning:
import dlt
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
Fråga materialiserade vyer och strömmande tabeller som definierats i din pipeline
Använd schemat LIVE
för att fråga andra materialiserade vyer och strömmande tabeller som definierats i din pipeline.
I följande exempel definieras fyra datauppsättningar:
- En strömmande tabell med namnet
orders
som läser in JSON-data. - En materialiserad vy med namnet
customers
som läser in CSV-data. - En materialiserad vy med namnet
customer_orders
som kopplar poster frånorders
datauppsättningarna ochcustomers
genererar tidsstämpeln för ordningen till ett datum och väljer fältencustomer_id
,order_number
,state
ochorder_date
. - En materialiserad vy med namnet
daily_orders_by_state
som aggregerar det dagliga antalet beställningar för varje tillstånd.
import dlt
from pyspark.sql.functions import col
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
@dlt.table()
def customers():
return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")
@dlt.table()
def customer_orders():
return (spark.read.table("LIVE.orders")
.join(spark.read.table("LIVE.customers"), "customer_id")
.select("customer_id",
"order_number",
"state",
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
)
)
@dlt.table()
def daily_orders_by_state():
return (spark.read.table("LIVE.customer_orders")
.groupBy("state", "order_date")
.count().withColumnRenamed("count", "order_count")
)
Skapa tabeller i en for
loop
Du kan använda Python-loopar for
för att skapa flera tabeller programmatiskt. Detta kan vara användbart när du har många datakällor eller måldatauppsättningar som bara varierar med några få parametrar, vilket resulterar i mindre total kod att underhålla och mindre kodredundans.
Loopen for
utvärderar logik i seriell ordning, men när planeringen är klar för datauppsättningarna körs logiken parallellt i pipelinen.
Viktigt!
När du använder det här mönstret för att definiera datauppsättningar kontrollerar du att listan med värden som skickas till loopen for
alltid är additiv. Om en datauppsättning som tidigare definierats i en pipeline utelämnas från en framtida pipelinekörning tas datauppsättningen bort automatiskt från målschemat.
I följande exempel skapas fem tabeller som filtrerar kundbeställningar efter region. Här används regionnamnet för att ange namnet på de materialiserade målvyerna och filtrera källdata. Tillfälliga vyer används för att definiera kopplingar från de källtabeller som används för att skapa de slutliga materialiserade vyerna.
import dlt
from pyspark.sql.functions import collect_list, col
@dlt.view()
def customer_orders():
orders = spark.read.table("samples.tpch.orders")
customer = spark.read.table("samples.tpch.customer")
return (orders.join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
col("c_nationkey").alias("nationkey"),
col("c_phone").alias("phone"),
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
col("o_orderdate").alias("orderdate"))
)
@dlt.view()
def nation_region():
nation = spark.read.table("samples.tpch.nation")
region = spark.read.table("samples.tpch.region")
return (nation.join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
col("n_nationkey").alias("nationkey")
)
)
# Extract region names from region table
region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]
# Iterate through region names to create new region-specific materialized views
for region in region_list:
@dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
def regional_customer_orders(region_filter=region):
customer_orders = spark.read.table("LIVE.customer_orders")
nation_region = spark.read.table("LIVE.nation_region")
return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
.select(
col("custkey"),
col("name"),
col("phone"),
col("nation"),
col("region"),
col("orderkey"),
col("orderstatus"),
col("totalprice"),
col("orderdate")
).filter(f"region = '{region_filter}'")
)
Följande är ett exempel på dataflödesdiagrammet för den här pipelinen:
Felsökning: for
loopen skapar många tabeller med samma värden
Den lata körningsmodellen som pipelines använder för att utvärdera Python-kod kräver att logiken direkt refererar till enskilda värden när funktionen som dekoreras av @dlt.table()
anropas.
I följande exempel visas två korrekta metoder för att definiera tabeller med en for
loop. I båda exemplen tables
refereras varje tabellnamn från listan uttryckligen i funktionen som är dekorerad av @dlt.table()
.
import dlt
# Create a parent function to set local variables
def create_table(table_name):
@dlt.table(name=table_name)
def t():
return spark.read.table(table_name)
tables = ["t1", "t2", "t3"]
for t_name in tables:
create_table(t_name)
# Call `@dlt.table()` within a for loop and pass values as variables
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table(table_name=t_name):
return spark.read.table(table_name)
Följande exempel refererar inte till värden korrekt. I det här exemplet skapas tabeller med distinkta namn, men alla tabeller läser in data från det sista värdet i loopen for
:
import dlt
# Don't do this!
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table():
return spark.read.table(t_name)