将参数与增量实时表管道配合使用

本文介绍如何使用增量实时表管道配置参数化管道代码。

引用参数

在更新期间,管道源代码可以使用语法访问管道参数,以获取 Spark 配置的值。

使用密钥引用管道参数。 在源代码逻辑求值之前,该值将作为字符串被注入源代码中。

以下示例语法使用具有键 source_catalog 和值 dev_catalog 的参数来指定具体化视图的数据源:

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Python

import dlt
from pyspark.sql.functions import col, sum, count

@dlt.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

设置参数

通过将任意键值对作为管道的配置传递给管道,从而将参数传递给管道。 使用工作区 UI 或 JSON 定义或编辑管道配置时,可以设置参数。 请参阅配置增量实时表管道

作业参数键只能包含 _ - . 或字母数字字符。 参数值设置为字符串。

管道参数不支持动态值。 必须更新与管道配置中的键关联的值。

重要

不要使用与保留管道或 Apache Spark 配置值冲突的关键字。

在 Python 或 SQL 中参数化数据集声明

可以通过管道设置将定义数据集的 Python 和 SQL 代码参数化。 参数化支持以下用例:

  • 从代码中分离长路径和其他变量。
  • 减少在开发或过渡环境中处理的数据量,以加快测试速度。
  • 重用同一转换逻辑来处理多个数据源。

以下示例使用 startDate 配置值将开发管道限制为输入数据的子集:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

使用参数控制数据源

可使用管道参数在同一管道的不同配置中指定不同的数据源。

例如,可以使用变量 data_source_path 在管道的开发、测试和生产配置中指定不同的路径,然后使用以下代码引用该变量:

SQL

CREATE STREAMING TABLE bronze
AS (
    SELECT
    *,
    _metadata.file_path AS source_file_path
    FROM read_files( '${data_source_path}', 'csv',
            map("header", "true"))
)

Python

import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

要测试引入逻辑如何在初始引入期间处理架构或格式错误的数据,此模式比较有用。 在切换数据集时,可以在所有环境的整个管道中使用相同的代码。