Поделиться через


Преобразование данных с помощью конвейеров

В этой статье описывается, как использовать DLT для объявления преобразований в наборах данных и указания способа обработки записей с помощью логики запроса. Он также содержит примеры распространенных шаблонов преобразования для создания конвейеров DLT.

Набор данных можно определить для любого запроса, возвращающего DataFrame. Встроенные операции Apache Spark, функции, определяемые пользователем, пользовательская логика и модели MLflow могут использоваться в качестве преобразований в конвейере DLT. После приема данных в конвейер DLT, можно определить новые наборы данных для входных источников, чтобы создать новые потоковые таблицы, материализованные представления и представления.

Сведения о том, как эффективно выполнять обработку с отслеживанием состояния с помощью DLT, см. в статье Оптимизация обработки с отслеживанием состояния в DLT с подложками.

Когда следует использовать представления, материализованные представления и потоковые таблицы

При реализации запросов конвейера выберите лучший тип набора данных, чтобы убедиться, что они эффективны и поддерживаются.

Рассмотрите возможность использования представления для выполнения следующих действий:

  • Разделите большой или сложный запрос, чтобы он стал более управляемым и легче обрабатывать.
  • Проверьте промежуточные результаты с помощью ожиданий.
  • Уменьшите затраты на хранение и вычислительные ресурсы для результатов, которые не нужно сохранять. Так как таблицы материализованы, они требуют дополнительных вычислительных ресурсов и ресурсов хранилища.

Рекомендуется использовать материализованное представление, когда:

  • Несколько подчиненных запросов используют таблицу. Так как представления вычисляются по запросу, представление вычисляется повторно при каждом запросе представления.
  • Другие каналы, задания или запросы потребляют таблицу. Поскольку представления не материализованы, их можно использовать только в одном конвейере.
  • Вы хотите просмотреть результаты запроса во время разработки. Так как таблицы материализованы и могут просматриваться и запрашиваться за пределами конвейера, использование таблиц во время разработки может помочь проверить правильность вычислений. После проверки преобразуйте запросы, которые не требуют материализации в представления.

Подумайте об использовании потоковой таблицы, когда:

  • Запрос определяется для источника данных, который постоянно или постепенно растет.
  • Результаты запроса должны вычисляться постепенно.
  • Конвейеру требуется высокая пропускная способность и низкая задержка.

Заметка

Таблицы потоковых данных всегда определяются по отношению к потоковым источникам. Вы также можете использовать источники потоковой передачи с APPLY CHANGES INTO для применения обновлений из веб-каналов CDC. См. API APPLY CHANGES: упрощение захвата изменений с помощью DLT.

Исключение таблиц из целевой схемы

Если необходимо вычислить промежуточные таблицы, не предназначенные для внешнего потребления, можно предотвратить их публикацию в схеме, используя ключевое слово TEMPORARY. Временные таблицы по-прежнему хранят и обрабатывают данные в соответствии с семантикой DLT, но не должны быть доступны за пределами текущего конвейера. Временная таблица сохраняется в течение всего срока существования конвейера, создающего её. Используйте следующий синтаксис для объявления временных таблиц:

SQL

CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;

Питон

@dlt.table(
  temporary=True)
def temp_table():
  return ("...")

Объединение потоковых таблиц и материализованных представлений в одном конвейере

Таблицы потоковой передачи наследуют гарантии обработки структурированной потоковой передачи Apache Spark и настраиваются для обработки запросов из добавляемых источников данных, где новые строки всегда вставляются в исходную таблицу без изменений.

Заметка

Хотя по умолчанию для потоковых таблиц требуются источники данных только для добавления, если источник потоковой передачи является другой таблицей потоковой передачи, требующей обновлений или удалений, можно переопределить это поведение с помощью флага skipChangeCommits.

Распространенный шаблон потоковой обработки данных включает прием исходных данных для создания начальных наборов данных в потоке обработки. Эти начальные наборы данных обычно называются бронзовыми таблицами и часто выполняют простые преобразования.

Напротив, конечные таблицы в конвейере, часто называемые золотыми таблицами, часто требуют сложных агрегатов или чтения из целевых объектов операции APPLY CHANGES INTO. Поскольку эти операции по сути создают обновления, а не добавляются, они не поддерживаются как входные данные для потоковых таблиц. Эти преобразования лучше подходят для материализованных представлений.

Смешав потоковые таблицы и материализованные представления в один конвейер, вы можете упростить конвейер, избежать дорогостоящей повторной приема или повторной обработки необработанных данных и получить полную мощность SQL для вычисления сложных агрегатов по эффективно закодированному и фильтруемому набору данных. В следующем примере показан этот тип смешанной обработки:

Заметка

В этих примерах используется автозагрузчик для загрузки файлов из облачного хранилища. Чтобы загрузить файлы с помощью Auto Loader в конвейере с поддержкой Unity Catalog, необходимо использовать внешние расположения. Дополнительные сведения об использовании каталога Unity с DLT см. в статье Использование каталога Unity с конвейерами DLT.

