Поделиться через


Использование параметров с конвейерами DLT

В этой статье объясняется, как использовать конфигурации конвейера DLT для параметризации кода конвейера.

Эталонные параметры

Во время обновлений исходный код конвейера может получить доступ к параметрам конвейера с помощью синтаксиса для получения значений конфигураций Spark.

Вы ссылаетесь на параметры конвейера с помощью ключа. Значение вставляется в исходный код в виде строки перед вычислением логики исходного кода.

В следующем примере синтаксиса используется параметр с ключом source_catalog и значением dev_catalog, чтобы указать источник данных для материализованного представления:

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Питон

import dlt
from pyspark.sql.functions import col, sum, count

@dlt.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

Установка параметров

Передайте параметры конвейерам путем передачи произвольных пар "ключ-значение" в качестве конфигураций для конвейера. Параметры можно задать при определении или редактировании конфигурации конвейера с помощью пользовательского интерфейса рабочей области или JSON. См. Настройте конвейер DLT.

Ключи параметров конвейера могут содержать только _ - . или буквенно-цифровые символы. Значения параметров задаются как строки.

Параметры конвейера не поддерживают динамические значения. Необходимо обновить значение, связанное с ключом в конфигурации конвейера.

Важный

Не используйте ключевые слова, конфликтующие с зарезервированными значениями конвейера или конфигурации Apache Spark.

Параметризуйте объявления наборов данных в Python или SQL

Код Python и SQL, определяющие наборы данных, можно параметризовать с помощью параметров конвейера. Параметризация включает следующие варианты использования:

  • Разделение длинных путей и других переменных из кода.
  • Сокращение объема обрабатываемых данных в средах разработки или промежуточного развертывания для ускорения тестирования.
  • Повторное использование одной логики преобразования для обработки из нескольких источников данных.

В следующем примере используется значение конфигурации startDate, чтобы ограничить конвейер разработки подмножеством входных данных:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

Управление источниками данных с параметрами

Параметры конвейера можно использовать для указания разных источников данных в разных конфигурациях одного конвейера.

Например, можно указать различные пути разработки, тестирования и рабочей конфигурации для конвейера с помощью переменной data_source_path, а затем ссылаться на него с помощью следующего кода:

SQL

CREATE STREAMING TABLE bronze
AS (
    SELECT
    *,
    _metadata.file_path AS source_file_path
    FROM read_files( '${data_source_path}', 'csv',
            map("header", "true"))
)

Питон

import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

Этот шаблон полезен для тестирования того, как логика приема может обрабатывать схему или неправильно сформированные данные во время первоначального приема. Вы можете использовать одинаковый код во всем конвейере во всех средах при переключении наборов данных.