Sdílet prostřednictvím


Vývoj kódu kanálu pomocí Pythonu

Delta Live Tables představuje několik nových konstruktorů kódu Pythonu pro definování materializovaných zobrazení a streamovaných tabulek v kanálech. Podpora Pythonu pro vývoj kanálů vychází ze základních funkcí PySpark DataFrame a rozhraní API strukturovaného streamování.

Pro uživatele, kteří nezná Python a datové rámce, databricks doporučuje používat rozhraní SQL. Viz Vývoj kódu kanálu pomocí SQL.

Úplný přehled syntaxe Delta Live Tables Python naleznete v části Referenční informace o jazyce Delta Live Tables Python.

Základy Pythonu pro vývoj kanálů

Kód Pythonu, který vytváří datové sady Delta Live Tables, musí vracet datové rámce.

V modulu dlt se implementují všechna rozhraní PYTHON API Delta Live Tables. Kód pipeline Delta Live Tables implementovaný pomocí Pythonu musí explicitně importovat modul dlt v horní části poznámkových bloků a souborů Pythonu.

Výchozí nastavení pro čtení a zápisy je do katalogu a schématu určené během konfigurace kanálu. Viz Nastavení cílového katalogu a schématu.

Python kód specifický pro Delta Live Tables se liší od jiných typů kódu Pythonu jejich kritickým způsobem: Python kód pipeline přímo nevolá funkce, které provádějí příjem a transformaci dat za účelem vytvoření datových sad Delta Live Tables. Místo toho Delta Live Tables interpretuje funkce dekorátoru z modulu dlt ve všech souborech zdrojového kódu nakonfigurovaných v pipeline a sestavuje graf toku dat.

Důležité

Abyste se vyhnuli neočekávanému chování při spuštění kanálu, nezahrňte do funkcí, které definují datové sady, kód, který může mít vedlejší účinky. Další informace najdete v referenčních informacích k Pythonu.

Vytvoření materializovaného zobrazení nebo tabulky streamování pomocí Pythonu

Dekorátor @dlt.table instruuje Delta Live Tables, aby vykonalo materializované zobrazení nebo streamovanou tabulku na základě výsledků, které vrací funkce. Výsledky dávkového čtení vytvoří materializované zobrazení, zatímco výsledky streamovaného čtení vytvoří streamovací tabulku.

Ve výchozím nastavení jsou materializované názvy tabulek zobrazení a streamovaných tabulek odvozeny z názvů funkcí. Následující příklad kódu ukazuje základní syntaxi pro vytvoření materializované tabulky zobrazení a streamování:

Poznámka:

Obě funkce odkazují na stejnou tabulku v katalogu samples a používají stejnou funkci dekorátoru. Tyto příklady zdůrazňují, že jediný rozdíl v základní syntaxi materializovaných zobrazení a streamovaných tabulek je používání spark.read namísto spark.readStream.

Ne všechny zdroje dat podporují čtení streamování. Některé zdroje dat by se měly vždy zpracovávat sémantikou streamování.

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Volitelně můžete název tabulky zadat pomocí argumentu name v dekorátoru @dlt.table. Následující příklad ukazuje tento model pro materializované zobrazení a streamovací tabulku:

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Načtení dat z úložiště objektů

Delta Live Tables podporuje načítání dat ze všech formátů podporovaných službou Azure Databricks. Viz Možnosti formátu dat.

Poznámka:

Tyto příklady používají data dostupná v rámci automaticky připojeného k vašemu /databricks-datasets pracovnímu prostoru. Databricks doporučuje používat cesty svazků nebo cloudové identifikátory URI k odkazování na data uložená v cloudovém úložišti objektů. Podívejte se na Co jsou svazky katalogu Unity?.

Databricks doporučuje používat Auto Loader a streamované tabulky při konfiguraci úloh inkrementálního načítání dat pro data uložená v cloudovém objektovém úložišti. Podívejte se, co je automatický zavaděč?

