次の方法で共有


Delta Live Tables パイプラインでパラメーターを使用する

この記事では、Delta Live Tables パイプライン構成を使用してパイプライン コードをパラメーター化する方法について説明します。

参照パラメーター

更新中、パイプラインのソース コードは、構文を使用してパイプライン パラメーターにアクセスして、Spark 構成の値を取得できます。

キーを使用してパイプライン パラメーターを参照します。 値は、ソース コード ロジックが評価される前に、文字列としてソース コードに挿入されます。

次の構文例では、キー source_catalog と値 dev_catalog パラメーターを使用して、具体化されたビューのデータ ソースを指定します。

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Python

import dlt
from pyspark.sql.functions import col, sum, count

@dlt.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

パラメーターを設定する

パイプラインの構成として任意のキーと値のペアを渡すことによって、パイプラインにパラメーターを渡します。 ワークスペース UI または JSON を使用して、パイプライン構成の定義または編集中にパラメーターを設定できます。 「 デルタ ライブ テーブル パイプラインを構成するを参照してください。

パイプライン パラメーター キーには、 _ - . または英数字のみを含めることができます。 パラメーター値は文字列として設定されます。

パイプライン パラメーターは動的な値をサポートしていません。 パイプライン構成のキーに関連付けられている値を更新する必要があります。

重要

予約済みパイプラインまたは Apache Spark 構成値と競合するキーワードは使用しないでください。

Python または SQL でデータセット宣言をパラメーター化する

データセットを定義する Python と SQL コードは、パイプラインの設定によってパラメーター化できます。 パラメーター化により、次のユース ケースが可能になります。

  • コードから長いパスや他の変数を分離します。
  • テストを高速化するために、開発またはステージング環境で処理されるデータの量を減らします。
  • 同じ変換ロジックを再利用して、複数のデータ ソースから処理します。

次の例では、構成値 startDate を使用して、開発パイプラインを入力データのサブセットに制限します。

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

パラメーターを使用してデータ ソースを制御する

パイプライン パラメーターを使用して、同じパイプラインのさまざまな構成で異なるデータ ソースを指定できます。

たとえば、変数の data_source_path を使用してパイプラインの開発、テスト、運用の構成で異なるパスを指定した後、次のコードを使用して参照することができます。

SQL

CREATE STREAMING TABLE bronze
AS (
    SELECT
    *,
    _metadata.file_path AS source_file_path
    FROM read_files( '${data_source_path}', 'csv',
            map("header", "true"))
)

Python

import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

このパターンは、インジェスト ロジックが初期インジェスト中にスキーマまたは形式が正しくないデータを処理する方法をテストするのに役立ちます。 データセットを切り替えながら、すべての環境のパイプライン全体で同一のコードを使用できます。