다음을 통해 공유


파이프라인 기대치를 사용하여 데이터 품질 관리

ETL 파이프라인을 통해 흐르는 데이터의 유효성을 검사하는 품질 제약 조건을 적용하려면 기대치를 사용합니다. 기대치는 데이터 품질 지표에 대한 더 깊은 통찰력을 제공하며, 잘못된 레코드를 감지할 때 업데이트를 중단하거나 레코드를 삭제할 수 있습니다.

이 문서에서는 구문 예제 및 동작 옵션을 포함하여 기대에 대한 개요를 제공합니다. 고급 사용 사례 및 권장 모범 사례는 Expectation 권장 사항 및 고급 패턴참조하세요.

Delta Live Tables 예상 흐름 그래프

기대는 무엇입니까?

기대 조건은 파이프라인에서 생성된 뷰, 스트리밍 테이블 또는 뷰 생성을 위한 문에서 선택적으로 추가할 수 있는 절로, 쿼리를 통과하는 각 레코드에 데이터 품질 검사를 적용합니다. 기대치에는 표준 SQL Boolean 문을 사용하여 제약을 부과합니다. 단일 데이터 세트에 대한 여러 기대치를 결합하고 파이프라인의 모든 데이터 세트 선언에서 기대치를 설정할 수 있습니다.

다음 섹션에서는 기대의 세 가지 구성 요소를 소개하고 구문 예제를 제공합니다.

예상 이름

각 기대치에는 기대를 추적하고 모니터링하는 식별자로 사용되는 이름이 있어야 합니다. 유효성을 검사할 메트릭을 전달하는 이름을 선택합니다. 다음 예제에서는 연령이 0년에서 120년 사이인지 확인하기 위한 예상 valid_customer_age 정의합니다.

Important

예상 이름은 지정된 데이터 세트에 대해 고유해야 합니다. 파이프라인의 여러 데이터 세트에 대한 기대치를 다시 사용할 수 있습니다. 이식 가능하고 재사용 가능한 보세요.

Python

@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")

SQL

CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

평가할 제약 조건

제약 조건 절은 각 레코드에 대해 true 또는 false로 평가해야 하는 SQL 조건문입니다. 제약 조건에는 유효성 검사 대상에 대한 실제 논리가 포함됩니다. 레코드가 이 조건에 실패하면 기대가 트리거됩니다.

제약 조건은 유효한 SQL 구문을 사용해야 하며 다음을 포함할 수 없습니다.

  • 사용자 지정 Python 함수
  • 외부 서비스 호출
  • 다른 테이블을 참조하는 하위 쿼리

다음은 데이터 세트 만들기 문에 추가할 수 있는 제약 조건의 예입니다.

Python

# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

SQL

-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

잘못된 레코드에 대한 작업

레코드가 검증에 실패했을 때 어떻게 할지 결정하기 위해 작업을 지정해야 합니다. 다음 표에서는 사용 가능한 작업에 대해 설명합니다.

작업 SQL 구문 Python 구문 결과
warn (기본값) EXPECT dlt.expect 잘못된 레코드가 대상에 기록됩니다. 유효하고 잘못된 레코드 수가 다른 데이터 세트 메트릭과 함께 기록됩니다.
drop EXPECT ... ON VIOLATION DROP ROW dlt.expect_or_drop 데이터가 대상에 기록되기 전에 잘못된 레코드가 삭제됩니다. 삭제된 레코드 수는 다른 데이터 세트 메트릭과 함께 기록됩니다.
fail EXPECT ... ON VIOLATION FAIL UPDATE dlt.expect_or_fail 유효하지 않은 레코드 때문에 업데이트가 실패합니다. 재처리하기 전에 수동 개입이 필요합니다. 이러한 기대는 단일 흐름의 실패를 초래하고, 따라서 파이프라인의 다른 흐름들은 실패하지 않습니다.

잘못된 레코드를 격리하면서 데이터가 손실되거나 삭제되지 않도록 고급 논리를 구현할 수도 있습니다. 격리된 잘못된 레코드을 참조하세요.

