将参数与增量实时表管道配合使用
本文介绍如何使用增量实时表管道配置参数化管道代码。
引用参数
在更新期间,管道源代码可以使用语法访问管道参数,以获取 Spark 配置的值。
使用密钥引用管道参数。 在源代码逻辑求值之前,该值将作为字符串被注入源代码中。
以下示例语法使用具有键 source_catalog
和值 dev_catalog
的参数来指定具体化视图的数据源:
SQL
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
Python
import dlt
from pyspark.sql.functions import col, sum, count
@dlt.table
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return (spark.read
.table(f"{source_catalog}.sales.transactions_table")
.groupBy("account_id")
.agg(
count(col("txn_id").alias("txn_count")),
sum(col("txn_amount").alias("account_revenue"))
)
)
设置参数
通过将任意键值对作为管道的配置传递给管道,从而将参数传递给管道。 使用工作区 UI 或 JSON 定义或编辑管道配置时,可以设置参数。 请参阅配置增量实时表管道。
作业参数键只能包含 _ - .
或字母数字字符。 参数值设置为字符串。
管道参数不支持动态值。 必须更新与管道配置中的键关联的值。
重要
不要使用与保留管道或 Apache Spark 配置值冲突的关键字。
在 Python 或 SQL 中参数化数据集声明
可以通过管道设置将定义数据集的 Python 和 SQL 代码参数化。 参数化支持以下用例:
- 从代码中分离长路径和其他变量。
- 减少在开发或过渡环境中处理的数据量,以加快测试速度。
- 重用同一转换逻辑来处理多个数据源。
以下示例使用 startDate
配置值将开发管道限制为输入数据的子集:
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
使用参数控制数据源
可使用管道参数在同一管道的不同配置中指定不同的数据源。
例如,可以使用变量 data_source_path
在管道的开发、测试和生产配置中指定不同的路径,然后使用以下代码引用该变量:
SQL
CREATE STREAMING TABLE bronze
AS (
SELECT
*,
_metadata.file_path AS source_file_path
FROM read_files( '${data_source_path}', 'csv',
map("header", "true"))
)
Python
import dlt
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dlt.table
def bronze():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.load(data_source_path )
.select("*", col("_metadata.file_path").alias("source_file_name"))
)
要测试引入逻辑如何在初始引入期间处理架构或格式错误的数据,此模式比较有用。 在切换数据集时,可以在所有环境的整个管道中使用相同的代码。