Управление качеством данных с помощью ожиданий в рамках конвейера
Используйте ожидания для применения ограничений качества, которые проверяют данные по мере потоков через конвейеры ETL. Ожидания обеспечивают более подробные сведения о метриках качества данных и позволяют завершать обновления или удалять записи при обнаружении недопустимых записей.
В этой статье представлен обзор ожиданий, включая примеры синтаксиса и параметры поведения. Дополнительные варианты использования и рекомендуемые рекомендации см. в рекомендациях по ожиданиям и расширенных шаблонах.
Что такое ожидания?
Ожидания — это необязательные предложения в материализованном представлении конвейера, потоковой таблице или инструкциях создания представлений, которые применяют проверки качества данных к каждой записи, передаваемой через запрос. Ожидания используют стандартные логические инструкции SQL для указания ограничений. Вы можете объединить множество ожиданий для одного набора данных и установить ожидания для всех объявлений набора данных в конвейере.
В следующих разделах представлены три компонента ожидания и приведены примеры синтаксиса.
Имя ожидания
Каждое ожидание должно иметь имя, которое используется в качестве идентификатора для мониторинга и отслеживания ожиданий. Выберите имя, которое передает проверяемые метрики. В следующем примере определяется ожидание, valid_customer_age
для подтверждения того, что возраст составляет от 0 до 120 лет:
Важный
Имя ожидания должно быть уникальным для заданного набора данных. Ожидания можно использовать повторно в нескольких наборах данных в потоке данных. См. переносимые и повторно используемые условия.
Питон
@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")
SQL
CREATE OR REFRESH STREAMING TABLE customers(
CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);
Ограничение для оценки
Предложение ограничения — это условный оператор SQL, который должен иметь значение true или false для каждой записи. Ограничение содержит фактическую логику для проверяемого объекта. Если запись не соответствует этому условию, то срабатывает ожидание.
Ограничения должны использовать допустимый синтаксис SQL и не могут содержать следующее:
- Пользовательские функции Python
- Вызовы внешних служб
- Вложенные запросы, ссылающиеся на другие таблицы
Ниже приведены примеры ограничений, которые можно добавить в инструкции создания набора данных:
Питон
# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")
# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")
# CASE statements
@dlt.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")
# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")
# Complex business logic
@dlt.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)
# Complex boolean logic
@dlt.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
SQL
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)
-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)
-- CASE statements
CONSTRAINT valid_order_status EXPECT (
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
)
-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())
-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'
)
-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)
Действие по недопустимой записи
Необходимо указать действие, чтобы определить, что происходит при сбое проверки записи. В следующей таблице описываются доступные действия.
Действие | Синтаксис SQL | Синтаксис Python | Результат |
---|---|---|---|
предупреждать (по умолчанию) | EXPECT |
dlt.expect |
Недопустимые записи записываются в целевой объект. Количество допустимых и недопустимых записей регистрируется вместе с другими метриками набора данных. |
падение | EXPECT ... ON VIOLATION DROP ROW |
dlt.expect_or_drop |
Недопустимые записи удаляются перед записью данных в целевой объект. Количество удаленных записей регистрируется вместе с другими метриками набора данных. |
сбой | EXPECT ... ON VIOLATION FAIL UPDATE |
dlt.expect_or_fail |
Недопустимые записи препятствуют успешному обновлению. Перед повторной обработкой требуется вмешательство вручную. Это ожидание приводит к сбою одного потока и не приводит к сбою других потоков в конвейере. |
Вы также можете реализовать расширенную логику для карантина недопустимых записей без сбоя или удаления данных. См. недопустимые записи в карантине.
Метрики для отслеживания ожиданий
Метрики отслеживания для действий warn
или drop
можно просмотреть в пользовательском интерфейсе конвейера. Так как fail
приводит к сбою обновления при обнаружении недопустимой записи, метрики не записываются.
Чтобы просмотреть метрики ожидания, выполните следующие действия.
- Щелкните DLT на боковой панели.
- Щелкните название вашего конвейера.
- Нажмите на набор данных, для которого определено ожидание.
- Выберите вкладку качества данных на правой боковой панели.
Вы можете просматривать метрики качества данных, запрашивая журнал событий DLT. См. Проверьте качество данных запроса из журнала событий.
Сохранить недопустимые записи
Сохранение недопустимых записей — это поведение по умолчанию для ожиданий. Используйте оператор expect
, если вы хотите сохранить записи, которые нарушают ожидание, но чтобы собирать метрики о том, сколько записей соответствуют или не соответствуют ограничению. Записи, которые нарушают ожидание, добавляются в целевой набор данных вместе с допустимыми записями:
Питон
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Удалить недопустимые записи
Используйте оператор expect_or_drop
, чтобы предотвратить дальнейшую обработку недопустимых записей. Записи, которые нарушают ожидание, удаляются из целевого набора данных:
Питон
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Сбой при наличии недопустимых записей
Если недопустимые записи неприемлемы, используйте оператор expect_or_fail
, чтобы немедленно остановить выполнение при сбое проверки записи. Если операция является обновлением таблицы, система атомарно откатывает транзакцию:
Питон
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Важный
При наличии нескольких параллельных потоков, определенных в конвейере, сбой одного потока не приводит к сбою других потоков.
Устранение неполадок с обновлениями, не оправдавшими ожиданий
Если конвейер завершается сбоем из-за нарушения ожидания, необходимо исправить код конвейера, чтобы правильно обрабатывать недопустимые данные перед повторной запуском конвейера.
Ожидания, настроенные для сбоя конвейеров, изменяют план запросов Spark для отслеживания сведений, необходимых для обнаружения нарушений и отчетов. Эти сведения можно использовать для определения входной записи, которая привела к нарушению для многих запросов. Ниже приведен пример ожидания.
Expectation Violated:
{
"flowName": "sensor-pipeline",
"verboseInfo": {
"expectationsViolated": [
"temperature_in_valid_range"
],
"inputData": {
"id": "TEMP_001",
"temperature": -500,
"timestamp_ms": "1710498600"
},
"outputRecord": {
"sensor_id": "TEMP_001",
"temperature": -500,
"change_time": "2024-03-15 10:30:00"
},
"missingInputData": false
}
}
управление несколькими ожиданиями
Заметка
Хотя SQL и Python поддерживают несколько ожиданий в одном наборе данных, только Python позволяет объединить несколько отдельных ожиданий и указать коллективные действия.
Можно объединить несколько ожиданий и указать коллективные действия с помощью функций expect_all
, expect_all_or_drop
и expect_all_or_fail
.
Эти декораторы принимают словарь Python в качестве аргумента, где ключом является имя ожидания, а значением - ограничение ожидания. Вы можете повторно использовать один набор ожиданий в нескольких наборах данных в конвейере. Ниже приведены примеры каждого из операторов Python expect_all
:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create a raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset
@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset