Использование параметров с конвейерами Delta Live Tables
В этой статье объясняется, как использовать конфигурации конвейера Delta Live Table для параметризации кода конвейера.
Параметры ссылок
Во время обновлений исходный код конвейера может получить доступ к параметрам конвейера с помощью синтаксиса для получения значений конфигураций Spark.
Вы ссылаетесь на параметры конвейера с помощью ключа. Значение вставляется в исходный код в виде строки перед вычислением логики исходного кода.
В следующем примере синтаксиса используется параметр с ключом source_catalog
и значением dev_catalog
, чтобы указать источник данных для материализованного представления:
SQL
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
Python
import dlt
from pyspark.sql.functions import col, sum, count
@dlt.table
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return (spark.read
.table(f"{source_catalog}.sales.transactions_table")
.groupBy("account_id")
.agg(
count(col("txn_id").alias("txn_count")),
sum(col("txn_amount").alias("account_revenue"))
)
)
Установка параметров
Передайте параметры конвейерам путем передачи произвольных пар "ключ-значение" в качестве конфигураций для конвейера. Параметры можно задать при определении или редактировании конфигурации конвейера с помощью пользовательского интерфейса рабочей области или JSON. См. статью "Настройка конвейера разностных динамических таблиц".
Ключи параметров конвейера могут содержать _ - .
только буквенно-цифровые символы. Значения параметров задаются как строки.
Параметры конвейера не поддерживают динамические значения. Необходимо обновить значение, связанное с ключом в конфигурации конвейера.
Внимание
Не используйте ключевые слова, конфликтующие с зарезервированными значениями конвейера или конфигурации Apache Spark.
Параметризация объявлений набора данных в Python или SQL
Код Python и SQL, который определяет наборы данных, может быть параметризован с помощью настроек конвейера. Параметризация позволяет задействовать следующие варианты использования:
- Выделение из кода длинных путей и других переменных.
- Сокращение объема данных, обработанных в средах разработки или промежуточной среды, чтобы ускорить тестирование.
- Повторное использование логики преобразования для обработки содержимого нескольких источников данных.
В следующем примере используется значение конфигурации startDate
, ограничивающее конвейер разработки определенным подмножеством входных данных:
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
Управление источниками данных с параметрами
Параметры конвейера можно использовать для указания разных источников данных в разных конфигурациях одного конвейера.
Например, можно указать различные пути разработки, тестирования и рабочей конфигурации для конвейера с помощью переменной data_source_path
, а затем ссылаться на него с помощью следующего кода:
SQL
CREATE STREAMING TABLE bronze
AS (
SELECT
*,
_metadata.file_path AS source_file_path
FROM read_files( '${data_source_path}', 'csv',
map("header", "true"))
)
Python
import dlt
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dlt.table
def bronze():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.load(data_source_path )
.select("*", col("_metadata.file_path").alias("source_file_name"))
)
Этот шаблон полезен для тестирования того, как логика приема может обрабатывать схему или неправильно сформированные данные во время первоначального приема. Вы можете использовать одинаковый код во всем конвейере во всех средах при переключении наборов данных.