Stosowanie parametrów z potokami DLT
W tym artykule wyjaśniono, jak można użyć konfiguracji potoku DLT do sparametryzowania kodu potoku.
Parametry odwołania
Podczas aktualizacji kod źródłowy potoku może uzyskiwać dostęp do parametrów potoku przy użyciu składni w celu uzyskania wartości dla konfiguracji platformy Spark.
Odwołujesz się do parametrów potoku przy użyciu klucza. Wartość jest wstrzykiwana do twojego kodu źródłowego jako ciąg, zanim logika twojego kodu źródłowego zostanie uruchomiona.
Poniższa przykładowa składnia używa parametru z kluczem source_catalog
i wartością dev_catalog
w celu określenia źródła danych dla zmaterializowanego widoku:
SQL
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
Pyton
import dlt
from pyspark.sql.functions import col, sum, count
@dlt.table
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return (spark.read
.table(f"{source_catalog}.sales.transactions_table")
.groupBy("account_id")
.agg(
count(col("txn_id").alias("txn_count")),
sum(col("txn_amount").alias("account_revenue"))
)
)
Ustawianie parametrów
Przekazywanie parametrów do potoków poprzez przekazywanie dowolnych par klucz-wartość jako konfiguracji potoków. Podczas definiowania lub edytowania konfiguracji potoku można skonfigurować parametry przy użyciu interfejsu użytkownika obszaru roboczego lub poprzez JSON. Zobacz Konfigurowanie potoku DLT.
Klucze parametrów rur mogą zawierać tylko znaki _ - .
lub znaki alfanumeryczne. Wartości parametrów są ustawiane jako ciągi.
Parametry potoku nie obsługują wartości dynamicznych. Należy zaktualizować wartość skojarzoną z kluczem w konfiguracji potoku.
Ważny
Nie używaj słów kluczowych, które powodują konflikt z zarezerwowanymi wartościami konfiguracji pipeline lub Apache Spark.
Sparametryzować deklaracje zestawu danych w Pythonie lub SQL
Kod Python i SQL definiujący zestawy danych można sparametryzować poprzez ustawienia procesu przetwarzania. Parametryzacja umożliwia następujące przypadki użycia:
- Oddzielanie długich ścieżek i innych zmiennych od kodu.
- Zmniejszenie ilości danych przetwarzanych w środowiskach programistycznych lub przejściowych w celu przyspieszenia testowania.
- Ponowne użycie tej samej logiki przekształcania do przetwarzania danych z wielu źródeł.
W poniższym przykładzie użyto wartości konfiguracji startDate
w celu ograniczenia potoku programowania do podzestawu danych wejściowych:
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
Kontrolowanie źródeł danych przy użyciu parametrów
Parametry potoku umożliwiają określenie różnych źródeł danych w różnych konfiguracjach tego samego potoku.
Można na przykład określić różne ścieżki w konfiguracjach programowania, testowania i produkcji dla potoku przy użyciu zmiennej data_source_path
, a następnie odwołać się do niej przy użyciu następującego kodu:
SQL
CREATE STREAMING TABLE bronze
AS (
SELECT
*,
_metadata.file_path AS source_file_path
FROM read_files( '${data_source_path}', 'csv',
map("header", "true"))
)
Pyton
import dlt
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dlt.table
def bronze():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.load(data_source_path )
.select("*", col("_metadata.file_path").alias("source_file_name"))
)
Ten wzorzec jest korzystny do testowania sposobu, w jaki logika pozyskiwania może obsługiwać schemat lub źle sformułowane dane podczas początkowego pozyskiwania. Możesz użyć identycznego kodu w całym procesie we wszystkich środowiskach przy zamianie zestawów danych.