Преобразование данных с помощью разностных динамических таблиц
В этой статье описывается, как использовать разностные динамические таблицы для объявления преобразований в наборах данных и указания способа обработки записей с помощью логики запроса. Он также содержит примеры распространенных шаблонов преобразования для создания конвейеров Delta Live Tables.
Набор данных можно определить для любого запроса, возвращающего кадр данных. Встроенные операции Apache Spark, определяемые пользователем функции, пользовательские логики и модели MLflow можно использовать в качестве преобразований в конвейере Delta Live Tables. После приема данных в конвейер Разностных динамических таблиц можно определить новые наборы данных для вышестоящих источников для создания новых потоковых таблиц, материализованных представлений и представлений.
Сведения о том, как эффективно выполнять обработку с отслеживанием состояния с помощью разностных динамических таблиц, см. в статье "Оптимизация обработки с отслеживанием состояния" в разностных динамических таблицах с подложками.
Когда следует использовать представления, материализованные представления и таблицы потоковой передачи
При реализации запросов конвейера выберите лучший тип набора данных, чтобы убедиться, что они эффективны и поддерживаются.
Рекомендуется использовать представление для выполнения следующих действий:
- Разорвать большой или сложный запрос, который требуется для упрощения управления запросами.
- Проверьте промежуточные результаты с помощью ожиданий.
- Уменьшите затраты на хранение и вычислительные ресурсы для результатов, которые не нужно сохранять. Так как таблицы материализованы, они требуют дополнительных вычислительных ресурсов и ресурсов хранилища.
Рекомендуется использовать материализованное представление, когда:
- Несколько подчиненных запросов используют таблицу. Так как представления вычисляются по запросу, представление вычисляется повторно при каждом запросе представления.
- Другие конвейеры, задания или запросы используют таблицу. Поскольку представления не материализованы, их можно использовать только в одном конвейере.
- Вы хотите просмотреть результаты запроса во время разработки. Так как таблицы материализованы и могут просматриваться и запрашиваться за пределами конвейера, использование таблиц во время разработки может помочь проверить правильность вычислений. После проверки преобразуйте запросы, которые не требуют материализации в представления.
Рекомендуется использовать таблицу потоковой передачи, когда:
- Запрос определяется для источника данных, который постоянно или постепенно растет.
- Результаты запроса должны вычисляться постепенно.
- Конвейеру требуется высокая пропускная способность и низкая задержка.
Примечание.
Таблицы потоковой передачи всегда определяются для источников потоковой передачи. Вы также можете использовать источники потоковой передачи для APPLY CHANGES INTO
применения обновлений из веб-каналов CDC. См . API APPLY CHANGES: упрощение отслеживания изменений с помощью разностных динамических таблиц.
Исключение таблиц из целевой схемы
Если необходимо вычислить промежуточные таблицы, не предназначенные для внешнего потребления, их можно предотвратить публикацию в схеме с помощью ключевого TEMPORARY
слова. Временные таблицы по-прежнему хранят и обрабатывают данные в соответствии с семантикой разностных динамических таблиц, но не должны быть доступны за пределами текущего конвейера. Временная таблица сохраняется в течение всего времени существования конвейера, создающего его. Используйте следующий синтаксис для объявления временных таблиц:
SQL
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
Python
@dlt.table(
temporary=True)
def temp_table():
return ("...")
Объединение потоковых таблиц и материализованных представлений в одном конвейере
Таблицы потоковой передачи наследуют гарантии обработки структурированной потоковой передачи Apache Spark и настраиваются для обработки запросов из источников данных только для добавления, где новые строки всегда вставляются в исходную таблицу, а не изменяются.
Примечание.
Хотя по умолчанию для потоковых таблиц требуются источники данных только для добавления, если источник потоковой передачи является другой потоковой таблицей, требующей обновления или удаления, можно переопределить это поведение с помощью флага skipChangeCommits.
Распространенный шаблон потоковой передачи включает прием исходных данных для создания начальных наборов данных в конвейере. Эти начальные наборы данных обычно называются бронзовыми таблицами и часто выполняют простые преобразования.
Напротив, конечные таблицы в конвейере, часто называемые золотыми таблицами, часто требуют сложных агрегатов или считывания из целевых APPLY CHANGES INTO
объектов операции. Поскольку эти операции по сути создают обновления, а не добавляются, они не поддерживаются как входные данные для потоковых таблиц. Эти преобразования лучше подходят для материализованных представлений.
Смешав потоковые таблицы и материализованные представления в один конвейер, вы можете упростить конвейер, избежать дорогостоящей повторной приема или повторной обработки необработанных данных и получить полную мощность SQL для вычисления сложных агрегатов по эффективно закодированному и фильтруемому набору данных. Такой тип смешанной обработки показан в следующем примере:
Примечание.
В этих примерах используется автозагрузчик для загрузки файлов из облачного хранилища. Для загрузки файлов с помощью автозагрузчика в конвейере с поддержкой каталога Unity необходимо использовать внешние расположения. Дополнительные сведения об использовании каталога Unity с разностными динамическими таблицами см. в статье Использование каталога Unity с конвейерами таблиц Delta Live Tables.
Python
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("LIVE.streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.readStream.table("LIVE.streaming_silver").groupBy("user_id").count()
SQL
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
"abfss://path/to/raw/data", "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id
Дополнительные сведения об использовании автозагрузчика для добавочного приема JSON-файлов из хранилища Azure.
Статические соединения потоковой передачи
Потоковые статические соединения являются хорошим выбором при денормализации непрерывного потока данных только для добавления с помощью таблицы статических измерений.
При каждом обновлении конвейера новые записи из потока присоединяются к самому текущему моментальному снимку статической таблицы. Если записи добавляются или обновляются в статической таблице после обработки соответствующих данных из потоковой таблицы, результирующие записи не пересчитываются, если не выполняется полное обновление.
В конвейерах, настроенных для запуска, статическая таблица возвращает результаты по истечении времени запуска обновления. В конвейерах, настроенных для непрерывного выполнения, последняя версия статической таблицы запрашивается каждый раз, когда таблица обрабатывает обновление.
Ниже приведен пример статитического соединения потока:
Python
@dlt.table
def customer_sales():
return spark.readStream.table("LIVE.sales").join(spark.readStream.table("LIVE.customers"), ["customer_id"], "left")
SQL
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
INNER JOIN LEFT LIVE.customers USING (customer_id)
Эффективное вычисление статистических выражений
Таблицы потоковой передачи можно использовать для постепенного вычисления простых агрегатов распределения, таких как число, мин, макс, сумма и алгебраические агрегаты, такие как среднее или стандартное отклонение. Databricks рекомендует добавочную агрегирование для запросов с ограниченным числом групп, таких как запрос с предложением GROUP BY country
. После каждого обновления считываются только новые входные данные.
Дополнительные сведения о написании запросов Delta Live Tables, выполняющих добавочные агрегаты, см. в статье "Выполнение агрегирования с помощью подложек".
Использование моделей MLflow в конвейере разностных динамических таблиц
Примечание.
Чтобы использовать модели MLflow в конвейере с поддержкой каталога Unity, необходимо настроить конвейер для использования preview
канала. Чтобы использовать current
канал, необходимо настроить конвейер для публикации в хранилище метаданных Hive.
Вы можете использовать обученные MLflow модели в конвейерах Delta Live Tables. Модели MLflow рассматриваются как преобразования в Azure Databricks, что означает, что они действуют на входных данных Spark DataFrame и возвращают результаты в качестве кадра данных Spark. Так как разностные динамические таблицы определяют наборы данных для кадров данных, можно преобразовать рабочие нагрузки Apache Spark, использующие MLflow в разностные динамические таблицы с несколькими строками кода. Дополнительные сведения об управлении жизненным циклом машинного обучения с помощью MLflow.
Если у вас уже есть записная книжка Python, вызывающая модель MLflow, можно адаптировать этот код к разностным динамическим таблицам с помощью @dlt.table
декоратора и обеспечения определения функций для возврата результатов преобразования. Разностные динамические таблицы по умолчанию не устанавливают MLflow, поэтому убедитесь, что вы установили библиотеки %pip install mlflow
MLFlow и dlt
импортировали mlflow
и в верхней части записной книжки. Общие сведения о синтаксисе Delta Live Table см. в статье "Разработка кода конвейера с помощью Python".
Чтобы использовать модели MLflow в разностных динамических таблицах, выполните следующие действия.
- Получите идентификатор выполнения и имя модели MLflow. Идентификатор выполнения и имя модели используются для создания URI модели MLflow.
- Используйте URI для определения UDF Spark для загрузки модели MLflow.
- Вызовите UDF в определениях таблиц, чтобы использовать модель MLflow.
В следующем примере показан базовый синтаксис для этого шаблона:
%pip install mlflow
import dlt
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dlt.table
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
В качестве полного примера следующий код определяет UDF Spark с именем loaded_model_udf
, который загружает модель MLflow, обученную по данным риска кредита. Столбцы данных, используемые для прогнозирования, передаются в качестве аргумента в UDF. Таблица loan_risk_predictions
вычисляет прогнозы для каждой строки в loan_risk_input_data
.
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dlt.table(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
Сохранение удалений или обновлений вручную
Разностные динамические таблицы позволяют вручную удалять или обновлять записи из таблицы и выполнять операцию обновления для повторной компиляции подчиненных таблиц.
По умолчанию Разностные динамические таблицы перекомпилируют результаты на основе входных данных при каждом обновлении конвейера, поэтому необходимо убедиться, что удаленная запись не перезагрузится из исходных данных. pipelines.reset.allowed
Задание свойства таблицы, чтобы false
предотвратить обновление таблицы, но не предотвращает добавочную запись в таблицы или новые данные, которые будут передаваться в таблицу.
На следующей схеме показан пример использования двух таблиц потоковой передачи:
raw_user_table
прием необработанных данных пользователя из источника.bmi_table
последовательно вычисляет оценки индекса массы тела (ИМТ), используя значения веса и роста изraw_user_table
.
Вы хотите вручную удалить или обновить записи пользователей из и raw_user_table
перекомпьютировать его bmi_table
.
В следующем коде показано, как задать pipelines.reset.allowed
свойство таблицы, чтобы false
отключить полное обновление raw_user_table
, чтобы с течением времени сохранялись предполагаемые изменения, но при запуске обновления конвейера перекомпьютеризованы следующие таблицы:
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);