Поделиться через


Отслеживание конвейеров динамических таблиц Delta

В этой статье описывается использование встроенных функций мониторинга и наблюдаемости для конвейеров Delta Live Tables. Такие функции поддерживают такие задачи, как:

Сведения о проверке и диагностике производительности запросов см. в журнале запросов Access для конвейеров Delta Live Tables. Эта функция предоставляется в режиме общедоступной предварительной версии.

Добавление Уведомления по электронной почте для событий конвейера

Вы можете настроить один или несколько адресов электронной почты для получения уведомлений, когда происходит следующее:

  • Обновление конвейера успешно завершено.
  • Обновление конвейера завершается сбоем либо с повторным повтором, либо с ошибкой, не допускающей повторных попыток. Выберите этот параметр, чтобы получить уведомление обо всех сбоях конвейера.
  • Обновление конвейера завершается ошибкой без повторных попыток (неустранимая). Выберите этот параметр, чтобы получить уведомление только в том случае, если возникает ошибка без повторных попыток.
  • Сбой одного потока данных.

Чтобы настроить Уведомления по электронной почте при создании или изменении конвейера:

  1. Нажмите кнопку "Добавить уведомление".
  2. Введите один или несколько адресов электронной почты для получения уведомлений.
  3. Установите флажок для каждого типа уведомления, чтобы отправить на настроенные адреса электронной почты.
  4. Нажмите кнопку "Добавить уведомление".

Какие сведения о конвейере доступны в пользовательском интерфейсе?

График конвейера отображается сразу после успешного запуска обновления конвейера. Стрелки представляют зависимости между наборами данных в конвейере. По умолчанию на странице сведений о конвейере отображается последнее обновление таблицы, но в раскрывающемся меню можно выбрать старые обновления.

Сведения включают идентификатор конвейера, исходный код, затраты на вычисления, выпуск продукта и канал, настроенный для конвейера.

Чтобы просмотреть табличное представление наборов данных, щелкните вкладку "Список ". Представление списка позволяет просматривать все наборы данных в конвейере, представленные в виде строки в таблице, и полезно, если daG конвейера слишком велик, чтобы визуализироваться в представлении Графа . Вы можете управлять наборами данных, отображаемыми в таблице, с помощью нескольких фильтров, таких как имя набора данных, тип и состояние. Чтобы вернуться к визуализации DAG, щелкните Graph.

Пользователь Запуск от имени является владельцем конвейера, и обновления конвейера выполняются с разрешениями этого пользователя. Чтобы изменить пользователя run as, щелкните Разрешения и измените владельца конвейера.

Как просмотреть сведения о наборе данных?

Щелкнув набор данных в графе конвейера или списке наборов данных, отображаются сведения о наборе данных. Сведения включают схему набора данных, метрики качества данных и ссылку на исходный код, определяющий набор данных.

Просмотр журнала обновлений

Чтобы просмотреть журнал и состояние обновлений конвейера, щелкните раскрывающееся меню журнала обновлений в верхней строке.

Выберите обновление в раскрывающемся меню, чтобы просмотреть граф, сведения и события для обновления. Чтобы вернуться к последнему обновлению, щелкните Показать последнее обновление.

просмотр метрик потоковой передачи

Важный

Наблюдаемость потоковой передачи для таблиц Delta Live доступна в общедоступной предварительной версии.

Вы можете анализировать метрики потоков из источников данных, поддерживаемых структурированной потоковой передачей Spark, таких как Apache Kafka, Amazon Kinesis, Auto Loader и таблицы Delta, для каждого потока в вашем конвейере Delta Live Tables. Метрики отображаются в виде диаграмм в правой области пользовательского интерфейса разностных таблиц и включают секунды невыполненной работы, байты невыполненной работы, записи невыполненной работы и файлы невыполненной работы. На диаграммах отображается максимальное значение, агрегированное по минуте, а подсказка отображает максимальные значения при наведении указателя мыши на диаграмму. Данные ограничены последними 48 часами с текущего времени.

Таблицы в конвейере с метриками потоковой передачи, доступны , отображают значок диаграммы разностных таблиц Delta Live Tables при просмотре DAG конвейера в представлении пользовательского интерфейса Graph. Чтобы просмотреть метрики потоковой передачи, щелкните значок диаграммы Delta Live Tables, чтобы отобразить диаграмму метрик потоковой передачи на вкладке потоки в правой панели. Вы также можете применить фильтр для просмотра только таблиц с метриками потоковой передачи, щелкнув List, а затем щелкнув Имеет метрики потоковой передачи.

Каждый источник потоковой передачи поддерживает только определенные метрики. Метрики, не поддерживаемые источником потоковой передачи, недоступны для просмотра в пользовательском интерфейсе. В следующей таблице показаны метрики, доступные для поддерживаемых источников потоковой передачи:

источник невыполненные байты записи невыполненной работы секунды невыполненной работы файлы невыполненной работы
Кафка
Кинезис
Дельта
Автозагрузчик
Google Pub/Sub

