Partilhar via


Usar parâmetros com pipelines DLT

Este artigo explica como você pode usar configurações de pipeline DLT para parametrizar o código do pipeline.

Parâmetros de referência

Durante as atualizações, o código-fonte do pipeline pode acessar os parâmetros do pipeline usando a sintaxe para obter valores para as configurações do Spark.

Você faz referência a parâmetros de pipeline usando a chave. O valor é injetado no código-fonte como uma cadeia de caracteres antes que a lógica do código-fonte seja avaliada.

O exemplo de sintaxe a seguir usa um parâmetro com source_catalog de chave e dev_catalog de valor para especificar a fonte de dados para uma exibição materializada:

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Python

import dlt
from pyspark.sql.functions import col, sum, count

@dlt.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

Definir parâmetros

Para passar parâmetros para pipelines, utilize pares chave-valor arbitrários como configurações do pipeline. Você pode definir parâmetros ao definir ou editar uma configuração de pipeline usando a interface do usuário do espaço de trabalho ou JSON. Consulte Configurar um pipeline de DLT.

As chaves de parâmetro do pipeline só podem conter _ - . ou caracteres alfanuméricos. Os valores dos parâmetros são definidos como strings.

Os parâmetros de pipeline não suportam valores dinâmicos. Você deve atualizar o valor associado a uma chave na configuração do pipeline.

Importante

Não use palavras-chave que entrem em conflito com valores reservados do pipeline ou de configuração do Apache Spark.

Parametrizar declarações de conjunto de dados em Python ou SQL

O código Python e SQL que define seus conjuntos de dados pode ser parametrizado pelas configurações do pipeline. A parametrização permite os seguintes casos de uso:

  • Separar caminhos longos e outras variáveis do seu código.
  • Reduzir a quantidade de dados processados em ambientes de desenvolvimento ou preparação para acelerar os testes.
  • Reutilizar a mesma lógica de transformação para processar a partir de várias fontes de dados.

O exemplo a seguir usa o valor de configuração startDate para limitar o pipeline de desenvolvimento a um subconjunto dos dados de entrada:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

Controlar fontes de dados com parâmetros

Você pode usar parâmetros de pipeline para especificar diferentes fontes de dados em configurações diferentes do mesmo pipeline.

Por exemplo, você pode especificar caminhos diferentes em configurações de desenvolvimento, teste e produção para um pipeline usando a variável data_source_path e, em seguida, fazer referência a ele usando o seguinte código:

SQL

CREATE STREAMING TABLE bronze
AS (
    SELECT
    *,
    _metadata.file_path AS source_file_path
    FROM read_files( '${data_source_path}', 'csv',
            map("header", "true"))
)

Python

import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

Esse padrão é benéfico para testar como a lógica de ingestão pode lidar com o esquema ou dados malformados durante a ingestão inicial. Você pode usar o código idêntico em todo o pipeline em todos os ambientes enquanto alterna conjuntos de dados.