Usare i parametri con le pipeline di tabelle live Delta
Questo articolo illustra come usare le configurazioni della pipeline delle tabelle live Delta per parametrizzare il codice della pipeline.
Parametri di riferimento
Durante gli aggiornamenti, il codice sorgente della pipeline può accedere ai parametri della pipeline usando la sintassi per ottenere valori per le configurazioni spark.
È possibile fare riferimento ai parametri della pipeline usando la chiave . Il valore viene inserito nel codice sorgente come stringa prima che la logica del codice sorgente valuti.
La sintassi di esempio seguente usa un parametro con chiave source_catalog
e valore dev_catalog
per specificare l'origine dati per una vista materializzata:
SQL
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
Python
import dlt
from pyspark.sql.functions import col, sum, count
@dlt.table
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return (spark.read
.table(f"{source_catalog}.sales.transactions_table")
.groupBy("account_id")
.agg(
count(col("txn_id").alias("txn_count")),
sum(col("txn_amount").alias("account_revenue"))
)
)
Impostare i parametri
Passare parametri alle pipeline passando coppie chiave-valore arbitrarie come configurazioni per la pipeline. È possibile impostare i parametri durante la definizione o la modifica di una configurazione della pipeline usando l'interfaccia utente dell'area di lavoro o JSON. Vedere Configurare una pipeline di tabelle live Delta.
Le chiavi dei parametri della pipeline possono contenere _ - .
solo caratteri alfanumerici o alfanumerici. I valori dei parametri vengono impostati come stringhe.
I parametri della pipeline non supportano valori dinamici. È necessario aggiornare il valore associato a una chiave nella configurazione della pipeline.
Importante
Non usare parole chiave in conflitto con pipeline riservate o valori di configurazione di Apache Spark.
Parametrizzare le dichiarazioni del set di dati in Python o SQL
Il codice Python e SQL che definisce i set di dati possono essere parametrizzati dalle impostazioni della pipeline. La parametrizzazione abilita i casi d'uso seguenti:
- Separazione di percorsi lunghi e altre variabili dal codice.
- Riduzione della quantità di dati elaborati in ambienti di sviluppo o staging per velocizzare i test.
- Riutilizzo della stessa logica di trasformazione per l'elaborazione da più origini dati.
Nell'esempio seguente viene usato il valore startDate
di configurazione per limitare la pipeline di sviluppo a un subset dei dati di input:
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
Controllare le origini dati con parametri
È possibile usare i parametri della pipeline per specificare origini dati diverse in configurazioni diverse della stessa pipeline.
Ad esempio, è possibile specificare percorsi diversi nelle configurazioni di sviluppo, test e produzione per una pipeline usando la variabile data_source_path
e quindi farvi riferimento usando il codice seguente:
SQL
CREATE STREAMING TABLE bronze
AS (
SELECT
*,
_metadata.file_path AS source_file_path
FROM read_files( '${data_source_path}', 'csv',
map("header", "true"))
)
Python
import dlt
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dlt.table
def bronze():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.load(data_source_path )
.select("*", col("_metadata.file_path").alias("source_file_name"))
)
Questo modello è utile per testare il modo in cui la logica di inserimento potrebbe gestire lo schema o i dati in formato non valido durante l'inserimento iniziale. È possibile usare il codice identico in tutta la pipeline in tutti gli ambienti durante la disattivazione dei set di dati.