Delen via


Pijplijncode ontwikkelen met Python

Delta Live Tables introduceert verschillende nieuwe Python-codeconstructies voor het definiëren van gerealiseerde weergaven en streamingtabellen in pijplijnen. Python-ondersteuning voor het ontwikkelen van pijplijnen is gebaseerd op de basisbeginselen van PySpark DataFrame en Structured Streaming-API's.

Voor gebruikers die niet bekend zijn met Python en DataFrames, raadt Databricks het gebruik van de SQL-interface aan. Zie Pijplijncode ontwikkelen met SQL.

Zie de Python-taalreferentie voor Delta Live Tables voor een volledige verwijzing naar de Python-syntaxis van Delta Live Tables.

Basisbeginselen van Python voor pijplijnontwikkeling

Python-code waarmee Gegevenssets van Delta Live Tables worden gemaakt, moeten DataFrames retourneren.

Alle Python-API's voor Delta Live Tables worden geïmplementeerd in de dlt module. Uw Delta Live Tables-pijplijncode die is geïmplementeerd met Python, moet de dlt module expliciet boven aan Python-notebooks en -bestanden importeren.

Delta Live Tables-specifieke Python-code verschilt van andere typen Python-code op één kritieke manier: Python-pijplijncode roept niet rechtstreeks de functies aan die gegevensopname en transformatie uitvoeren om Delta Live Tables-gegevenssets te maken. In plaats daarvan interpreteert Delta Live Tables de decoratorfuncties van de dlt module in alle broncodebestanden die zijn geconfigureerd in een pijplijn en bouwt een gegevensstroomgrafiek.

Belangrijk

Als u onverwacht gedrag wilt voorkomen wanneer uw pijplijn wordt uitgevoerd, moet u geen code opnemen die mogelijk neveneffecten heeft in uw functies die gegevenssets definiëren. Zie de Python-verwijzing voor meer informatie.

Een gerealiseerde weergave of streamingtabel maken met Python

De @dlt.table decorator vertelt Delta Live Tables om een gerealiseerde weergave of streamingtabel te maken op basis van de resultaten die door een functie worden geretourneerd. De resultaten van een batchleesbewerking maken een gerealiseerde weergave, terwijl de resultaten van een streaming-leesbewerking een streamingtabel maken.

Standaard worden gerealiseerde weergave- en streamingtabelnamen afgeleid van functienamen. In het volgende codevoorbeeld ziet u de basissyntaxis voor het maken van een gerealiseerde weergave en streamingtabel:

Notitie

Beide functies verwijzen naar dezelfde tabel in de samples catalogus en gebruiken dezelfde decoratorfunctie. In deze voorbeelden wordt benadrukt dat het enige verschil in de basissyntaxis voor gerealiseerde weergaven en streamingtabellen wordt gebruikt spark.read versus spark.readStream.

Niet alle gegevensbronnen ondersteunen streaming-leesbewerkingen. Sommige gegevensbronnen moeten altijd worden verwerkt met semantiek voor streaming.

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

U kunt desgewenst de tabelnaam opgeven met behulp van het name argument in de @dlt.table decorator. In het volgende voorbeeld ziet u dit patroon voor een gerealiseerde weergave en streamingtabel:

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Gegevens laden uit objectopslag

Delta Live Tables ondersteunt het laden van gegevens uit alle indelingen die worden ondersteund door Azure Databricks. Zie opties voor gegevensindeling.

Notitie

In deze voorbeelden worden gegevens gebruikt die beschikbaar zijn onder de /databricks-datasets automatisch gekoppelde werkruimte. Databricks raadt aan volumepaden of cloud-URI's te gebruiken om te verwijzen naar gegevens die zijn opgeslagen in cloudobjectopslag. Zie Wat zijn Unity Catalog-volumes?

Databricks raadt aan om automatisch laden en streamingtabellen te gebruiken bij het configureren van incrementele opnameworkloads voor gegevens die zijn opgeslagen in de opslag van cloudobjecten. Zie Wat is automatisch laadprogramma?

