共用方式為


搭配 Delta Live Tables 管線使用參數

本文說明如何使用 Delta Live Tables 管線組態來參數化管線程序代碼。

傳址參數

在更新期間,您的管線原始程式碼可以使用語法來存取管線參數,以取得Spark組態的值。

您可以使用 索引鍵來參考管線參數。 此值會在原始程式碼邏輯評估之前,以字串的形式插入您的原始程式碼中。

下列範例語法會使用具有索引鍵 source_catalog 和值 dev_catalog 的參數來指定具體化檢視的數據來源:

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Python

import dlt
from pyspark.sql.functions import col, sum, count

@dlt.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

設定參數

傳遞任意索引鍵/值組做為管線組的組態,將參數傳遞至管線。 您可以使用工作區 UI 或 JSON 來定義或編輯管線組態時設定參數。 請參閱 設定 Delta Live Tables 管線

管線參數索引鍵只能包含 _ - . 或英數位元。 參數值會設定為字串。

管線參數不支持動態值。 您必須更新與管線組態中索引鍵相關聯的值。

重要

請勿使用與保留管線或 Apache Spark 組態值衝突的關鍵詞。

在 Python 或 SQL 中參數化資料集宣告

定義資料集的 Python 和 SQL 程式碼可由管線的設定參數化。 參數化可啟用下列使用案例:

  • 將長路徑和其他變數與程式碼分開。
  • 減少開發或預備環境中處理的資料量,以加速測試。
  • 重複使用從多個資料來源處理的相同轉換邏輯。

下列範例會使用 startDate 設定值,將開發管線限制為輸入資料的子集:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

使用參數控制數據源

您可以使用管線參數,在相同管線的不同組態中指定不同的數據源。

例如,您可以使用 變數 data_source_path 指定管線開發、測試和生產組態中的不同路徑,然後使用下列程式代碼加以參考:

SQL

CREATE STREAMING TABLE bronze
AS (
    SELECT
    *,
    _metadata.file_path AS source_file_path
    FROM read_files( '${data_source_path}', 'csv',
            map("header", "true"))
)

Python

import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

此模式有助於測試擷取邏輯在初始擷取期間如何處理架構或格式不正確的數據。 在切換資料集時,您可以在所有環境中在整個管線中使用相同的程序代碼。