Używanie parametrów z potokami delty tabel na żywo
W tym artykule wyjaśniono, jak za pomocą konfiguracji potoku delta Live Tables sparametryzować kod potoku.
Parametry odwołania
Podczas aktualizacji kod źródłowy potoku może uzyskiwać dostęp do parametrów potoku przy użyciu składni w celu uzyskania wartości dla konfiguracji platformy Spark.
Odwołujesz się do parametrów potoku przy użyciu klucza. Wartość jest wstrzykiwana do kodu źródłowego jako ciąg, zanim zostanie obliczona logika kodu źródłowego.
Poniższa przykładowa składnia używa parametru z kluczem source_catalog
i wartością dev_catalog
, aby określić źródło danych dla zmaterializowanego widoku:
SQL
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
Python
import dlt
from pyspark.sql.functions import col, sum, count
@dlt.table
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return (spark.read
.table(f"{source_catalog}.sales.transactions_table")
.groupBy("account_id")
.agg(
count(col("txn_id").alias("txn_count")),
sum(col("txn_amount").alias("account_revenue"))
)
)
Ustawianie parametrów
Przekazywanie parametrów do potoków przez przekazanie dowolnych par klucz-wartość jako konfiguracji potoku. Parametry można ustawić podczas definiowania lub edytowania konfiguracji potoku przy użyciu interfejsu użytkownika obszaru roboczego lub JSON. Zobacz Konfigurowanie potoku tabel na żywo delty.
Klucze parametrów potoku mogą zawierać _ - .
tylko znaki alfanumeryczne lub . Wartości parametrów są ustawiane jako ciągi.
Parametry potoku nie obsługują wartości dynamicznych. Należy zaktualizować wartość skojarzona z kluczem w konfiguracji potoku.
Ważne
Nie używaj słów kluczowych, które powodują konflikt z wartościami konfiguracji potoku zarezerwowanego ani platformy Apache Spark.
Parametryzowanie deklaracji zestawu danych w języku Python lub SQL
Kod Python i SQL definiujący zestawy danych można sparametryzować przy użyciu ustawień potoku. Parametryzacja umożliwia następujące przypadki użycia:
- Oddzielanie długich ścieżek i innych zmiennych od kodu.
- Zmniejszenie ilości danych przetwarzanych w środowiskach programistycznych lub przejściowych w celu przyspieszenia testowania.
- Ponowne użycie tej samej logiki przekształcania w celu przetworzenia z wielu źródeł danych.
W poniższym przykładzie użyto startDate
wartości konfiguracji, aby ograniczyć potok programowania do podzestawu danych wejściowych:
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
Kontrolowanie źródeł danych przy użyciu parametrów
Parametry potoku umożliwiają określenie różnych źródeł danych w różnych konfiguracjach tego samego potoku.
Można na przykład określić różne ścieżki w konfiguracjach programistycznych, testowych i produkcyjnych dla potoku przy użyciu zmiennej data_source_path
, a następnie odwołać się do niej przy użyciu następującego kodu:
SQL
CREATE STREAMING TABLE bronze
AS (
SELECT
*,
_metadata.file_path AS source_file_path
FROM read_files( '${data_source_path}', 'csv',
map("header", "true"))
)
Python
import dlt
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dlt.table
def bronze():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.load(data_source_path )
.select("*", col("_metadata.file_path").alias("source_file_name"))
)
Ten wzorzec jest korzystny do testowania sposobu, w jaki logika pozyskiwania może obsługiwać schemat lub źle sformułowane dane podczas początkowego pozyskiwania. Możesz użyć identycznego kodu w całym potoku we wszystkich środowiskach podczas przełączania zestawów danych.