기대 추적 지표

파이프라인 UI에서 warn 또는 drop 작업에 대한 추적 메트릭을 볼 수 있습니다. fail 잘못된 레코드가 검색될 때 업데이트가 실패하므로 메트릭이 기록되지 않습니다.

예상 메트릭을 보려면 다음 단계를 완료합니다.

  1. 사이드바에서 델타 라이브 테이블 클릭합니다.
  2. 귀하의 파이프라인의 이름을 클릭합니다.
  3. 예상이 정의된 데이터 세트를 클릭합니다.
  4. 오른쪽 사이드바에서 데이터 품질 탭을 선택합니다.

Delta Live Tables 이벤트 로그를 쿼리하여 데이터 품질 메트릭을 볼 수 있습니다. 이벤트 로그쿼리 데이터 품질을 참조하세요.

잘못된 레코드 보관

잘못된 레코드를 유지하는 것은 기대에 대한 기본 동작입니다. 예상을 위반하는 레코드를 유지하지만 제약 조건을 통과하거나 실패한 레코드 수에 대한 메트릭을 수집하려면 expect 연산자를 사용합니다. 예상을 위반하는 레코드는 유효한 레코드와 함께 대상 데이터 세트에 추가됩니다.

Python

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

잘못된 레코드 삭제

expect_or_drop 연산자를 사용하여 유효하지 않은 레코드의 추가 처리를 방지합니다. 예상을 위반하는 레코드는 대상 데이터 세트에서 삭제됩니다.

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

잘못된 레코드 실패

유효하지 않은 레코드를 허용할 수 없는 경우 expect_or_fail 연산자를 사용하여 레코드가 유효성 검사에 실패하면 즉시 실행을 중지합니다. 작업이 테이블 업데이트인 경우 시스템은 트랜잭션을 자동으로 롤백합니다.

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Important

파이프라인에 여러 병렬 흐름이 정의된 경우 단일 흐름이 실패해도 다른 흐름이 실패하지는 않습니다.

Delta Live Tables 흐름 오류 설명 그래프

예상에서 실패한 업데이트 문제 해결

예상 위반으로 인해 파이프라인이 실패하면 파이프라인을 다시 실행하기 전에 잘못된 데이터를 올바르게 처리하도록 파이프라인 코드를 수정해야 합니다.

파이프라인 실패를 위해 구성된 기대치는 위반을 감지하고 보고하는 데 필요한 정보를 추적하도록 변환의 Spark 쿼리 계획을 수정합니다. 이 정보를 사용하여 많은 쿼리에서 위반을 초래한 입력 레코드를 식별할 수 있습니다. 다음은 예상 예제입니다.

Expectation Violated:
{
  "flowName": "sensor-pipeline",
  "verboseInfo": {
    "expectationsViolated": [
      "temperature_in_valid_range"
    ],
    "inputData": {
      "id": "TEMP_001",
      "temperature": -500,
      "timestamp_ms": "1710498600"
    },
    "outputRecord": {
      "sensor_id": "TEMP_001",
      "temperature": -500,
      "change_time": "2024-03-15 10:30:00"
    },
    "missingInputData": false
  }
}

여러 기대의 관리

참고 항목

SQL과 Python은 단일 데이터 세트 내에서 여러 기대치를 지원하지만 Python만 여러 개의 별도 기대치를 함께 그룹화하고 집단 작업을 지정할 수 있습니다.

여러 기대치를 포함한 그래프가 있는 델타 라이브 테이블

여러 기대치를 그룹화하고 expect_all, expect_all_or_dropexpect_all_or_fail함수를 사용하여 집단 작업을 지정할 수 있습니다.

이러한 데코레이터는 Python 사전을 인수로 허용합니다. 여기서 키는 기대 이름이며 값은 expectation 제약 조건입니다. 파이프라인의 여러 데이터 세트에서 동일한 기대 집합을 다시 사용할 수 있습니다. 다음은 각 expect_all Python 연산자의 예제를 보여 줍니다.

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create cleaned and prepared to share the dataset