Отслеживание конвейеров динамических таблиц Delta
В этой статье описывается использование встроенных функций мониторинга и наблюдаемости для конвейеров Delta Live Tables. Такие функции поддерживают такие задачи, как:
- Наблюдение за ходом выполнения и состоянием обновлений конвейера. Дополнительные сведения о конвейере доступны в пользовательском интерфейсе.
- Оповещение о событиях конвейера, таких как успешность или сбой обновлений конвейера. Дополнительные сведения о событиях конвейера см. в разделе "Добавление Уведомления по электронной почте".
- Просмотр метрик для источников потоковой передачи, таких как Apache Kafka и автозагрузчик (общедоступная предварительная версия). См. просмотр метрик потоковой передачи.
- Извлекая подробные сведения об обновлениях конвейера, таких как происхождение данных, метрики качества данных и использование ресурсов. См. раздел " Что такое журнал событий Delta Live Tables?".
- Определение пользовательских действий, выполняемых при возникновении определенных событий. См. раздел "Определение пользовательского мониторинга конвейеров Delta Live Tables" с помощью перехватчиков событий.
Сведения о проверке и диагностике производительности запросов см. в журнале запросов Access для конвейеров Delta Live Tables. Эта функция предоставляется в режиме общедоступной предварительной версии.
Добавление Уведомления по электронной почте для событий конвейера
Вы можете настроить один или несколько адресов электронной почты для получения уведомлений, когда происходит следующее:
- Обновление конвейера успешно завершено.
- Обновление конвейера завершается сбоем либо с повторным повтором, либо с ошибкой, не допускающей повторных попыток. Выберите этот параметр, чтобы получить уведомление обо всех сбоях конвейера.
- Обновление конвейера завершается ошибкой без повторных попыток (неустранимая). Выберите этот параметр, чтобы получить уведомление только в том случае, если возникает ошибка без повторных попыток.
- Сбой одного потока данных.
Чтобы настроить Уведомления по электронной почте при создании или изменении конвейера:
- Нажмите кнопку "Добавить уведомление".
- Введите один или несколько адресов электронной почты для получения уведомлений.
- Установите флажок для каждого типа уведомления, чтобы отправить на настроенные адреса электронной почты.
- Нажмите кнопку "Добавить уведомление".
Какие сведения о конвейере доступны в пользовательском интерфейсе?
График конвейера отображается сразу после успешного запуска обновления конвейера. Стрелки представляют зависимости между наборами данных в конвейере. По умолчанию на странице сведений о конвейере отображается последнее обновление таблицы, но в раскрывающемся меню можно выбрать старые обновления.
Сведения включают идентификатор конвейера, исходный код, затраты на вычисления, выпуск продукта и канал, настроенный для конвейера.
Чтобы просмотреть табличное представление наборов данных, щелкните вкладку "Список ". Представление списка позволяет просматривать все наборы данных в конвейере, представленные в виде строки в таблице, и полезно, если daG конвейера слишком велик, чтобы визуализироваться в представлении Графа . Вы можете управлять наборами данных, отображаемыми в таблице, с помощью нескольких фильтров, таких как имя набора данных, тип и состояние. Чтобы вернуться к визуализации DAG, щелкните Graph.
Пользователь Запуск от имени является владельцем конвейера, и обновления конвейера выполняются с разрешениями этого пользователя. Чтобы изменить пользователя run as
, щелкните Разрешения и измените владельца конвейера.
Как просмотреть сведения о наборе данных?
Щелкнув набор данных в графе конвейера или списке наборов данных, отображаются сведения о наборе данных. Сведения включают схему набора данных, метрики качества данных и ссылку на исходный код, определяющий набор данных.
Просмотр журнала обновлений
Чтобы просмотреть журнал и состояние обновлений конвейера, щелкните раскрывающееся меню журнала обновлений в верхней строке.
Выберите обновление в раскрывающемся меню, чтобы просмотреть граф, сведения и события для обновления. Чтобы вернуться к последнему обновлению, щелкните Показать последнее обновление.
просмотр метрик потоковой передачи
Важный
Наблюдаемость потоковой передачи для таблиц Delta Live доступна в общедоступной предварительной версии.
Вы можете анализировать метрики потоков из источников данных, поддерживаемых структурированной потоковой передачей Spark, таких как Apache Kafka, Amazon Kinesis, Auto Loader и таблицы Delta, для каждого потока в вашем конвейере Delta Live Tables. Метрики отображаются в виде диаграмм в правой области пользовательского интерфейса разностных таблиц и включают секунды невыполненной работы, байты невыполненной работы, записи невыполненной работы и файлы невыполненной работы. На диаграммах отображается максимальное значение, агрегированное по минуте, а подсказка отображает максимальные значения при наведении указателя мыши на диаграмму. Данные ограничены последними 48 часами с текущего времени.
Таблицы в конвейере с метриками потоковой передачи, доступны диаграммы разностных таблиц Delta Live Tables при просмотре DAG конвейера в представлении пользовательского интерфейса Graph. Чтобы просмотреть метрики потоковой передачи, щелкните значок , чтобы отобразить диаграмму метрик потоковой передачи на вкладке потоки в правой панели. Вы также можете применить фильтр для просмотра только таблиц с метриками потоковой передачи, щелкнув List, а затем щелкнув Имеет метрики потоковой передачи.
Каждый источник потоковой передачи поддерживает только определенные метрики. Метрики, не поддерживаемые источником потоковой передачи, недоступны для просмотра в пользовательском интерфейсе. В следующей таблице показаны метрики, доступные для поддерживаемых источников потоковой передачи:
источник | невыполненные байты | записи невыполненной работы | секунды невыполненной работы | файлы невыполненной работы |
---|---|---|---|---|
Кафка | ✓ | ✓ | ||
Кинезис | ✓ | ✓ | ||
Дельта | ✓ | ✓ | ||
Автозагрузчик | ✓ | ✓ | ||
Google Pub/Sub | ✓ | ✓ |
Что такое журнал событий Delta Live Tables?
Журнал событий разностных динамических таблиц содержит все сведения, связанные с конвейером, включая журналы аудита, проверки качества данных, ход конвейера и происхождение данных. С помощью журнала событий можно отслеживать, анализировать и контролировать состояние конвейеров данных.
Записи журнала событий можно просматривать в пользовательском интерфейсе разностных динамических таблиц, API разностных динамических таблиц или путем прямого запроса к журналу событий. В этом разделе основное внимание уделяется запросу журнала событий напрямую.
Можно также определить пользовательские действия, выполняемые при регистрации событий, например отправку оповещений с перехватчиками событий.
Схема журнала событий
Схема журнала событий описана в таблице ниже. Некоторые из этих полей содержат данные JSON, требующие синтаксического анализа для выполнения некоторых запросов, таких как details
поле. Azure Databricks поддерживает :
оператор для анализа полей JSON. См . оператор : (знак двоеточия).
Поле | Description |
---|---|
id |
Уникальный идентификатор записи журнала событий. |
sequence |
Документ JSON, содержащий метаданные для обнаружения и упорядочения событий. |
origin |
Документ JSON, содержащий метаданные для источника события, например поставщика облачных служб, региона поставщика облачных служб, user_id или для отображения места создания конвейера либо pipeline_id pipeline_type .DBSQL WORKSPACE |
timestamp |
Время записи события. |
message |
Удобное для чтения сообщение, описывающее событие. |
level |
Тип события, например , INFO , WARN ERROR или METRICS . |
error |
В случае возникновения ошибки сведения, описывающие ошибку. |
details |
Документ JSON, содержащий структурированные сведения о событии. Это основное поле, используемое для анализа событий. |
event_type |
Тип события. |
maturity_level |
Стабильность схемы событий. Возможны следующие значения: - STABLE : схема стабильна и не изменится.- NULL : схема стабильна и не изменится. Значение может быть NULL , если запись была создана до maturity_level добавления поля (выпуск 2022.37).- EVOLVING : схема не стабильна и может измениться.- DEPRECATED : схема устарела, и среда выполнения Delta Live Tables может перестать создавать это событие в любое время. |
Запрос журнала событий
Расположение журнала событий и интерфейса для запроса журнала событий зависит от того, настроен ли конвейер на использование хранилища метаданных Hive или каталога Unity.
Хранилище метаданных Hive
Если конвейер публикует таблицы в хранилище метаданных Hive, журнал событий хранится в /system/events
расположении storage
. Например, если параметр конвейера storage
настроен как /Users/username/data
, то журнал событий хранится в /Users/username/data/system/events
пути в DBFS.
Если параметр storage
не настроен, расположением журнала событий по умолчанию является /pipelines/<pipeline-id>/system/events
в DBFS. Например, если идентификатор конвейера — 91de5e48-35ed-11ec-8d3d-0242ac130003
, то место хранения — /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events
.
Можно создать представление для упрощения запросов к журналу событий. В следующем примере создается временное представление event_log_raw
. Это представление используется в примерах запросов журнала событий, включенных в эту статью:
CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;
Замените <event-log-path>
расположением журнала событий.
Каждый экземпляр выполнения конвейера называется обновлением. Часто требуется извлечь сведения для последнего обновления. Выполните следующий запрос, чтобы найти идентификатор последнего обновления и сохранить его во временном latest_update_id
представлении. Это представление используется в примерах запросов журнала событий, включенных в эту статью:
CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;
Журнал событий можно запросить в записной книжке Azure Databricks или редакторе SQL. Используйте записную книжку или редактор SQL для выполнения примеров запросов журнала событий.
Каталог Unity
Если конвейер публикует таблицы в каталоге Unity, необходимо использовать event_log
табличную функцию (TVF), чтобы получить журнал событий для конвейера. Журнал событий для конвейера извлекается путем передачи идентификатора конвейера или имени таблицы в TVF. Например, чтобы получить записи журнала событий для конвейера с идентификатором 04c78631-3dd7-4856-b2a6-7d84e9b2638b
:
SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")
Чтобы получить записи журнала событий для конвейера, который создал или владеет таблицей my_catalog.my_schema.table1
:
SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))
Чтобы вызвать TVF, необходимо использовать общий кластер или хранилище SQL. Например, можно использовать записную книжку, подключенную к общему кластеру, или использовать редактор SQL, подключенный к хранилищу SQL.
Чтобы упростить запросы событий для конвейера, владелец конвейера может создать представление по event_log
TVF. В следующем примере создается представление журнала событий для конвейера. Это представление используется в примерах запросов журнала событий, включенных в эту статью.
Примечание.
event_log
TVF может вызываться только владельцем конвейера, а представление, созданное через event_log
TVF, может запрашиваться только владельцем конвейера. Представление не может быть предоставлено другим пользователям.
CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");
Замените <pipeline-ID>
уникальным идентификатором конвейера Delta Live Tables. Идентификатор можно найти на панели сведений о конвейере в пользовательском интерфейсе разностных динамических таблиц.
Каждый экземпляр выполнения конвейера называется обновлением. Часто требуется извлечь сведения для последнего обновления. Выполните следующий запрос, чтобы найти идентификатор последнего обновления и сохранить его во временном latest_update_id
представлении. Это представление используется в примерах запросов журнала событий, включенных в эту статью:
CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;
Запрос сведений о происхождении данных из журнала событий
События, содержащие сведения о происхождении данных, имеют тип события flow_definition
. Объект details:flow_definition
содержит output_dataset
и input_datasets
определяет каждую связь в графе.
Для извлечения входных и выходных наборов данных можно использовать следующий запрос, чтобы просмотреть сведения о происхождении:
SELECT
details:flow_definition.output_dataset as output_dataset,
details:flow_definition.input_datasets as input_dataset
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_definition'
AND
origin.update_id = latest_update.id
output_dataset |
input_datasets |
---|---|
customers |
null |
sales_orders_raw |
null |
sales_orders_cleaned |
["customers", "sales_orders_raw"] |
sales_order_in_la |
["sales_orders_cleaned"] |
Запрос качества данных из журнала событий
Если вы определяете ожидания для наборов данных в конвейере, метрики качества данных хранятся в объекте details:flow_progress.data_quality.expectations
. События, содержащие сведения о качестве данных, имеют тип события flow_progress
. В следующем примере запрашиваются метрики качества данных для последнего обновления конвейера:
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details :flow_progress :data_quality :expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name
dataset |
expectation |
passing_records |
failing_records |
---|---|---|---|
sales_orders_cleaned |
valid_order_number |
4083 | 0 |
Мониторинг невыполненной работы данных путем запроса к журналу событий
Delta Live Tables отслеживает количество данных, присутствующих в невыполненной работы в объекте details:flow_progress.metrics.backlog_bytes
. События, содержащие метрики невыполненной работы, имеют тип flow_progress
события. В следующем примере запрашиваются метрики невыполненной работы для последнего обновления конвейера:
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id
Примечание.
Метрики невыполненной работы могут быть недоступны в зависимости от типа источника данных конвейера и версии Databricks Runtime.
Мониторинг расширенных событий автомасштабирования из журнала событий для конвейеров без поддержки сервера
Для конвейеров DLT, которые не используют бессерверные вычисления, журнал событий записывает изменения размера кластера при включении расширенного автомасштабирования в конвейерах. События, содержащие сведения о расширенном автомасштабировании, имеют тип autoscale
события. Сведения об изменении размера кластера хранятся в объекте details:autoscale
. В следующем примере выполняется запросы на изменение размера расширенного кластера автомасштабирования для последнего обновления конвейера:
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
Мониторинг использования вычислительных ресурсов
cluster_resources
события предоставляют метрики по количеству слотов задач в кластере, количеству используемых слотов задач и количеству задач, ожидающих планирования.
Если включен расширенный автомасштабирование, cluster_resources
события также содержат метрики для алгоритма автомасштабирования, включая latest_requested_num_executors
и optimal_num_executors
. События также показывают состояние алгоритма в виде различных состояний, таких как CLUSTER_AT_DESIRED_SIZE
, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS
и BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION
.
Эти сведения можно просматривать вместе с событиями автомасштабирования, чтобы обеспечить общую картину расширенного автомасштабирования.
В следующем примере выполняется запрос журнала размера очереди задач для последнего обновления конвейера:
SELECT
timestamp,
Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
В следующем примере выполняется запрос журнала использования для последнего обновления конвейера:
SELECT
timestamp,
Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
В следующем примере выполняется запрос журнала счетчиков исполнителя, сопровождаемых метриками, доступными только для расширенных конвейеров автомасштабирования, включая количество исполнителей, запрашиваемых алгоритмом в последнем запросе, оптимальное количество исполнителей, рекомендуемых алгоритмом на основе последних метрик, и состояния алгоритма автомасштабирования:
SELECT
timestamp,
Double(details :cluster_resources.num_executors) as current_executors,
Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
Аудит конвейеров динамических таблиц Delta
Вы можете использовать записи журнала событий разностных динамических таблиц и другие журналы аудита Azure Databricks, чтобы получить полное представление о том, как данные обновляются в разностных динамических таблицах.
Разностные динамические таблицы используют учетные данные владельца конвейера для выполнения обновлений. Вы можете изменить используемые учетные данные, обновив владельца конвейера. Разностные динамические таблицы записывают пользователя, выполняющего действия в конвейере, включая создание конвейера, изменение конфигурации и активацию обновлений.
Сведения о событиях аудита каталога Unity см. в справочнике по событиям аудита каталога Unity.
Запрос действий пользователя в журнале событий
Можно использовать журнал событий для аудита событий, например действий пользователя. События, содержащие сведения о действиях пользователя, имеют тип события user_action
.
Сведения о действии хранятся в объекте user_action
в поле details
. Используйте следующий запрос, чтобы создать журнал аудита событий пользователя. Сведения о создании представления, используемого event_log_raw
в этом запросе, см. в разделе "Запрос журнала событий".
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp |
action |
user_name |
---|---|---|
2021-05-20T19:36:03.517+0000 | START |
user@company.com |
2021-05-20T19:35:59.913+0000 | CREATE |
user@company.com |
2021-05-27T00:35:51.971+0000 | START |
user@company.com |
Сведения о среде выполнения
Вы можете просмотреть сведения о среде выполнения для обновления конвейера, например версию Databricks Runtime для обновления:
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version |
---|
11,0 |