

Use parameters with Delta Live Tables pipelines

This article explains how you can use Delta Live Tables pipeline configurations to parameterize pipeline code.

Reference parameters

During updates, your pipeline source code can access pipeline parameters using the same syntax you use to get values for Spark configurations: ${key} in SQL and spark.conf.get("key") in Python.

You reference pipeline parameters using the key. The value is injected into your source code as a string before your source code logic evaluates.
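Conceptually, the SQL substitution step replaces each ${key} reference with the configured string before the query is analyzed. The following pure-Python sketch models that step so you can see what the final SQL text looks like. It is an illustration under stated assumptions, not how Delta Live Tables implements substitution: the helper name substitute_parameters is hypothetical, and raising KeyError for an unknown key is a choice made only for this sketch.

```python
import re

# Matches ${key} where key uses the characters allowed in pipeline
# parameter keys: letters, digits, underscore, hyphen, and period.
_PARAM_REF = re.compile(r"\$\{([A-Za-z0-9_.\-]+)\}")


def substitute_parameters(sql: str, params: dict[str, str]) -> str:
    """Replace each ${key} in sql with params[key] (hypothetical helper).

    ASSUMPTION: an unknown key raises KeyError in this sketch; this is not
    a statement about how Delta Live Tables handles missing parameters.
    """
    def replace(match: re.Match) -> str:
        key = match.group(1)
        if key not in params:
            raise KeyError(f"no pipeline parameter named {key!r}")
        # The value is inserted verbatim as text; no quoting is added.
        return params[key]

    return _PARAM_REF.sub(replace, sql)


if __name__ == "__main__":
    query = "SELECT * FROM ${source_catalog}.sales.transactions_table"
    print(substitute_parameters(query, {"source_catalog": "dev_catalog"}))
    # SELECT * FROM dev_catalog.sales.transactions_table
```

In this model the value is pasted in as plain text, so a parameter used as a string literal still needs quotes around the reference in the SQL, as in '${mypipeline.startDate}'.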

The following example syntax uses a parameter with key source_catalog and value dev_catalog to specify the data source for a materialized view:

SQL

CREATE OR REFRESH MATERIALIZED VIEW transaction_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Python

import dlt
from pyspark.sql.functions import col, count, sum

@dlt.table
def transaction_summary():
  # Read the parameter value set in the pipeline configuration.
  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        # Alias the aggregate results, not the input columns.
        count(col("txn_id")).alias("txn_count"),
        sum(col("txn_amount")).alias("account_revenue")
      )
    )

Set parameters

To pass parameters to a pipeline, add arbitrary key-value pairs to the pipeline's configuration. You can set parameters while defining or editing a pipeline configuration using the workspace UI or JSON. See Configure a Delta Live Tables pipeline.

Pipeline parameter keys can contain only alphanumeric characters, underscores (_), hyphens (-), and periods (.). Parameter values are set as strings.

Pipeline parameters do not support dynamic values. To change a parameter, you must update the value associated with its key in the pipeline configuration.

Important

Do not use parameter keys that conflict with reserved pipeline or Apache Spark configuration keys.
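You can check the key and value rules above before saving a configuration. The following sketch validates keys against the allowed character set, requires string values, and warns about keys that start with spark. or pipelines. Treating those two prefixes as reserved is an assumption for illustration (Spark configuration keys conventionally begin with spark., and Delta Live Tables settings such as pipelines.trigger.interval begin with pipelines.); it is not an exhaustive list of reserved names. The helper name check_configuration is hypothetical.

```python
import re

# Pipeline parameter keys may contain only _ - . and alphanumeric characters.
_VALID_KEY = re.compile(r"[A-Za-z0-9_.\-]+")

# ASSUMPTION: prefixes treated as reserved for this sketch. Not an exhaustive
# or authoritative list of reserved Spark or pipeline configuration names.
_RESERVED_PREFIXES = ("spark.", "pipelines.")


def check_configuration(params: dict[str, str]) -> list[str]:
    """Validate a pipeline configuration dict (hypothetical helper).

    Raises ValueError for a key with disallowed characters and TypeError for
    a non-string value. Returns warnings for keys with a reserved-looking prefix.
    """
    warnings: list[str] = []
    for key, value in params.items():
        if not _VALID_KEY.fullmatch(key):
            raise ValueError(f"invalid parameter key: {key!r}")
        if not isinstance(value, str):
            # Parameter values are set as strings.
            raise TypeError(f"value for {key!r} must be a string, got {type(value).__name__}")
        if key.startswith(_RESERVED_PREFIXES):
            warnings.append(f"{key!r} may conflict with a reserved configuration key")
    return warnings


if __name__ == "__main__":
    print(check_configuration({"source_catalog": "dev_catalog", "mypipeline.startDate": "2021-01-02"}))
    # []
    print(check_configuration({"spark.sql.shuffle.partitions": "8"}))
```

Raising on bad keys and non-string values while only warning on prefixes keeps hard rules strict and leaves the heuristic advisory.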

Parameterize dataset declarations in Python or SQL

The Python and SQL code that defines your datasets can be parameterized by the pipeline’s settings. Parameterization enables the following use cases:

  • Separating long paths and other variables from your code.
  • Reducing the amount of data processed in development or staging environments to speed up testing.
  • Reusing the same transformation logic to process data from multiple data sources.

The following example uses the startDate configuration value to limit the development pipeline to a subset of the input data:

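SQL

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';

Python

import dlt
from pyspark.sql.functions import col

@dlt.table
def customer_events():
  # The parameter value arrives as a string, such as "2021-01-02".
  start_date = spark.conf.get("mypipeline.startDate")
  return spark.read.table("sourceTable").where(col("date") > start_date)

The development and production pipelines set different values for the same key in their configurations:

JSON

{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}

{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}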
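To see how one filter yields different subsets per environment, the following pure-Python sketch applies the DEV and PROD configurations above to a few in-memory rows standing in for sourceTable. The rows and the function name customer_events_for are hypothetical, and in a real pipeline Spark performs the comparison. Because the parameter arrives as a string, this sketch parses it with date.fromisoformat before comparing.

```python
from datetime import date

# Hypothetical rows standing in for sourceTable.
EVENTS = [
    {"customer_id": 1, "date": date(2009, 6, 1)},
    {"customer_id": 2, "date": date(2015, 3, 14)},
    {"customer_id": 3, "date": date(2021, 1, 1)},
    {"customer_id": 4, "date": date(2022, 7, 4)},
]

# The configuration blocks from the DEV and PROD pipeline settings.
DEV_CONFIG = {"mypipeline.startDate": "2021-01-02"}
PROD_CONFIG = {"mypipeline.startDate": "2010-01-02"}


def customer_events_for(configuration: dict[str, str], rows: list[dict]) -> list[dict]:
    """Keep rows whose date is strictly after mypipeline.startDate."""
    # Parameter values are strings, so parse the ISO date before comparing.
    start_date = date.fromisoformat(configuration["mypipeline.startDate"])
    return [row for row in rows if row["date"] > start_date]


if __name__ == "__main__":
    print([r["customer_id"] for r in customer_events_for(DEV_CONFIG, EVENTS)])   # [4]
    print([r["customer_id"] for r in customer_events_for(PROD_CONFIG, EVENTS)])  # [2, 3, 4]
```

The filter function is identical for both environments; only the configuration changes, which mirrors how the pipeline code stays the same between DEV and PROD.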

Control data sources with parameters

You can use pipeline parameters to specify different data sources in different configurations of the same pipeline.

For example, you can specify different paths in the development, testing, and production configurations for a pipeline using the parameter data_source_path, and then reference it with the following code:

SQL

CREATE OR REFRESH STREAMING TABLE bronze
AS SELECT
  *,
  _metadata.file_path AS source_file_path
FROM STREAM read_files(
  '${data_source_path}',
  format => 'csv',
  header => true
)

Python

import dlt
from pyspark.sql.functions import col

# Read the path for this environment from the pipeline configuration.
data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path)
        # Record which file each row came from, matching the SQL column name.
        .select("*", col("_metadata.file_path").alias("source_file_path"))
    )

This pattern is useful for testing how ingestion logic handles schema changes or malformed data during initial ingestion. You can use identical code throughout your pipeline in every environment while switching out only the datasets.
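To try that kind of test outside a pipeline, you could point a small local harness at whichever directory data_source_path names in a given configuration. The following sketch uses only the Python standard library: it reads every CSV file in that directory, adds a source_file_path column like the bronze table, and sets aside rows whose field count does not match the header. The function load_bronze_rows and its malformed-row rule are assumptions for illustration; Auto Loader and read_files have their own schema inference and rescued-data behavior that this sketch does not reproduce.

```python
import csv
from pathlib import Path


def load_bronze_rows(configuration: dict[str, str]) -> tuple[list[dict], list[tuple[str, int]]]:
    """Read CSV files in configuration["data_source_path"] (hypothetical helper).

    Returns (good_rows, malformed). Each good row carries a source_file_path
    column; malformed lists (file path, line number) for rows whose field
    count differs from the header.
    """
    source_dir = Path(configuration["data_source_path"])
    good_rows: list[dict] = []
    malformed: list[tuple[str, int]] = []
    for path in sorted(source_dir.glob("*.csv")):
        with path.open(newline="") as handle:
            reader = csv.reader(handle)
            header = next(reader, None)
            if header is None:
                continue  # Skip empty files.
            for row in reader:
                if not row:
                    continue  # Skip blank lines.
                if len(row) != len(header):
                    # reader.line_num is the physical line just read.
                    malformed.append((str(path), reader.line_num))
                    continue
                record = dict(zip(header, row))
                record["source_file_path"] = str(path)
                good_rows.append(record)
    return good_rows, malformed
```

Running the harness with the configuration for each environment exercises the same logic against each dataset, much as the pipeline does.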