Använd parametrar med DLT-pipelines
Den här artikeln beskriver hur du kan använda DLT-pipelinekonfigurationer för att parametrisera pipelinekod.
Referensparametrar
Under uppdateringar kan din pipeline-källkod komma åt pipelineparametrar med hjälp av syntax för att hämta värden för Spark-konfigurationer.
Du refererar till pipelineparametrar med hjälp av nyckeln. Värdet matas in i källkoden som en sträng innan källkodslogik utvärderas.
Följande exempelsyntax använder en parameter med nyckel source_catalog
och värde dev_catalog
för att ange datakällan för en materialiserad vy:
SQL
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
Python
import dlt
from pyspark.sql.functions import col, sum, count
@dlt.table
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return (spark.read
.table(f"{source_catalog}.sales.transactions_table")
.groupBy("account_id")
.agg(
count(col("txn_id").alias("txn_count")),
sum(col("txn_amount").alias("account_revenue"))
)
)
Ange parametrar
Skicka parametrar till pipelines genom att skicka godtyckliga nyckel/värde-par som konfigurationer för pipelinen. Du kan ange parametrar när du definierar eller redigerar en pipelinekonfiguration med hjälp av arbetsytans användargränssnitt eller JSON. Se Konfigurera en DLT-pipeline.
Pipelineparameternycklar kan bara innehålla _ - .
eller alfanumeriska tecken. Parametervärden anges som strängar.
Pipelineparametrar stöder inte dynamiska värden. Du måste uppdatera värdet som är associerat med en nyckel i pipelinekonfigurationen.
Viktig
Använd inte nyckelord som är i konflikt med reserverade pipeline- eller Apache Spark-konfigurationsvärden.
Parametrisera datamängdsdeklarationer i Python eller SQL
Python- och SQL-koden som definierar dina datauppsättningar kan parametriseras av pipelinens inställningar. Parameterisering möjliggör följande användningsfall:
- Separera långa sökvägar och andra variabler från koden.
- Minska mängden data som bearbetas i utvecklings- eller mellanlagringsmiljöer för att påskynda testningen.
- Återanvända samma transformeringslogik för att bearbeta från flera datakällor.
I följande exempel används konfigurationsvärdet startDate
för att begränsa utvecklingspipelinen till en delmängd av indata:
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
Kontrollera datakällor med parametrar
Du kan använda pipelineparametrar för att ange olika datakällor i olika konfigurationer av samma pipeline.
Du kan till exempel ange olika sökvägar i utvecklings-, testnings- och produktionskonfigurationer för en pipeline med hjälp av variabeln data_source_path
och sedan referera till den med hjälp av följande kod:
SQL
CREATE STREAMING TABLE bronze
AS (
SELECT
*,
_metadata.file_path AS source_file_path
FROM read_files( '${data_source_path}', 'csv',
map("header", "true"))
)
Python
import dlt
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dlt.table
def bronze():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.load(data_source_path )
.select("*", col("_metadata.file_path").alias("source_file_name"))
)
Det här mönstret är fördelaktigt för att testa hur inmatningslogik kan hantera schemadata eller felaktiga data under inledande inmatning. Du kan använda den identiska koden i hela pipelinen i alla miljöer när du växlar ut datauppsättningar.