Použití parametrů s kanály Delta Live Tables
Tento článek vysvětluje, jak můžete použít konfigurace kanálů Delta Live Tables k parametrizaci kódu kanálu.
Parametry odkazu
Během aktualizací může zdrojový kód kanálu přistupovat k parametrům kanálu pomocí syntaxe pro získání hodnot pro konfigurace Sparku.
Na parametry kanálu odkazujete pomocí klíče. Hodnota se vloží do zdrojového kódu jako řetězec před vyhodnocením logiky zdrojového kódu.
Následující příklad syntaxe používá parametr s klíčem source_catalog
a hodnotou dev_catalog
k určení zdroje dat pro materializované zobrazení:
SQL
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
Python
import dlt
from pyspark.sql.functions import col, sum, count
@dlt.table
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return (spark.read
.table(f"{source_catalog}.sales.transactions_table")
.groupBy("account_id")
.agg(
count(col("txn_id").alias("txn_count")),
sum(col("txn_amount").alias("account_revenue"))
)
)
Nastavení parametrů
Předejte do kanálů parametry předáním libovolných párů klíč-hodnota jako konfigurace kanálu. Parametry můžete nastavit při definování nebo úpravě konfigurace kanálu pomocí uživatelského rozhraní pracovního prostoru nebo JSON. Viz Konfigurace kanálu delta živých tabulek.
Klíče parametrů kanálu můžou obsahovat _ - .
pouze alfanumerické znaky. Hodnoty parametrů jsou nastaveny jako řetězce.
Parametry kanálu nepodporují dynamické hodnoty. Je nutné aktualizovat hodnotu přidruženou ke klíči v konfiguraci kanálu.
Důležité
Nepoužívejte klíčová slova, která jsou v konfliktu s hodnotami konfigurace vyhrazeného kanálu nebo Apache Sparku.
Parametrizace deklarací datové sady v Pythonu nebo SQL
Kód Pythonu a SQL, který definuje datové sady, je možné parametrizovat nastavením kanálu. Parametrizace umožňuje následující případy použití:
- Oddělení dlouhých cest a dalších proměnných od kódu
- Snížení množství dat zpracovaných ve vývojových nebo přípravných prostředích za účelem urychlení testování
- Opětovné použití stejné logiky transformace pro zpracování z více zdrojů dat
Následující příklad používá startDate
hodnotu konfigurace k omezení vývojového kanálu na podmnožinu vstupních dat:
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
Řízení zdrojů dat pomocí parametrů
Parametry kanálu můžete použít k určení různých zdrojů dat v různých konfiguracích stejného kanálu.
Můžete například určit různé cesty při vývoji, testování a produkční konfiguraci kanálu pomocí proměnné data_source_path
a pak na něj odkazovat pomocí následujícího kódu:
SQL
CREATE STREAMING TABLE bronze
AS (
SELECT
*,
_metadata.file_path AS source_file_path
FROM read_files( '${data_source_path}', 'csv',
map("header", "true"))
)
Python
import dlt
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dlt.table
def bronze():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.load(data_source_path )
.select("*", col("_metadata.file_path").alias("source_file_name"))
)
Tento model je užitečný pro testování, jak logika příjmu dat může během počátečního příjmu dat zpracovávat schéma nebo poškozená data. Při přepínání datových sad můžete použít stejný kód v celém kanálu ve všechprostředích