Následující příklad vytvoří tabulku pro streamování ze souborů JSON pomocí automatického načítání:

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Následující příklad používá sémantiku dávky ke čtení adresáře JSON a vytvoření materializovaného zobrazení:

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Ověření dat s očekáváními

Pomocí očekávání můžete nastavit a vynutit omezení kvality dat. Podívejte se na Správa kvality dat s očekáváními datového toku.

Následující kód používá @dlt.expect_or_drop k definování očekávání s názvem, který zahodí valid_data záznamy, které mají hodnotu null během příjmu dat:

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Dotazování materializovaných zobrazení a streamovaných tabulek definovaných ve vašem kanálu

Následující příklad definuje čtyři datové sady:

  • Streamovaná tabulka s názvem orders, která načítá data JSON.
  • Materializované zobrazení s názvem customers , které načte data CSV.
  • Materializované zobrazení, customer_orders které spojuje záznamy z orders datových sad a customers datových sad, přetypuje časové razítko objednávky na datum a vybere customer_idpole , , order_numberstatea order_date pole.
  • Materializované zobrazení s názvem daily_orders_by_state agreguje denní počet objednávek pro každý stav.

Poznámka:

Při dotazování zobrazení nebo tabulek v kanálu můžete přímo zadat katalog a schéma nebo můžete použít výchozí hodnoty nakonfigurované v kanálu. V tomto příkladu se tabulky orders, customersa customer_orders zapisují a čtou z výchozího katalogu a schématu nakonfigurovaného pro váš kanál.

Zastaralý režim publikování používá schéma LIVE k dotazování jiných materializovaných zobrazení a streamovaných tabulek definovaných ve vašem pipeline. U nových datových potrubí je syntaxe schématu LIVE tiše ignorována. Viz LIVE schema (starší verze).

import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("orders")
    .join(spark.read.table("customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Vytvořte tabulky ve smyčce for

Smyčky for Pythonu můžete použít k programovému vytvoření více tabulek. To může být užitečné v případě, že máte mnoho zdrojů dat nebo cílových datových sad, které se liší pouze několika parametry, což vede k menšímu celkovému kódu pro zachování a snížení redundance kódu.

Smyčka for vyhodnocuje logiku v sériovém pořadí, ale jakmile je plánování dokončeno pro datové sady, kanál spouští logiku paralelně.

Důležité

Při použití tohoto vzoru k definování datových sad se ujistěte, že seznam hodnot předaných do smyčky for je vždy sčítatelný. Pokud je datová sada dříve definovaná v kanálu vynechána z budoucího spuštění kanálu, tato datová sada se automaticky zahodí z cílového schématu.

Následující příklad vytvoří pět tabulek, které filtrují objednávky zákazníků podle oblastí. V tomto případě se název oblasti používá k nastavení názvu cílových materializovaných zobrazení a k filtrování zdrojových dat. Dočasná zobrazení slouží k definování spojení ze zdrojových tabulek používaných při vytváření konečných materializovaných zobrazení.

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("customer_orders")
    nation_region = spark.read.table("nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

Následuje příklad grafu toku dat pro tento kanál:

graf toku dat se dvěma zobrazeními vedoucími do pěti regionálních tabulek.

Řešení potíží: smyčka for vytváří mnoho tabulek se stejnými hodnotami

Opožděný model spouštění, který kanály používají k vyhodnocení kódu Pythonu, vyžaduje, aby logika při vyvolání funkce dekorované pomocí @dlt.table() přímo odkazuje na jednotlivé hodnoty.

Následující příklad ukazuje dva správné přístupy k definování tabulek pomocí smyčky for. V obou příkladech je každý název tabulky ze seznamu tables explicitně odkazován v rámci funkce označené @dlt.table().

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

Následující příklad nesprávně odkazuje na hodnoty. Tento příklad vytvoří tabulky s jedinečnými názvy, ale všechny tabulky načítají data z poslední hodnoty ve smyčce for:

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)