Compartir a través de


Uso de parámetros con canalizaciones de Delta Live Tables

En este artículo se explica cómo puede usar configuraciones de canalización de Delta Live Tables para parametrizar el código de canalización.

Parámetros de referencia

Durante las actualizaciones, el código fuente de la canalización puede acceder a los parámetros de canalización mediante la sintaxis para obtener valores para las configuraciones de Spark.

Se hace referencia a los parámetros de canalización mediante la clave . El valor se inserta en el código fuente como una cadena antes de que se evalúe la lógica del código fuente.

La sintaxis de ejemplo siguiente usa un parámetro con clave source_catalog y valor dev_catalog para especificar el origen de datos para una vista materializada:

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Python

import dlt
from pyspark.sql.functions import col, sum, count

@dlt.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

Establecimiento de parámetros

Pase parámetros a canalizaciones pasando pares clave-valor arbitrarios como configuraciones para la canalización. Puede establecer parámetros al definir o editar una configuración de canalización mediante la interfaz de usuario del área de trabajo o JSON. Consulte Configuración de una canalización de Delta Live Tables.

Las claves de parámetro de canalización solo pueden contener _ - . o caracteres alfanuméricos. Los valores de parámetro se establecen como cadenas.

Los parámetros de canalización no admiten valores dinámicos. Debe actualizar el valor asociado a una clave en la configuración de canalización.

Importante

No use palabras clave que entren en conflicto con la canalización reservada o los valores de configuración de Apache Spark.

Parametrizar declaraciones de conjunto de datos en Python o SQL

El código de Python y SQL que define los conjuntos de datos se puede parametrizar mediante la configuración de la canalización. La parametrización permite los siguientes casos de uso:

  • Separar las rutas de acceso largas y otras variables del código.
  • Reducir la cantidad de datos procesados en los entornos de desarrollo o ensayo para acelerar las pruebas.
  • Volver a usar la misma lógica de transformación para procesar desde varios orígenes de datos.

En el ejemplo siguiente se usa el valor de configuración startDate para limitar la canalización de desarrollo a un subconjunto de los datos de entrada:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

Control de orígenes de datos con parámetros

Puede usar parámetros de canalización para especificar orígenes de datos diferentes en configuraciones diferentes de la misma canalización.

Por ejemplo, puede especificar diferentes rutas de acceso en configuraciones de desarrollo, pruebas y producción para una canalización mediante la variable data_source_path y, a continuación, hacer referencia a ella mediante el código siguiente:

SQL

CREATE STREAMING TABLE bronze
AS (
    SELECT
    *,
    _metadata.file_path AS source_file_path
    FROM read_files( '${data_source_path}', 'csv',
            map("header", "true"))
)

Python

import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

Este patrón es beneficioso para probar cómo la lógica de ingesta podría controlar los datos de esquema o malformados durante la ingesta inicial. Puede usar el código idéntico en toda la canalización en todos los entornos al cambiar los conjuntos de datos.