Dela via


Använd parametrar med DLT-pipelines

Den här artikeln beskriver hur du kan använda DLT-pipelinekonfigurationer för att parametrisera pipelinekod.

Referensparametrar

Under uppdateringar kan din pipeline-källkod komma åt pipelineparametrar med hjälp av syntax för att hämta värden för Spark-konfigurationer.

Du refererar till pipelineparametrar med hjälp av nyckeln. Värdet matas in i källkoden som en sträng innan källkodslogik utvärderas.

Följande exempelsyntax använder en parameter med nyckel source_catalog och värde dev_catalog för att ange datakällan för en materialiserad vy:

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Python

import dlt
from pyspark.sql.functions import col, sum, count

@dlt.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

Ange parametrar

Skicka parametrar till pipelines genom att skicka godtyckliga nyckel/värde-par som konfigurationer för pipelinen. Du kan ange parametrar när du definierar eller redigerar en pipelinekonfiguration med hjälp av arbetsytans användargränssnitt eller JSON. Se Konfigurera en DLT-pipeline.

Pipelineparameternycklar kan bara innehålla _ - . eller alfanumeriska tecken. Parametervärden anges som strängar.

Pipelineparametrar stöder inte dynamiska värden. Du måste uppdatera värdet som är associerat med en nyckel i pipelinekonfigurationen.

Viktig

Använd inte nyckelord som är i konflikt med reserverade pipeline- eller Apache Spark-konfigurationsvärden.

Parametrisera datamängdsdeklarationer i Python eller SQL

Python- och SQL-koden som definierar dina datauppsättningar kan parametriseras av pipelinens inställningar. Parameterisering möjliggör följande användningsfall:

  • Separera långa sökvägar och andra variabler från koden.
  • Minska mängden data som bearbetas i utvecklings- eller mellanlagringsmiljöer för att påskynda testningen.
  • Återanvända samma transformeringslogik för att bearbeta från flera datakällor.

I följande exempel används konfigurationsvärdet startDate för att begränsa utvecklingspipelinen till en delmängd av indata:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

Kontrollera datakällor med parametrar

Du kan använda pipelineparametrar för att ange olika datakällor i olika konfigurationer av samma pipeline.

Du kan till exempel ange olika sökvägar i utvecklings-, testnings- och produktionskonfigurationer för en pipeline med hjälp av variabeln data_source_path och sedan referera till den med hjälp av följande kod:

SQL

CREATE STREAMING TABLE bronze
AS (
    SELECT
    *,
    _metadata.file_path AS source_file_path
    FROM read_files( '${data_source_path}', 'csv',
            map("header", "true"))
)

Python

import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

Det här mönstret är fördelaktigt för att testa hur inmatningslogik kan hantera schemadata eller felaktiga data under inledande inmatning. Du kan använda den identiska koden i hela pipelinen i alla miljöer när du växlar ut datauppsättningar.