Dela via


Utveckla pipelinekod med Python

Delta Live Tables introducerar flera nya Python-kodkonstruktioner för att definiera materialiserade vyer och strömmande tabeller i pipelines. Python-stöd för att utveckla pipelines bygger på grunderna i PySpark DataFrame och API:er för strukturerad direktuppspelning.

För användare som inte är bekanta med Python och DataFrames rekommenderar Databricks att du använder SQL-gränssnittet. Se Utveckla pipelinekod med SQL.

En fullständig referens till Python-syntaxen för Delta Live Tables finns i Delta Live Tables Python language reference (Delta Live Tables Python language reference).

Grunderna i Python för pipelineutveckling

Python-kod som skapar Delta Live Tables-datauppsättningar måste returnera DataFrames.

Alla Delta Live Tables Python-API:er implementeras i modulen dlt . Din Delta Live Tables-pipelinekod som implementeras med Python måste uttryckligen importera modulen dlt överst i Python-notebook-filer och -filer.

Delta Live Tables-specifik Python-kod skiljer sig från andra typer av Python-kod på ett kritiskt sätt: Python-pipelinekoden anropar inte direkt de funktioner som utför datainmatning och transformering för att skapa Delta Live Tables-datauppsättningar. I stället tolkar Delta Live Tables dekoratörsfunktionerna från modulen dlt i alla källkodsfiler som konfigurerats i en pipeline och skapar ett dataflödesdiagram.

Viktigt!

Om du vill undvika oväntat beteende när pipelinen körs ska du inte inkludera kod som kan ha biverkningar i dina funktioner som definierar datauppsättningar. Mer information finns i Python-referensen.

Skapa en materialiserad vy eller strömningstabell med Python

Dekoratören @dlt.table uppmanar Delta Live Tables att skapa en materialiserad vy eller en strömmande tabell baserat på resultatet som returneras av en funktion. Resultatet av en batchläsning skapar en materialiserad vy, medan resultatet av en direktuppspelningsläsning skapar en strömmande tabell.

Som standard härleds materialiserade vy- och strömningstabellnamn från funktionsnamn. I följande kodexempel visas den grundläggande syntaxen för att skapa en materialiserad vy och en strömmande tabell:

Kommentar

Båda funktionerna refererar till samma tabell i samples katalogen och använder samma dekoratörsfunktion. De här exemplen visar att den enda skillnaden i den grundläggande syntaxen för materialiserade vyer och strömmande tabeller är att använda jämfört med spark.read spark.readStream.

Alla datakällor stöder inte direktuppspelningsläsningar. Vissa datakällor bör alltid bearbetas med strömmande semantik.

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Du kan också ange tabellnamnet med argumentet name i dekoratören @dlt.table . I följande exempel visas det här mönstret för en materialiserad vy och en strömmande tabell:

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Läsa in data från objektlagring

Delta Live Tables stöder inläsning av data från alla format som stöds av Azure Databricks. Se Alternativ för dataformat.

Kommentar

De här exemplen använder data som är tillgängliga under automatiskt /databricks-datasets monterade på din arbetsyta. Databricks rekommenderar att du använder volymsökvägar eller moln-URI:er för att referera till data som lagras i molnobjektlagring. Se Vad är Unity Catalog-volymer?.

Databricks rekommenderar att du använder tabeller för automatisk inläsning och strömning när du konfigurerar inkrementella inmatningsarbetsbelastningar mot data som lagras i molnobjektlagring. Se Vad är automatisk inläsare?.

I följande exempel skapas en strömmande tabell från JSON-filer med autoinläsning:

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

I följande exempel används batchsemantik för att läsa en JSON-katalog och skapa en materialiserad vy:

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Verifiera data med förväntningar

Du kan använda förväntningar för att ange och tillämpa datakvalitetsbegränsningar. Se Hantera datakvalitet med Delta Live Tables.

Följande kod använder @dlt.expect_or_drop för att definiera en förväntan med namnet valid_data som tar bort poster som är null under datainmatning:

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Fråga materialiserade vyer och strömmande tabeller som definierats i din pipeline

Använd schemat LIVE för att fråga andra materialiserade vyer och strömmande tabeller som definierats i din pipeline.

I följande exempel definieras fyra datauppsättningar:

  • En strömmande tabell med namnet orders som läser in JSON-data.
  • En materialiserad vy med namnet customers som läser in CSV-data.
  • En materialiserad vy med namnet customer_orders som kopplar poster från orders datauppsättningarna och customers genererar tidsstämpeln för ordningen till ett datum och väljer fälten customer_id, order_number, stateoch order_date .
  • En materialiserad vy med namnet daily_orders_by_state som aggregerar det dagliga antalet beställningar för varje tillstånd.
import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("LIVE.orders")
    .join(spark.read.table("LIVE.customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("LIVE.customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Skapa tabeller i en for loop

Du kan använda Python-loopar for för att skapa flera tabeller programmatiskt. Detta kan vara användbart när du har många datakällor eller måldatauppsättningar som bara varierar med några få parametrar, vilket resulterar i mindre total kod att underhålla och mindre kodredundans.

Loopen for utvärderar logik i seriell ordning, men när planeringen är klar för datauppsättningarna körs logiken parallellt i pipelinen.

Viktigt!

När du använder det här mönstret för att definiera datauppsättningar kontrollerar du att listan med värden som skickas till loopen for alltid är additiv. Om en datauppsättning som tidigare definierats i en pipeline utelämnas från en framtida pipelinekörning tas datauppsättningen bort automatiskt från målschemat.

I följande exempel skapas fem tabeller som filtrerar kundbeställningar efter region. Här används regionnamnet för att ange namnet på de materialiserade målvyerna och filtrera källdata. Tillfälliga vyer används för att definiera kopplingar från de källtabeller som används för att skapa de slutliga materialiserade vyerna.

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("LIVE.customer_orders")
    nation_region = spark.read.table("LIVE.nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

Följande är ett exempel på dataflödesdiagrammet för den här pipelinen:

Ett dataflödesdiagram med två vyer som leder till fem regionala tabeller.

Felsökning: for loopen skapar många tabeller med samma värden

Den lata körningsmodellen som pipelines använder för att utvärdera Python-kod kräver att logiken direkt refererar till enskilda värden när funktionen som dekoreras av @dlt.table() anropas.

I följande exempel visas två korrekta metoder för att definiera tabeller med en for loop. I båda exemplen tables refereras varje tabellnamn från listan uttryckligen i funktionen som är dekorerad av @dlt.table().

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

Följande exempel refererar inte till värden korrekt. I det här exemplet skapas tabeller med distinkta namn, men alla tabeller läser in data från det sista värdet i loopen for :

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)