Поделиться через


Управление качеством данных с помощью ожиданий конвейера

Используйте ожидания для применения ограничений качества, которые проверяют данные по мере потоков через конвейеры ETL. Ожидания обеспечивают более подробные сведения о метриках качества данных и позволяют завершать обновления или удалять записи при обнаружении недопустимых записей.

В этой статье представлен обзор ожиданий, включая примеры синтаксиса и параметры поведения. Дополнительные варианты использования и рекомендуемые рекомендации см. в рекомендациях по ожиданиям и расширенных шаблонах.

график ожиданий для потоков таблиц Delta Live Tables

Что такое ожидания?

Ожидания — это необязательные предложения в материализованном представлении конвейера, потоковой таблице или инструкциях создания представлений, которые применяют проверки качества данных к каждой записи, передаваемой через запрос. Ожидания используют стандартные логические инструкции SQL для указания ограничений. Вы можете объединить несколько ожиданий для одного набора данных и задать ожидания во всех объявлениях набора данных в конвейере.

В следующих разделах представлены три компонента ожидания и приведены примеры синтаксиса.

Имя ожидания

Каждое ожидание должно иметь имя, которое используется в качестве идентификатора для отслеживания и мониторинга ожиданий. Выберите имя, которое передает проверяемые метрики. В следующем примере определяется ожидание, valid_customer_age для подтверждения того, что возраст составляет от 0 до 120 лет:

Внимание

Имя ожидания должно быть уникальным для заданного набора данных. Ожидания можно использовать повторно в нескольких наборах данных в конвейере. См. переносные и повторно используемые ожидания.

Python

@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")

SQL

CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

Ограничение для оценки

Предложение ограничения — это условный оператор SQL, который должен иметь значение true или false для каждой записи. Ограничение содержит фактическую логику для проверяемого объекта. Если запись не соответствует этому условию, ожидание активируется.

Ограничения должны использовать допустимый синтаксис SQL и не могут содержать следующее:

  • Пользовательские функции на языке программирования Python
  • Вызовы внешних служб
  • Вложенные запросы, ссылающиеся на другие таблицы

Ниже приведены примеры ограничений, которые можно добавить в инструкции создания набора данных:

Python

# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

SQL

-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

Действие с некорректной записью

Необходимо указать действие, чтобы определить, что происходит при сбое проверки записи. В следующей таблице описываются доступные действия.

Действие Синтаксис SQL Синтаксис Python Результат
предупреждение (по умолчанию) EXPECT dlt.expect Недопустимые записи записываются в целевой объект. Количество допустимых и недопустимых записей регистрируется вместе с другими метриками набора данных.
капля EXPECT ... ON VIOLATION DROP ROW dlt.expect_or_drop Недопустимые записи удаляются перед записью данных в целевой объект. Количество удаленных записей регистрируется вместе с другими метриками набора данных.
fail EXPECT ... ON VIOLATION FAIL UPDATE dlt.expect_or_fail Недопустимые записи препятствуют успешному обновлению. Перед повторной обработкой требуется вмешательство вручную. Это ожидание приводит к сбою одного потока и не приводит к сбою других потоков в конвейере.

Вы также можете реализовать расширенную логику для карантина недопустимых записей без сбоя или удаления данных. См. недопустимые записи на карантине .

Метрики для отслеживания ожиданий

Метрики отслеживания для действий warn или drop можно просмотреть из пользовательского интерфейса конвейера. Так как fail приводит к сбою обновления при обнаружении недопустимой записи, метрики не записываются.

Чтобы просмотреть метрики ожидания, выполните следующие действия.

  1. Щелкните Delta Live Tables в боковой панели.
  2. Щелкните название вашего конвейера.
  3. Щелкните набор данных с заданным ожиданием.
  4. На правой боковой панели выберите вкладку качества данных.

Вы можете просматривать метрики качества данных, запрашивая журнал событий Delta Live Tables. См. качество данных запроса изжурнала событий.

Хранение недопустимых записей

Сохранение некорректных записей является поведением по умолчанию для ожиданий. Используйте оператор expect, если вы хотите сохранить записи, которые нарушают ожидание, но собирать метрики о том, сколько записей соответствуют или не соответствуют ограничению. Записи, отличающиеся от ожидания, добавляются в целевой набор данных вместе с допустимыми записями:

Python

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Удаление недопустимых записей

expect_or_drop Используйте оператор, чтобы предотвратить дальнейшую обработку недопустимых записей. Записи, отличающиеся от ожидания, удаляются из целевого набора данных:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Сбой при наличии недопустимых записей

Если недопустимые записи неприемлемы, используйте expect_or_fail оператор, чтобы остановить выполнение сразу после сбоя проверки записи. Если операция является обновлением таблицы, система атомарно откатывает транзакцию:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Внимание

При наличии нескольких параллельных потоков, определенных в конвейере, сбой одного потока не приводит к сбою других потоков.

график объяснения сбоя потока Delta Live Tables

Устранение неполадок с обновлениями, которые не оправдали ожиданий.

При сбое конвейера из-за отклонения от ожидаемых результатов необходимо исправить код конвейера, чтобы правильно обработать недопустимые данные и только после этого повторно запустить конвейер.

Ожидания, настроенные для сбоя конвейеров, изменяют план запросов Spark для отслеживания сведений, необходимых для обнаружения нарушений и отчетов. Эти сведения можно использовать для определения входной записи, которая привела к нарушению для многих запросов. Ниже приведен пример ожидания.

Expectation Violated:
{
  "flowName": "sensor-pipeline",
  "verboseInfo": {
    "expectationsViolated": [
      "temperature_in_valid_range"
    ],
    "inputData": {
      "id": "TEMP_001",
      "temperature": -500,
      "timestamp_ms": "1710498600"
    },
    "outputRecord": {
      "sensor_id": "TEMP_001",
      "temperature": -500,
      "change_time": "2024-03-15 10:30:00"
    },
    "missingInputData": false
  }
}

управление несколькими ожиданиями

Примечание.

Хотя SQL и Python поддерживают несколько ожиданий в одном наборе данных, только Python позволяет объединить несколько отдельных ожиданий и указать коллективные действия.

разностные динамические таблицы с несколькими ожиданиями графа fLow

Можно объединить несколько ожиданий и указать коллективные действия с помощью функций expect_all, expect_all_or_dropи expect_all_or_fail.

Эти декораторы принимают словарь Python в качестве аргумента, где ключ — это имя ожидания, а значение — ограничение ожидания. Вы можете повторно использовать один набор ожиданий в нескольких наборах данных в конвейере. Ниже приведены примеры каждого из операторов Python expect_all:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create cleaned and prepared to share the dataset