Verwenden von Parametern mit Delta Live Tables-Pipelines
In diesem Artikel wird erläutert, wie Sie Pipelinekonfigurationen von Delta Live Tables verwenden können, um Pipelinecode zu parametrisieren.
Parameter referenzieren
Während updates kann Ihr Pipelinequellcode mithilfe der Syntax auf Pipelineparameter zugreifen, um Werte für Spark-Konfigurationen abzurufen.
Sie verweisen mithilfe des Schlüssels auf Pipelineparameter. Der Wert wird in den Quellcode als Zeichenfolge eingefügt, bevor die Quellcodelogik ausgewertet wird.
Die folgende Beispielsyntax verwendet einen Parameter mit Schlüssel source_catalog
und Wert dev_catalog
, um die Datenquelle für eine materialisierte Ansicht anzugeben:
SQL
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
Python
import dlt
from pyspark.sql.functions import col, sum, count
@dlt.table
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return (spark.read
.table(f"{source_catalog}.sales.transactions_table")
.groupBy("account_id")
.agg(
count(col("txn_id").alias("txn_count")),
sum(col("txn_amount").alias("account_revenue"))
)
)
Festlegen von Parametern
Übergeben Sie Parameter an Pipelines, indem Sie beliebige Schlüsselwertpaare als Konfigurationen für die Pipeline übergeben. Sie können Parameter festlegen, während Sie eine Pipelinekonfiguration mithilfe der Arbeitsbereich-UI oder JSON definieren oder bearbeiten. Siehe Konfigurieren einer Delta Live Tables-Pipeline.
Pipelineparameterschlüssel können nur alphanumerische Zeichen enthalten _ - .
. Parameterwerte werden als Zeichenfolgen festgelegt.
Pipelineparameter unterstützen keine dynamischen Werte. Sie müssen den Wert aktualisieren, der einem Schlüssel in der Pipelinekonfiguration zugeordnet ist.
Wichtig
Verwenden Sie keine Schlüsselwörter, die mit reservierten Pipeline- oder Apache Spark-Konfigurationswerten in Konflikt geraten.
Parametrisieren von Datasetdeklarationen in Python oder SQL
Der Python- und SQL-Code, der Ihre Datasets definiert, kann von den Pipelineeinstellungen parametrisiert werden. Die Parametrisierung ermöglicht die folgenden Anwendungsfälle:
- Trennen von langen Pfaden und anderen Variablen von Ihrem Code
- Reduzieren der Datenmenge, die in Entwicklungs- oder Stagingumgebungen verarbeitet wird, um Tests zu beschleunigen.
- Wiederverwenden derselben Transformationslogik für die Verarbeitung aus mehreren Datenquellen
Im folgenden Beispiel wird der Konfigurationswert startDate
verwendet, um die Entwicklungspipeline auf eine Teilmenge der Eingabedaten zu beschränken:
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
Steuern von Datenquellen mit Parametern
Sie können Pipelineparameter verwenden, um unterschiedliche Datenquellen in verschiedenen Konfigurationen derselben Pipeline anzugeben.
Sie können beispielsweise verschiedene Pfade in Entwicklungs-, Test- und Produktionskonfigurationen für eine Pipeline mithilfe der Variablen data_source_path
angeben und dann mit dem folgenden Code darauf verweisen:
SQL
CREATE STREAMING TABLE bronze
AS (
SELECT
*,
_metadata.file_path AS source_file_path
FROM read_files( '${data_source_path}', 'csv',
map("header", "true"))
)
Python
import dlt
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dlt.table
def bronze():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.load(data_source_path )
.select("*", col("_metadata.file_path").alias("source_file_name"))
)
Dieses Muster eignet sich für Tests, wie die Aufnahmelogik schema- oder falsch formatierte Daten während der erstaufnahme behandeln kann. Sie können den identischen Code in der gesamten Pipeline in allen Umgebungen verwenden, während Sie Datasets austauschen.