Condividi tramite


Usare i parametri con le pipeline di tabelle live Delta

Questo articolo illustra come usare le configurazioni della pipeline delle tabelle live Delta per parametrizzare il codice della pipeline.

Parametri di riferimento

Durante gli aggiornamenti, il codice sorgente della pipeline può accedere ai parametri della pipeline usando la sintassi per ottenere valori per le configurazioni spark.

È possibile fare riferimento ai parametri della pipeline usando la chiave . Il valore viene inserito nel codice sorgente come stringa prima che la logica del codice sorgente valuti.

La sintassi di esempio seguente usa un parametro con chiave source_catalog e valore dev_catalog per specificare l'origine dati per una vista materializzata:

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Python

import dlt
from pyspark.sql.functions import col, sum, count

@dlt.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

Impostare i parametri

Passare parametri alle pipeline passando coppie chiave-valore arbitrarie come configurazioni per la pipeline. È possibile impostare i parametri durante la definizione o la modifica di una configurazione della pipeline usando l'interfaccia utente dell'area di lavoro o JSON. Vedere Configurare una pipeline di tabelle live Delta.

Le chiavi dei parametri della pipeline possono contenere _ - . solo caratteri alfanumerici o alfanumerici. I valori dei parametri vengono impostati come stringhe.

I parametri della pipeline non supportano valori dinamici. È necessario aggiornare il valore associato a una chiave nella configurazione della pipeline.

Importante

Non usare parole chiave in conflitto con pipeline riservate o valori di configurazione di Apache Spark.

Parametrizzare le dichiarazioni del set di dati in Python o SQL

Il codice Python e SQL che definisce i set di dati possono essere parametrizzati dalle impostazioni della pipeline. La parametrizzazione abilita i casi d'uso seguenti:

  • Separazione di percorsi lunghi e altre variabili dal codice.
  • Riduzione della quantità di dati elaborati in ambienti di sviluppo o staging per velocizzare i test.
  • Riutilizzo della stessa logica di trasformazione per l'elaborazione da più origini dati.

Nell'esempio seguente viene usato il valore startDate di configurazione per limitare la pipeline di sviluppo a un subset dei dati di input:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

Controllare le origini dati con parametri

È possibile usare i parametri della pipeline per specificare origini dati diverse in configurazioni diverse della stessa pipeline.

Ad esempio, è possibile specificare percorsi diversi nelle configurazioni di sviluppo, test e produzione per una pipeline usando la variabile data_source_path e quindi farvi riferimento usando il codice seguente:

SQL

CREATE STREAMING TABLE bronze
AS (
    SELECT
    *,
    _metadata.file_path AS source_file_path
    FROM read_files( '${data_source_path}', 'csv',
            map("header", "true"))
)

Python

import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

Questo modello è utile per testare il modo in cui la logica di inserimento potrebbe gestire lo schema o i dati in formato non valido durante l'inserimento iniziale. È possibile usare il codice identico in tutta la pipeline in tutti gli ambienti durante la disattivazione dei set di dati.