Что такое журнал событий Delta Live Tables?

Журнал событий разностных динамических таблиц содержит все сведения, связанные с конвейером, включая журналы аудита, проверки качества данных, ход конвейера и происхождение данных. С помощью журнала событий можно отслеживать, анализировать и контролировать состояние конвейеров данных.

Записи журнала событий можно просматривать в пользовательском интерфейсе разностных динамических таблиц, API разностных динамических таблиц или путем прямого запроса к журналу событий. В этом разделе основное внимание уделяется запросу журнала событий напрямую.

Можно также определить пользовательские действия, выполняемые при регистрации событий, например отправку оповещений с перехватчиками событий.

Схема журнала событий

Схема журнала событий описана в таблице ниже. Некоторые из этих полей содержат данные JSON, требующие синтаксического анализа для выполнения некоторых запросов, таких как details поле. Azure Databricks поддерживает : оператор для анализа полей JSON. См . оператор : (знак двоеточия).

Поле Description
id Уникальный идентификатор записи журнала событий.
sequence Документ JSON, содержащий метаданные для обнаружения и упорядочения событий.
origin Документ JSON, содержащий метаданные для источника события, например поставщика облачных служб, региона поставщика облачных служб, user_idили для отображения места создания конвейера либо pipeline_idpipeline_type .DBSQLWORKSPACE
timestamp Время записи события.
message Удобное для чтения сообщение, описывающее событие.
level Тип события, например , INFO, WARNERRORили METRICS.
error В случае возникновения ошибки сведения, описывающие ошибку.
details Документ JSON, содержащий структурированные сведения о событии. Это основное поле, используемое для анализа событий.
event_type Тип события.
maturity_level Стабильность схемы событий. Возможны следующие значения:

- STABLE: схема стабильна и не изменится.
- NULL: схема стабильна и не изменится. Значение может быть NULL , если запись была создана до maturity_level добавления поля (выпуск 2022.37).
- EVOLVING: схема не стабильна и может измениться.
- DEPRECATED: схема устарела, и среда выполнения Delta Live Tables может перестать создавать это событие в любое время.

Запрос журнала событий

Расположение журнала событий и интерфейса для запроса журнала событий зависит от того, настроен ли конвейер на использование хранилища метаданных Hive или каталога Unity.

Хранилище метаданных Hive

Если конвейер публикует таблицы в хранилище метаданных Hive, журнал событий хранится в /system/events расположении storage . Например, если параметр конвейера storage настроен как /Users/username/data, то журнал событий хранится в /Users/username/data/system/events пути в DBFS.

Если параметр storage не настроен, расположением журнала событий по умолчанию является /pipelines/<pipeline-id>/system/events в DBFS. Например, если идентификатор конвейера — 91de5e48-35ed-11ec-8d3d-0242ac130003, то место хранения — /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events.

Можно создать представление для упрощения запросов к журналу событий. В следующем примере создается временное представление event_log_raw. Это представление используется в примерах запросов журнала событий, включенных в эту статью:

CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;

Замените <event-log-path> расположением журнала событий.

Каждый экземпляр выполнения конвейера называется обновлением. Часто требуется извлечь сведения для последнего обновления. Выполните следующий запрос, чтобы найти идентификатор последнего обновления и сохранить его во временном latest_update_id представлении. Это представление используется в примерах запросов журнала событий, включенных в эту статью:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Журнал событий можно запросить в записной книжке Azure Databricks или редакторе SQL. Используйте записную книжку или редактор SQL для выполнения примеров запросов журнала событий.

Каталог Unity

Если конвейер публикует таблицы в каталоге Unity, необходимо использовать event_logтабличную функцию (TVF), чтобы получить журнал событий для конвейера. Журнал событий для конвейера извлекается путем передачи идентификатора конвейера или имени таблицы в TVF. Например, чтобы получить записи журнала событий для конвейера с идентификатором 04c78631-3dd7-4856-b2a6-7d84e9b2638b:

SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")

Чтобы получить записи журнала событий для конвейера, который создал или владеет таблицей my_catalog.my_schema.table1:

SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))

Чтобы вызвать TVF, необходимо использовать общий кластер или хранилище SQL. Например, можно использовать записную книжку, подключенную к общему кластеру, или использовать редактор SQL, подключенный к хранилищу SQL.

Чтобы упростить запросы событий для конвейера, владелец конвейера может создать представление по event_log TVF. В следующем примере создается представление журнала событий для конвейера. Это представление используется в примерах запросов журнала событий, включенных в эту статью.

Примечание.

event_log TVF может вызываться только владельцем конвейера, а представление, созданное через event_log TVF, может запрашиваться только владельцем конвейера. Представление не может быть предоставлено другим пользователям.

CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");

Замените <pipeline-ID> уникальным идентификатором конвейера Delta Live Tables. Идентификатор можно найти на панели сведений о конвейере в пользовательском интерфейсе разностных динамических таблиц.

