Partager via


Utiliser des paramètres avec des pipelines Delta Live Tables

Cet article explique comment utiliser les configurations de pipeline Delta Live Tables pour paramétrer le code du pipeline.

Paramètres de référence

Pendant les mises à jour, votre code source de pipeline peut accéder aux paramètres de pipeline à l’aide de la syntaxe pour obtenir des valeurs pour les configurations Spark.

Vous référencez les paramètres de pipeline à l’aide de la clé. La valeur est injectée dans votre code source sous forme de chaîne avant l’évaluation de votre logique de code source.

L’exemple de syntaxe suivant utilise un paramètre avec clé source_catalog et valeur dev_catalog pour spécifier la source de données d’une vue matérialisée :

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Python

import dlt
from pyspark.sql.functions import col, sum, count

@dlt.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

Définition des paramètres

Transmettez des paramètres aux pipelines en passant des paires clé-valeur arbitraires en tant que configurations pour le pipeline. Vous pouvez définir des paramètres lors de la définition ou de la modification d’une configuration de pipeline à l’aide de l’interface utilisateur de l’espace de travail ou json. Consultez Configurer un pipeline Delta Live Tables.

Les clés de paramètre de pipeline ne peuvent contenir _ - . que des caractères alphanumériques. Les valeurs de paramètre sont définies en tant que chaînes.

Les paramètres de pipeline ne prennent pas en charge les valeurs dynamiques. Vous devez mettre à jour la valeur associée à une clé dans la configuration du pipeline.

Important

N’utilisez pas de mots clés qui entrent en conflit avec les valeurs de configuration du pipeline réservé ou Apache Spark.

Paramétrer les déclarations de jeu de données en Python ou SQL

Le code Python et SQL qui définit vos jeux de données peut être paramétrisé par les paramètres du pipeline. La paramétrisation permet les cas d’usage suivants :

  • Séparation des chemins longs et d’autres variables dans votre code.
  • Réduction de la quantité de données traitées dans les environnements de développement ou intermédiaire pour accélérer la phase de test.
  • Réutilisation de la même logique de transformation pour le traitement à partir de plusieurs sources de données.

L’exemple suivant utilise la valeur de configuration startDate pour limiter le pipeline de développement à un sous-ensemble des données d’entrée :

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

Contrôler les sources de données avec des paramètres

Vous pouvez utiliser des paramètres de pipeline pour spécifier différentes sources de données dans différentes configurations du même pipeline.

Par exemple, vous pouvez spécifier différents chemins d’accès dans les configurations de développement, de test et de production d’un pipeline à l’aide de la variable data_source_path, puis la référencer à l’aide du code suivant :

SQL

CREATE STREAMING TABLE bronze
AS (
    SELECT
    *,
    _metadata.file_path AS source_file_path
    FROM read_files( '${data_source_path}', 'csv',
            map("header", "true"))
)

Python

import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

Ce modèle est bénéfique pour tester la façon dont la logique d’ingestion peut gérer des données de schéma ou mal formées pendant l’ingestion initiale. Vous pouvez utiliser le même code dans l’ensemble de votre pipeline pour tous les environnements lors de l’échange de jeux de données.