Питон

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return spark.readStream.table("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return spark.readStream.table("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id

Дополнительные сведения о постепенной загрузке JSON-файлов из хранилища Azure с использованием автозагрузчика .

статические потоки соединений

Потоковые статические соединения являются хорошим выбором при денормализации непрерывного потока данных только для добавления с помощью таблицы статических измерений.

При каждом обновлении конвейера новые записи из потока присоединяются к самому текущему моментальному снимку статической таблицы. Если записи добавляются или обновляются в статической таблице после обработки соответствующих данных из потоковой таблицы, результирующие записи не пересчитываются, если не выполняется полное обновление.

В конвейерах, настроенных для выполнения по триггеру, статическая таблица возвращает результаты на момент начала обновления. В конвейерах, настроенных для непрерывного выполнения, последняя версия статической таблицы запрашивается каждый раз, когда таблица обрабатывает обновление.

Ниже приведен пример статитического соединения потока:

Питон

@dlt.table
def customer_sales():
  return spark.readStream.table("sales").join(spark.readStream.table("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
  INNER JOIN LEFT customers USING (customer_id)

эффективно вычислять агрегаты

Таблицы потоковой передачи можно использовать для постепенного вычисления простых агрегатов распределения, таких как число, мин, макс, сумма и алгебраические агрегаты, такие как среднее или стандартное отклонение. Databricks рекомендует инкрементальное агрегирование запросов с ограниченным числом групп, например, запрос с предложением GROUP BY country. С каждым обновлением считываются только новые входные данные.

Дополнительные сведения о написании запросов DLT, выполняющих инкрементальные агрегации, см. в статье Выполнение оконных агрегаций с водяными знаками.

Использование моделей MLflow в конвейере DLT

Заметка

Чтобы использовать модели MLflow в конвейере с поддержкой каталога Unity, необходимо настроить конвейер для использования канала preview. Чтобы использовать канал current, необходимо настроить конвейер для публикации в хранилище метаданных Hive.

В конвейерах DLT можно использовать обученные MLflow модели. Модели MLflow рассматриваются как преобразования в Azure Databricks, что означает, что они действуют на входной Spark DataFrame и возвращают результаты в виде Spark DataFrame. Так как DLT определяет наборы данных против DataFrames, вы можете преобразовать рабочие нагрузки Apache Spark, которые используют MLflow, в DLT с помощью всего нескольких строк кода. Дополнительные сведения о MLflow см. в MLflow для агента генеративного ИИ и жизненного цикла модели машинного обучения.

Если у вас уже есть записная книжка Python, вызывающая модель MLflow, можно адаптировать этот код к DLT с помощью декоратора @dlt.table и обеспечения определения функций для возврата результатов преобразования. DLT не устанавливает MLflow по умолчанию, поэтому убедитесь, что вы установили библиотеки MLflow с %pip install mlflow и импортировали mlflow и dlt в начале вашего блокнота. Общие сведения о синтаксисе DLT см. в статье Разработка кода конвейера с помощью Python.

Чтобы использовать модели MLflow в DLT, выполните следующие действия.

  1. Получите идентификатор выполнения и имя модели MLflow. Идентификатор выполнения и имя модели используются для создания URI модели MLflow.
  2. Используйте URI для определения UDF для Spark для загрузки модели MLflow.
  3. Вызовите UDF в определениях таблиц, чтобы использовать модель MLflow.

В следующем примере показан базовый синтаксис для этого шаблона:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return spark.read.table(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

В качестве полного примера следующий код определяет UDF Spark с именем loaded_model_udf, который загружает модель MLflow, обученную по данным риска кредита. Столбцы данных, используемые для прогнозирования, передаются в качестве аргумента в UDF. Таблица loan_risk_predictions вычисляет прогнозы для каждой строки в loan_risk_input_data.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return spark.read.table("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

Сохранить выполненные вручную удаления или обновления

DLT позволяет вручную удалять или обновлять записи из таблицы и выполнять операцию обновления для повторной компиляции нижестоящих таблиц.

По умолчанию DLT перекомпилирует результаты таблицы на основе входных данных при каждом обновлении конвейера, поэтому необходимо убедиться, что удаленная запись не перезагрузится из исходных данных. Установка свойства таблицы pipelines.reset.allowed на false предотвращает обновление таблицы, но не мешает добавочным записям в таблицы и поступлению новых данных в таблицу.

На следующей схеме показан пример использования двух потоковых таблиц.

  • raw_user_table обрабатывает необработанные пользовательские данные из источника.
  • bmi_table постепенно вычисляет показатели ИМТ, используя вес и рост от raw_user_table.

Вы хотите вручную удалить или обновить записи пользователей из raw_user_table и перекомпьютировать bmi_table.

Сохранить схему данных

В следующем коде показано, как установить свойство таблицы pipelines.reset.allowed в false, чтобы отключить полное обновление для raw_user_table, таким образом обеспечивая сохранение запланированных изменений со временем, при этом подчиненные таблицы пересчитываются при запуске обновления конвейера.

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);