Compartir a través de


Desarrollo de código de canalización con Python

Delta Live Tables presenta varias nuevas construcciones de código de Python para definir vistas materializadas y tablas de streaming en canalizaciones. La compatibilidad de Python para desarrollar canalizaciones se basa en los conceptos básicos de las API de DataFrame y Structured Streaming de PySpark.

Para los usuarios que no están familiarizados con Python y DataFrames, Databricks recomienda usar la interfaz de SQL. Consulte Desarrollo de código de canalización con SQL.

Para obtener una referencia completa de la sintaxis de Python de Delta Live Tables, consulte Referencia del lenguaje Python de Delta Live Tables.

Conceptos básicos de Python para el desarrollo de canalizaciones

El código de Python que crea conjuntos de datos de Delta Live Tables debe devolver DataFrames.

Todas las API de Python de Delta Live Tables se implementan en el módulo dlt. El código de canalización de Delta Live Tables implementado con Python debe importar explícitamente el módulo dlt en la parte superior de los cuadernos y archivos de Python.

El código de Python específico de Delta Live Tables difiere de otros tipos de código de Python de una manera crítica: el código de canalización de Python no llama directamente a las funciones que realizan la ingesta y transformación de datos para crear conjuntos de datos de Delta Live Tables. En su lugar, Delta Live Tables interpreta las funciones de decorador del dlt módulo en todos los archivos de código fuente configurados en una canalización y crea un gráfico de flujo de datos.

Importante

Para evitar un comportamiento inesperado cuando se ejecuta la canalización, no incluya código que pueda tener efectos secundarios en las funciones que definen conjuntos de datos. Para obtener más información, vea la referencia de Python.

Creación de una vista materializada o una tabla de streaming con Python

El @dlt.table decorador indica a Delta Live Tables que cree una vista materializada o una tabla de streaming en función de los resultados devueltos por una función. Los resultados de una lectura por lotes crean una vista materializada, mientras que los resultados de una lectura de streaming crean una tabla de streaming.

De forma predeterminada, los nombres de tabla de streaming y vista materializada se deducen de los nombres de función. En el ejemplo de código siguiente se muestra la sintaxis básica para crear una vista materializada y una tabla de streaming:

Nota:

Ambas funciones hacen referencia a la misma tabla del samples catálogo y usan la misma función de decorador. Estos ejemplos resaltan que la única diferencia en la sintaxis básica para las vistas materializadas y las tablas de streaming se usa spark.read frente a spark.readStream.

No todos los orígenes de datos admiten lecturas de streaming. Algunos orígenes de datos siempre deben procesarse con la semántica de streaming.

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Opcionalmente, puede especificar el nombre de la tabla mediante el name argumento en el @dlt.table decorador. En el ejemplo siguiente se muestra este patrón para una vista materializada y una tabla de streaming:

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Carga de datos desde el almacenamiento de objetos

Delta Live Tables admite la carga de datos de todos los formatos admitidos por Azure Databricks. Consulte Opciones de formato de datos.

Nota:

En estos ejemplos se usan los datos disponibles en el montaje automático en el /databricks-datasets área de trabajo. Databricks recomienda usar rutas de acceso de volumen o URI en la nube para hacer referencia a los datos almacenados en el almacenamiento de objetos en la nube. Consulte ¿Qué son los volúmenes de Unity Catalog?.

Databricks recomienda usar el cargador automático y las tablas de streaming al configurar cargas de trabajo de ingesta incrementales en los datos almacenados en el almacenamiento de objetos en la nube. Consulte ¿Qué es Auto Loader?.

En el ejemplo siguiente se crea una tabla de streaming a partir de archivos JSON mediante el cargador automático:

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

En el ejemplo siguiente se usa la semántica por lotes para leer un directorio JSON y crear una vista materializada:

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Validación de datos con expectativas

Puede usar expectativas para establecer y aplicar restricciones de calidad de datos. Consulte Administración de la calidad de los datos con Delta Live Tables.

El código siguiente usa @dlt.expect_or_drop para definir una expectativa denominada valid_data que quita registros que son NULL durante la ingesta de datos:

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Consulta de vistas materializadas y tablas de streaming definidas en la canalización

Use el LIVE esquema para consultar otras vistas materializadas y tablas de streaming definidas en la canalización.

En el ejemplo siguiente se definen cuatro conjuntos de datos:

  • Una tabla de streaming denominada orders que carga datos JSON.
  • Una vista materializada denominada customers que carga datos CSV.
  • Una vista materializada denominada customer_orders que combina registros de los orders conjuntos de datos y customers convierte la marca de tiempo de pedido en una fecha y selecciona los customer_idcampos , order_number, statey order_date .
  • Vista materializada denominada daily_orders_by_state que agrega el recuento diario de pedidos para cada estado.
import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("LIVE.orders")
    .join(spark.read.table("LIVE.customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("LIVE.customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Creación de tablas en un for bucle

Puede usar bucles de Python for para crear varias tablas mediante programación. Esto puede ser útil cuando tiene muchos orígenes de datos o conjuntos de datos de destino que varían en solo unos pocos parámetros, lo que da como resultado un código menos total para mantener y menos redundancia de código.

El for bucle evalúa la lógica en orden serie, pero una vez completado el planeamiento de los conjuntos de datos, la canalización ejecuta lógica en paralelo.

Importante

Al usar este patrón para definir conjuntos de datos, asegúrese de que la lista de valores pasados al for bucle siempre es suma. Si se omite un conjunto de datos definido previamente en una canalización de una ejecución de canalización futura, ese conjunto de datos se quita automáticamente del esquema de destino.

En el ejemplo siguiente se crean cinco tablas que filtran los pedidos de clientes por región. Aquí, el nombre de la región se usa para establecer el nombre de las vistas materializadas de destino y para filtrar los datos de origen. Las vistas temporales se usan para definir combinaciones de las tablas de origen usadas para construir las vistas materializadas finales.

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("LIVE.customer_orders")
    nation_region = spark.read.table("LIVE.nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

A continuación se muestra un ejemplo del gráfico de flujo de datos para esta canalización:

Gráfico de flujo de datos de dos vistas que conducen a cinco tablas regionales.

Solución de problemas: for el bucle crea muchas tablas con los mismos valores

El modelo de ejecución diferida que las canalizaciones usan para evaluar el código de Python requiere que la lógica haga referencia directamente a valores individuales cuando se invoque la función decorada por @dlt.table() .

En el ejemplo siguiente se muestran dos enfoques correctos para definir tablas con un for bucle . En ambos ejemplos, se hace referencia explícita a cada nombre de tabla de la tables lista dentro de la función decorada por @dlt.table().

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

En el ejemplo siguiente no se hace referencia a los valores correctamente. En este ejemplo se crean tablas con nombres distintos, pero todas las tablas cargan datos del último valor del for bucle:

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)