Каждый экземпляр выполнения конвейера называется обновлением. Часто требуется извлечь сведения для последнего обновления. Выполните следующий запрос, чтобы найти идентификатор последнего обновления и сохранить его во временном latest_update_id представлении. Это представление используется в примерах запросов журнала событий, включенных в эту статью:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Запрос сведений о происхождении данных из журнала событий

События, содержащие сведения о происхождении данных, имеют тип события flow_definition. Объект details:flow_definition содержит output_dataset и input_datasets определяет каждую связь в графе.

Для извлечения входных и выходных наборов данных можно использовать следующий запрос, чтобы просмотреть сведения о происхождении:

SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND
  origin.update_id = latest_update.id
output_dataset input_datasets
customers null
sales_orders_raw null
sales_orders_cleaned ["customers", "sales_orders_raw"]
sales_order_in_la ["sales_orders_cleaned"]

Запрос качества данных из журнала событий

Если вы определяете ожидания для наборов данных в конвейере, метрики качества данных хранятся в объекте details:flow_progress.data_quality.expectations . События, содержащие сведения о качестве данных, имеют тип события flow_progress. В следующем примере запрашиваются метрики качества данных для последнего обновления конвейера:

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name
dataset expectation passing_records failing_records
sales_orders_cleaned valid_order_number 4083 0

Мониторинг невыполненной работы данных путем запроса к журналу событий

Delta Live Tables отслеживает количество данных, присутствующих в невыполненной работы в объекте details:flow_progress.metrics.backlog_bytes . События, содержащие метрики невыполненной работы, имеют тип flow_progressсобытия. В следующем примере запрашиваются метрики невыполненной работы для последнего обновления конвейера:

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id

Примечание.

Метрики невыполненной работы могут быть недоступны в зависимости от типа источника данных конвейера и версии Databricks Runtime.

Мониторинг расширенных событий автомасштабирования из журнала событий для конвейеров без поддержки сервера

Для конвейеров DLT, которые не используют бессерверные вычисления, журнал событий записывает изменения размера кластера при включении расширенного автомасштабирования в конвейерах. События, содержащие сведения о расширенном автомасштабировании, имеют тип autoscaleсобытия. Сведения об изменении размера кластера хранятся в объекте details:autoscale. В следующем примере выполняется запросы на изменение размера расширенного кластера автомасштабирования для последнего обновления конвейера:

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

Мониторинг использования вычислительных ресурсов

cluster_resources события предоставляют метрики по количеству слотов задач в кластере, количеству используемых слотов задач и количеству задач, ожидающих планирования.

Если включен расширенный автомасштабирование, cluster_resources события также содержат метрики для алгоритма автомасштабирования, включая latest_requested_num_executorsи optimal_num_executors. События также показывают состояние алгоритма в виде различных состояний, таких как CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSи BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. Эти сведения можно просматривать вместе с событиями автомасштабирования, чтобы обеспечить общую картину расширенного автомасштабирования.

В следующем примере выполняется запрос журнала размера очереди задач для последнего обновления конвейера:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

В следующем примере выполняется запрос журнала использования для последнего обновления конвейера:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

В следующем примере выполняется запрос журнала счетчиков исполнителя, сопровождаемых метриками, доступными только для расширенных конвейеров автомасштабирования, включая количество исполнителей, запрашиваемых алгоритмом в последнем запросе, оптимальное количество исполнителей, рекомендуемых алгоритмом на основе последних метрик, и состояния алгоритма автомасштабирования:

SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

Аудит конвейеров динамических таблиц Delta

Вы можете использовать записи журнала событий разностных динамических таблиц и другие журналы аудита Azure Databricks, чтобы получить полное представление о том, как данные обновляются в разностных динамических таблицах.

Разностные динамические таблицы используют учетные данные владельца конвейера для выполнения обновлений. Вы можете изменить используемые учетные данные, обновив владельца конвейера. Разностные динамические таблицы записывают пользователя, выполняющего действия в конвейере, включая создание конвейера, изменение конфигурации и активацию обновлений.

Сведения о событиях аудита каталога Unity см. в справочнике по событиям аудита каталога Unity.

Запрос действий пользователя в журнале событий

Можно использовать журнал событий для аудита событий, например действий пользователя. События, содержащие сведения о действиях пользователя, имеют тип события user_action.

Сведения о действии хранятся в объекте user_action в поле details. Используйте следующий запрос, чтобы создать журнал аудита событий пользователя. Сведения о создании представления, используемого event_log_raw в этом запросе, см. в разделе "Запрос журнала событий".

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
2021-05-20T19:36:03.517+0000 START user@company.com
2021-05-20T19:35:59.913+0000 CREATE user@company.com
2021-05-27T00:35:51.971+0000 START user@company.com

Сведения о среде выполнения

Вы можете просмотреть сведения о среде выполнения для обновления конвейера, например версию Databricks Runtime для обновления:

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
11,0