Freigeben über


Verwenden von Parametern mit Delta Live Tables-Pipelines

In diesem Artikel wird erläutert, wie Sie Pipelinekonfigurationen von Delta Live Tables verwenden können, um Pipelinecode zu parametrisieren.

Parameter referenzieren

Während updates kann Ihr Pipelinequellcode mithilfe der Syntax auf Pipelineparameter zugreifen, um Werte für Spark-Konfigurationen abzurufen.

Sie verweisen mithilfe des Schlüssels auf Pipelineparameter. Der Wert wird in den Quellcode als Zeichenfolge eingefügt, bevor die Quellcodelogik ausgewertet wird.

Die folgende Beispielsyntax verwendet einen Parameter mit Schlüssel source_catalog und Wert dev_catalog , um die Datenquelle für eine materialisierte Ansicht anzugeben:

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Python

import dlt
from pyspark.sql.functions import col, sum, count

@dlt.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

Festlegen von Parametern

Übergeben Sie Parameter an Pipelines, indem Sie beliebige Schlüsselwertpaare als Konfigurationen für die Pipeline übergeben. Sie können Parameter festlegen, während Sie eine Pipelinekonfiguration mithilfe der Arbeitsbereich-UI oder JSON definieren oder bearbeiten. Siehe Konfigurieren einer Delta Live Tables-Pipeline.

Pipelineparameterschlüssel können nur alphanumerische Zeichen enthalten _ - . . Parameterwerte werden als Zeichenfolgen festgelegt.

Pipelineparameter unterstützen keine dynamischen Werte. Sie müssen den Wert aktualisieren, der einem Schlüssel in der Pipelinekonfiguration zugeordnet ist.

Wichtig

Verwenden Sie keine Schlüsselwörter, die mit reservierten Pipeline- oder Apache Spark-Konfigurationswerten in Konflikt geraten.

Parametrisieren von Datasetdeklarationen in Python oder SQL

Der Python- und SQL-Code, der Ihre Datasets definiert, kann von den Pipelineeinstellungen parametrisiert werden. Die Parametrisierung ermöglicht die folgenden Anwendungsfälle:

  • Trennen von langen Pfaden und anderen Variablen von Ihrem Code
  • Reduzieren der Datenmenge, die in Entwicklungs- oder Stagingumgebungen verarbeitet wird, um Tests zu beschleunigen.
  • Wiederverwenden derselben Transformationslogik für die Verarbeitung aus mehreren Datenquellen

Im folgenden Beispiel wird der Konfigurationswert startDate verwendet, um die Entwicklungspipeline auf eine Teilmenge der Eingabedaten zu beschränken:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

Steuern von Datenquellen mit Parametern

Sie können Pipelineparameter verwenden, um unterschiedliche Datenquellen in verschiedenen Konfigurationen derselben Pipeline anzugeben.

Sie können beispielsweise verschiedene Pfade in Entwicklungs-, Test- und Produktionskonfigurationen für eine Pipeline mithilfe der Variablen data_source_path angeben und dann mit dem folgenden Code darauf verweisen:

SQL

CREATE STREAMING TABLE bronze
AS (
    SELECT
    *,
    _metadata.file_path AS source_file_path
    FROM read_files( '${data_source_path}', 'csv',
            map("header", "true"))
)

Python

import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

Dieses Muster eignet sich für Tests, wie die Aufnahmelogik schema- oder falsch formatierte Daten während der erstaufnahme behandeln kann. Sie können den identischen Code in der gesamten Pipeline in allen Umgebungen verwenden, während Sie Datasets austauschen.