Desarrollo de código de canalización con Python
Delta Live Tables presenta varias nuevas construcciones de código de Python para definir vistas materializadas y tablas de streaming en canalizaciones. La compatibilidad de Python para desarrollar canalizaciones se basa en los conceptos básicos de las API de DataFrame y Structured Streaming de PySpark.
Para los usuarios que no están familiarizados con Python y DataFrames, Databricks recomienda usar la interfaz de SQL. Consulte Desarrollo de código de canalización con SQL.
Para obtener una referencia completa de la sintaxis de Python de Delta Live Tables, consulte Referencia del lenguaje Python de Delta Live Tables.
Conceptos básicos de Python para el desarrollo de canalizaciones
El código de Python que crea conjuntos de datos de Delta Live Tables debe devolver DataFrames.
Todas las API de Python de Delta Live Tables se implementan en el módulo dlt
. El código de canalización de Delta Live Tables implementado con Python debe importar explícitamente el módulo dlt
en la parte superior de los cuadernos y archivos de Python.
El código de Python específico de Delta Live Tables difiere de otros tipos de código de Python de una manera crítica: el código de canalización de Python no llama directamente a las funciones que realizan la ingesta y transformación de datos para crear conjuntos de datos de Delta Live Tables. En su lugar, Delta Live Tables interpreta las funciones de decorador del dlt
módulo en todos los archivos de código fuente configurados en una canalización y crea un gráfico de flujo de datos.
Importante
Para evitar un comportamiento inesperado cuando se ejecuta la canalización, no incluya código que pueda tener efectos secundarios en las funciones que definen conjuntos de datos. Para obtener más información, vea la referencia de Python.
Creación de una vista materializada o una tabla de streaming con Python
El @dlt.table
decorador indica a Delta Live Tables que cree una vista materializada o una tabla de streaming en función de los resultados devueltos por una función. Los resultados de una lectura por lotes crean una vista materializada, mientras que los resultados de una lectura de streaming crean una tabla de streaming.
De forma predeterminada, los nombres de tabla de streaming y vista materializada se deducen de los nombres de función. En el ejemplo de código siguiente se muestra la sintaxis básica para crear una vista materializada y una tabla de streaming:
Nota:
Ambas funciones hacen referencia a la misma tabla del samples
catálogo y usan la misma función de decorador. Estos ejemplos resaltan que la única diferencia en la sintaxis básica para las vistas materializadas y las tablas de streaming se usa spark.read
frente a spark.readStream
.
No todos los orígenes de datos admiten lecturas de streaming. Algunos orígenes de datos siempre deben procesarse con la semántica de streaming.
import dlt
@dlt.table()
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table()
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Opcionalmente, puede especificar el nombre de la tabla mediante el name
argumento en el @dlt.table
decorador. En el ejemplo siguiente se muestra este patrón para una vista materializada y una tabla de streaming:
import dlt
@dlt.table(name = "trips_mv")
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table(name = "trips_st")
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Carga de datos desde el almacenamiento de objetos
Delta Live Tables admite la carga de datos de todos los formatos admitidos por Azure Databricks. Consulte Opciones de formato de datos.
Nota:
En estos ejemplos se usan los datos disponibles en el montaje automático en el /databricks-datasets
área de trabajo. Databricks recomienda usar rutas de acceso de volumen o URI en la nube para hacer referencia a los datos almacenados en el almacenamiento de objetos en la nube. Consulte ¿Qué son los volúmenes de Unity Catalog?.
Databricks recomienda usar el cargador automático y las tablas de streaming al configurar cargas de trabajo de ingesta incrementales en los datos almacenados en el almacenamiento de objetos en la nube. Consulte ¿Qué es Auto Loader?.
En el ejemplo siguiente se crea una tabla de streaming a partir de archivos JSON mediante el cargador automático:
import dlt
@dlt.table()
def ingestion_st():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
En el ejemplo siguiente se usa la semántica por lotes para leer un directorio JSON y crear una vista materializada:
import dlt
@dlt.table()
def batch_mv():
return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")
Validación de datos con expectativas
Puede usar expectativas para establecer y aplicar restricciones de calidad de datos. Consulte Administración de la calidad de los datos con Delta Live Tables.
El código siguiente usa @dlt.expect_or_drop
para definir una expectativa denominada valid_data
que quita registros que son NULL durante la ingesta de datos:
import dlt
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
Consulta de vistas materializadas y tablas de streaming definidas en la canalización
Use el LIVE
esquema para consultar otras vistas materializadas y tablas de streaming definidas en la canalización.
En el ejemplo siguiente se definen cuatro conjuntos de datos:
- Una tabla de streaming denominada
orders
que carga datos JSON. - Una vista materializada denominada
customers
que carga datos CSV. - Una vista materializada denominada
customer_orders
que combina registros de losorders
conjuntos de datos ycustomers
convierte la marca de tiempo de pedido en una fecha y selecciona loscustomer_id
campos ,order_number
,state
yorder_date
. - Vista materializada denominada
daily_orders_by_state
que agrega el recuento diario de pedidos para cada estado.
import dlt
from pyspark.sql.functions import col
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
@dlt.table()
def customers():
return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")
@dlt.table()
def customer_orders():
return (spark.read.table("LIVE.orders")
.join(spark.read.table("LIVE.customers"), "customer_id")
.select("customer_id",
"order_number",
"state",
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
)
)
@dlt.table()
def daily_orders_by_state():
return (spark.read.table("LIVE.customer_orders")
.groupBy("state", "order_date")
.count().withColumnRenamed("count", "order_count")
)
Creación de tablas en un for
bucle
Puede usar bucles de Python for
para crear varias tablas mediante programación. Esto puede ser útil cuando tiene muchos orígenes de datos o conjuntos de datos de destino que varían en solo unos pocos parámetros, lo que da como resultado un código menos total para mantener y menos redundancia de código.
El for
bucle evalúa la lógica en orden serie, pero una vez completado el planeamiento de los conjuntos de datos, la canalización ejecuta lógica en paralelo.
Importante
Al usar este patrón para definir conjuntos de datos, asegúrese de que la lista de valores pasados al for
bucle siempre es suma. Si se omite un conjunto de datos definido previamente en una canalización de una ejecución de canalización futura, ese conjunto de datos se quita automáticamente del esquema de destino.
En el ejemplo siguiente se crean cinco tablas que filtran los pedidos de clientes por región. Aquí, el nombre de la región se usa para establecer el nombre de las vistas materializadas de destino y para filtrar los datos de origen. Las vistas temporales se usan para definir combinaciones de las tablas de origen usadas para construir las vistas materializadas finales.
import dlt
from pyspark.sql.functions import collect_list, col
@dlt.view()
def customer_orders():
orders = spark.read.table("samples.tpch.orders")
customer = spark.read.table("samples.tpch.customer")
return (orders.join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
col("c_nationkey").alias("nationkey"),
col("c_phone").alias("phone"),
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
col("o_orderdate").alias("orderdate"))
)
@dlt.view()
def nation_region():
nation = spark.read.table("samples.tpch.nation")
region = spark.read.table("samples.tpch.region")
return (nation.join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
col("n_nationkey").alias("nationkey")
)
)
# Extract region names from region table
region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]
# Iterate through region names to create new region-specific materialized views
for region in region_list:
@dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
def regional_customer_orders(region_filter=region):
customer_orders = spark.read.table("LIVE.customer_orders")
nation_region = spark.read.table("LIVE.nation_region")
return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
.select(
col("custkey"),
col("name"),
col("phone"),
col("nation"),
col("region"),
col("orderkey"),
col("orderstatus"),
col("totalprice"),
col("orderdate")
).filter(f"region = '{region_filter}'")
)
A continuación se muestra un ejemplo del gráfico de flujo de datos para esta canalización:
Solución de problemas: for
el bucle crea muchas tablas con los mismos valores
El modelo de ejecución diferida que las canalizaciones usan para evaluar el código de Python requiere que la lógica haga referencia directamente a valores individuales cuando se invoque la función decorada por @dlt.table()
.
En el ejemplo siguiente se muestran dos enfoques correctos para definir tablas con un for
bucle . En ambos ejemplos, se hace referencia explícita a cada nombre de tabla de la tables
lista dentro de la función decorada por @dlt.table()
.
import dlt
# Create a parent function to set local variables
def create_table(table_name):
@dlt.table(name=table_name)
def t():
return spark.read.table(table_name)
tables = ["t1", "t2", "t3"]
for t_name in tables:
create_table(t_name)
# Call `@dlt.table()` within a for loop and pass values as variables
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table(table_name=t_name):
return spark.read.table(table_name)
En el ejemplo siguiente no se hace referencia a los valores correctamente. En este ejemplo se crean tablas con nombres distintos, pero todas las tablas cargan datos del último valor del for
bucle:
import dlt
# Don't do this!
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table():
return spark.read.table(t_name)