Condividi tramite


Sviluppare codice della pipeline con Python

Le tabelle live delta introducono diversi nuovi costrutti di codice Python per la definizione di viste materializzate e tabelle di streaming nelle pipeline. Il supporto python per lo sviluppo di pipeline si basa sulle nozioni di base del dataframe PySpark e delle API di streaming strutturato.

Per gli utenti che non hanno familiarità con Python eDataFrame, Databricks consiglia di usare l'interfaccia SQL. Vedere Sviluppare codice della pipeline con SQL.

Per informazioni di riferimento complete sulla sintassi Python delle tabelle live Delta, vedere Le informazioni di riferimento sul linguaggio Python per tabelle live Delta.

Nozioni di base su Python per lo sviluppo di pipeline

Il codice Python che crea set di dati Delta Live Tables deve restituire DataFrame.

Tutte le API Python per Delta Live Tables vengono implementate nel modulo dlt. Il codice della pipeline Delta Live Tables implementato con Python deve importare in modo esplicito il modulo dlt nella parte superiore dei Notebook e dei file Python.

Il codice Python specifico delle tabelle live Delta differisce da altri tipi di codice Python in un modo critico: il codice della pipeline Python non chiama direttamente le funzioni che eseguono l'inserimento e la trasformazione dei dati per creare set di dati delta live tables. Le tabelle live Delta interpretano invece le funzioni decorator del dlt modulo in tutti i file di codice sorgente configurati in una pipeline e compila un grafico del flusso di dati.

Importante

Per evitare comportamenti imprevisti durante l'esecuzione della pipeline, non includere codice che potrebbe avere effetti collaterali nelle funzioni che definiscono i set di dati. Vedere le informazioni di riferimento per Python.

Creare una vista materializzata o una tabella di streaming con Python

L'elemento @dlt.table Decorator indica alle tabelle live delta di creare una vista materializzata o una tabella di streaming in base ai risultati restituiti da una funzione. I risultati di una lettura batch creano una vista materializzata, mentre i risultati di un flusso letto creano una tabella di streaming.

Per impostazione predefinita, i nomi di vista materializzata e tabella di streaming vengono dedotti dai nomi delle funzioni. L'esempio di codice seguente illustra la sintassi di base per la creazione di una vista materializzata e di una tabella di streaming:

Nota

Entrambe le funzioni fanno riferimento alla stessa tabella nel samples catalogo e usano la stessa funzione decorator. Questi esempi evidenziano che l'unica differenza nella sintassi di base per le viste materializzate e le tabelle di streaming consiste nell'usare spark.read rispetto spark.readStreama .

Non tutte le origini dati supportano le letture in streaming. Alcune origini dati devono essere sempre elaborate con la semantica di streaming.

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Facoltativamente, è possibile specificare il nome della tabella usando l'argomento nell'elemento name @dlt.table Decorator. L'esempio seguente illustra questo modello per una vista materializzata e una tabella di streaming:

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Caricare dati dall'archivio oggetti

Delta Live Tables supporta il caricamento di dati da tutti i formati supportati da Azure Databricks. Vedere Opzioni di formato dati.

Nota

Questi esempi usano i dati disponibili nell'area di lavoro montata automaticamente nell'area /databricks-datasets di lavoro. Databricks consiglia di usare i percorsi del volume o gli URI cloud per fare riferimento ai dati archiviati nell'archiviazione di oggetti cloud. Vedere Che cosa sono i volumi del catalogo Unity?.

Databricks consiglia di usare il caricatore automatico e le tabelle di streaming quando si configurano carichi di lavoro di inserimento incrementali sui dati archiviati nell'archiviazione di oggetti cloud. Vedere Che cos'è l’Autoloader?.

L'esempio seguente crea una tabella di streaming da file JSON usando il caricatore automatico:

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

L'esempio seguente usa la semantica batch per leggere una directory JSON e creare una vista materializzata:

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Convalidare i dati con le aspettative

È possibile usare le aspettative per impostare e applicare vincoli di qualità dei dati. Vedere Gestire la qualità dei dati con Delta Live Tables.

Il codice seguente usa @dlt.expect_or_drop per definire un'aspettativa denominata valid_data che elimina i record null durante l'inserimento dati:

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Eseguire query su viste materializzate e tabelle di streaming definite nella pipeline

Usare lo LIVE schema per eseguire query su altre viste materializzate e tabelle di streaming definite nella pipeline.

L'esempio seguente definisce quattro set di dati:

  • Tabella di streaming denominata orders che carica i dati JSON.
  • Vista materializzata denominata customers che carica i dati CSV.
  • Una vista materializzata denominata customer_orders che unisce i record dai orders set di dati e customers , esegue il cast del timestamp dell'ordine a una data e seleziona i customer_idcampi , order_number, statee order_date .
  • Vista materializzata denominata daily_orders_by_state che aggrega il conteggio giornaliero degli ordini per ogni stato.
import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("LIVE.orders")
    .join(spark.read.table("LIVE.customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("LIVE.customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Creare tabelle in un for ciclo

È possibile usare cicli Python for per creare più tabelle a livello di codice. Ciò può essere utile quando si hanno molte origini dati o set di dati di destinazione che variano in base solo a pochi parametri, con conseguente minore quantità di codice totale per mantenere e diminuire la ridondanza del codice.

Il for ciclo valuta la logica in ordine seriale, ma una volta completata la pianificazione per i set di dati, la pipeline esegue la logica in parallelo.

Importante

Quando si usa questo modello per definire i set di dati, assicurarsi che l'elenco di valori passati al for ciclo sia sempre aggiuntivo. Se un set di dati definito in precedenza in una pipeline viene omesso da un'esecuzione futura della pipeline, tale set di dati viene eliminato automaticamente dallo schema di destinazione.

Nell'esempio seguente vengono create cinque tabelle che filtrano gli ordini dei clienti in base all'area. In questo caso, il nome dell'area viene usato per impostare il nome delle viste materializzate di destinazione e per filtrare i dati di origine. Le viste temporanee vengono usate per definire join dalle tabelle di origine utilizzate per costruire le viste materializzate finali.

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("LIVE.customer_orders")
    nation_region = spark.read.table("LIVE.nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

Di seguito è riportato un esempio del grafico del flusso di dati per questa pipeline:

Grafico del flusso di dati di due viste che portano a cinque tabelle a livello di area.

Risoluzione dei problemi: for il ciclo crea molte tabelle con gli stessi valori

Il modello di esecuzione differita usato dalle pipeline per valutare il codice Python richiede che la logica faccia riferimento direttamente ai singoli valori quando viene richiamata la funzione decorata da @dlt.table() .

Nell'esempio seguente vengono illustrati due approcci corretti per definire le tabelle con un for ciclo . In entrambi gli esempi, a ogni nome di tabella dell'elenco tables viene fatto riferimento in modo esplicito all'interno della funzione decorata da @dlt.table().

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

L'esempio seguente non fa riferimento correttamente ai valori. In questo esempio vengono create tabelle con nomi distinti, ma tutte le tabelle caricano i dati dall'ultimo valore del for ciclo:

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)