In het volgende voorbeeld wordt een streamingtabel gemaakt op basis van JSON-bestanden met behulp van automatisch laden:

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

In het volgende voorbeeld wordt batch-semantiek gebruikt om een JSON-map te lezen en een gerealiseerde weergave te maken:

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Gegevens valideren met verwachtingen

U kunt verwachtingen gebruiken om beperkingen voor gegevenskwaliteit in te stellen en af te dwingen. Zie Gegevenskwaliteit beheren met Delta Live Tables.

De volgende code gebruikt @dlt.expect_or_drop om een verwachting te definiëren die records valid_data verwijdert die null zijn tijdens gegevensopname:

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Gerealiseerde weergaven en streamingtabellen doorzoeken die zijn gedefinieerd in uw pijplijn

Gebruik het LIVE schema om een query uit te voeren op andere gerealiseerde weergaven en streamingtabellen die zijn gedefinieerd in uw pijplijn.

In het volgende voorbeeld worden vier gegevenssets gedefinieerd:

  • Een streamingtabel met de naam orders waarmee JSON-gegevens worden geladen.
  • Een gerealiseerde weergave met de naam customers waarmee CSV-gegevens worden geladen.
  • Een gerealiseerde weergave met de naam customer_orders die records uit de orders en customers gegevenssets samenvoegt, de tijdstempel van de order naar een datum cast en de customer_idvelden , order_numberen stateorder_date selecteert.
  • Een gerealiseerde weergave met de naam daily_orders_by_state waarmee het dagelijkse aantal orders voor elke status wordt samengevoegd.
import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("LIVE.orders")
    .join(spark.read.table("LIVE.customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("LIVE.customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Tabellen in een for lus maken

U kunt Python-lussen for gebruiken om programmatisch meerdere tabellen te maken. Dit kan handig zijn wanneer u veel gegevensbronnen of doelgegevenssets hebt die variëren met slechts een paar parameters, wat resulteert in minder totale code om code te onderhouden en minder coderedundantie.

De for lus evalueert logica in seriële volgorde, maar zodra de planning voor de gegevenssets is voltooid, wordt de logica van de pijplijn parallel uitgevoerd.

Belangrijk

Wanneer u dit patroon gebruikt om gegevenssets te definiëren, moet u ervoor zorgen dat de lijst met waarden die aan de for lus worden doorgegeven, altijd additief is. Als een gegevensset die eerder in een pijplijn is gedefinieerd, wordt weggelaten uit een toekomstige pijplijnuitvoering, wordt die gegevensset automatisch verwijderd uit het doelschema.

In het volgende voorbeeld worden vijf tabellen gemaakt waarmee klantorders per regio worden gefilterd. Hier wordt de regionaam gebruikt om de naam van de gerealiseerde doelweergaven in te stellen en de brongegevens te filteren. Tijdelijke weergaven worden gebruikt om joins te definiëren uit de brontabellen die worden gebruikt bij het samenstellen van de uiteindelijke gerealiseerde weergaven.

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("LIVE.customer_orders")
    nation_region = spark.read.table("LIVE.nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

Hier volgt een voorbeeld van de gegevensstroomgrafiek voor deze pijplijn:

Een gegevensstroomgrafiek van twee weergaven die leiden tot vijf regionale tabellen.

Problemen oplossen: for lus maakt veel tabellen met dezelfde waarden

Het luie uitvoeringsmodel dat pijplijnen gebruiken om Python-code te evalueren, vereist dat uw logica rechtstreeks verwijst naar afzonderlijke waarden wanneer de functie @dlt.table() wordt aangeroepen.

In het volgende voorbeeld ziet u twee juiste benaderingen voor het definiëren van tabellen met een for lus. In beide voorbeelden wordt expliciet naar elke tabelnaam uit de tables lijst verwezen binnen de functie die is ingericht door @dlt.table().

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

In het volgende voorbeeld wordt niet correct verwezen naar waarden. In dit voorbeeld worden tabellen met verschillende namen gemaakt, maar alle tabellen laden gegevens van de laatste waarde in de lus for :

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)