Udostępnij za pośrednictwem


Zarządzanie jakością danych przy użyciu oczekiwań przepływu

Użyj oczekiwań, aby zastosować ograniczenia jakości, które weryfikują dane podczas przepływów przez potoki ETL. Oczekiwania zapewniają lepszy wgląd w metryki jakości danych i zezwalają na przerwanie aktualizacji lub porzucanie rekordów w przypadku wykrycia nieprawidłowych rekordów.

Ten artykuł zawiera omówienie oczekiwań, w tym przykłady składni i opcje zachowania. Aby uzyskać bardziej zaawansowane przypadki użycia i zalecane najlepsze rozwiązania, zobacz Zalecenia dotyczące oczekiwań i zaawansowane wzorce.

graf przepływu oczekiwań Delta Live Tables

Jakie są oczekiwania?

Oczekiwania są klauzulami opcjonalnymi w zmaterializowanym widoku w potoku, tabeli strumieniowej lub instrukcjach tworzenia widoku, które stosują kontrole jakości danych dla każdego rekordu, które przechodzą przez zapytanie. Oczekiwania używają standardowych instrukcji logicznych SQL do określania ograniczeń. Można połączyć wiele oczekiwań dla pojedynczego zestawu danych i ustawić oczekiwania dla wszystkich deklaracji zestawów danych w pipeline.

W poniższych sekcjach przedstawiono trzy składniki oczekiwań i podano przykłady składni.

Nazwa oczekiwania

Każde oczekiwanie musi mieć nazwę, która jest używana jako identyfikator do śledzenia i monitorowania oczekiwań. Wybierz nazwę, która komunikuje weryfikowane metryki. W poniższym przykładzie zdefiniowano oczekiwania valid_customer_age, aby potwierdzić, że wiek wynosi od 0 do 120 lat:

Ważne

Nazwa oczekiwania musi być unikatowa dla danego zestawu danych. Możesz ponownie użyć oczekiwań w wielu zestawach danych w pipeline. Zobacz Oczekiwania dotyczące przenośnego i wielokrotnego użytku.

Python

@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")

SQL

CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

Ograniczenie do oceny

Klauzula ograniczenia jest instrukcją warunkową SQL, która musi mieć wartość true lub false dla każdego rekordu. Ograniczenie zawiera rzeczywistą logikę sprawdzania poprawności. Gdy rekord nie spełnia tego warunku, zostaje uruchamiane oczekiwanie.

Ograniczenia muszą używać prawidłowej składni SQL i nie mogą zawierać następujących elementów:

  • Niestandardowe funkcje języka Python
  • Wywołania usług zewnętrznych
  • Podzapytania odwołujące się do innych tabel

Poniżej przedstawiono przykłady ograniczeń, które można dodać do instrukcji tworzenia zestawu danych:

Python

# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

SQL

-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

Akcja na nieprawidłowym rekordzie

Należy określić akcję, aby określić, co się stanie, gdy rekord zakończy się niepowodzeniem sprawdzania poprawności. W poniższej tabeli opisano dostępne akcje:

Akcja Składnia SQL Składnia języka Python Result
ostrzegaj (ustawienie domyślne) EXPECT dlt.expect Nieprawidłowe rekordy są zapisywane w obiekcie docelowym. Liczba prawidłowych i nieprawidłowych rekordów jest rejestrowana wraz z innymi metrykami zestawu danych.
kropla EXPECT ... ON VIOLATION DROP ROW dlt.expect_or_drop Nieprawidłowe rekordy są porzucane, zanim dane zostaną zapisane w obiekcie docelowym. Liczba porzuconych rekordów jest rejestrowana wraz z innymi metrykami zestawu danych.
zawieść EXPECT ... ON VIOLATION FAIL UPDATE dlt.expect_or_fail Nieprawidłowe rekordy uniemożliwiają pomyślne zaktualizowanie. Interwencja ręczna jest wymagana przed ponownym przetworzeniem. To oczekiwanie powoduje awarię pojedynczego przepływu i nie powoduje niepowodzenia innych przepływów w potoku.

Możesz również zaimplementować zaawansowaną logikę w celu kwarantanny nieprawidłowych rekordów bez niepowodzenia lub porzucania danych. Przejrzyj nieprawidłowe rekordy w kwarantannie .

Metryki monitorowania oczekiwań

Możesz zobaczyć metryki śledzenia akcji warn lub drop z interfejsu użytkownika pipeline'u. Ponieważ fail powoduje niepowodzenie aktualizacji po wykryciu nieprawidłowego rekordu, metryki nie są rejestrowane.

Aby wyświetlić metryki oczekiwań, wykonaj następujące kroki:

  1. Kliknij Delta Live Tables na pasku bocznym.
  2. Kliknij Nazwa potoku.
  3. Kliknij zestaw danych ze zdefiniowanym oczekiwaniem.
  4. Wybierz kartę Jakość danych na prawym pasku bocznym.

Metryki jakości danych można wyświetlić, wysyłając zapytanie do dziennika zdarzeń Delta Live Tables. Zobacz jakość danych zapytania z dziennika zdarzeń .

Zachowaj nieprawidłowe rekordy

Zachowywanie nieprawidłowych rekordów jest domyślnym zachowaniem oczekiwań. Użyj operatora expect, jeśli chcesz zachować rekordy, które naruszają oczekiwania, ale zbierać metryki dotyczące tego, ile rekordów spełnia lub nie spełnia ograniczenia. Rekordy naruszające oczekiwania są dodawane do docelowego zestawu danych wraz z prawidłowymi rekordami:

Python

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Usuwanie nieprawidłowych rekordów

Użyj operatora , expect_or_drop aby zapobiec dalszemu przetwarzaniu nieprawidłowych rekordów. Rekordy naruszające oczekiwania są porzucane z docelowego zestawu danych:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Niepowodzenie w nieprawidłowych rekordach

Jeśli nieprawidłowe rekordy są niedopuszczalne, użyj expect_or_fail operatora , aby zatrzymać wykonywanie natychmiast, gdy rekord zakończy się niepowodzeniem weryfikacji. Jeśli operacja jest aktualizacją tabeli, system niepodzieal cofa transakcję:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Ważne

Jeśli masz wiele przepływów równoległych zdefiniowanych w potoku, awaria pojedynczego przepływu nie powoduje niepowodzenia innych przepływów.

wykres wyjaśniający awarię przepływu Delta Live Tables

Rozwiązywanie problemów z nieudanymi aktualizacjami niezgodnymi z oczekiwaniami

Gdy potok zakończy się niepowodzeniem z powodu naruszenia oczekiwań, należy naprawić kod potoku, aby poprawnie obsłużyć nieprawidłowe dane przed ponownym uruchomieniem potoku.

Oczekiwania, które są skonfigurowane tak, aby umożliwić niepowodzenie potoków, modyfikują plan zapytań przekształceń w Spark, aby śledzić informacje potrzebne do wykrywania i zgłaszania naruszeń. Te informacje umożliwiają określenie, który rekord wejściowy spowodował naruszenie wielu zapytań. Poniżej przedstawiono przykładowe oczekiwania:

Expectation Violated:
{
  "flowName": "sensor-pipeline",
  "verboseInfo": {
    "expectationsViolated": [
      "temperature_in_valid_range"
    ],
    "inputData": {
      "id": "TEMP_001",
      "temperature": -500,
      "timestamp_ms": "1710498600"
    },
    "outputRecord": {
      "sensor_id": "TEMP_001",
      "temperature": -500,
      "change_time": "2024-03-15 10:30:00"
    },
    "missingInputData": false
  }
}

zarządzanie wieloma oczekiwaniami

Uwaga

Chociaż zarówno język SQL, jak i Python obsługują wiele oczekiwań w ramach jednego zestawu danych, tylko język Python umożliwia grupowanie wielu oddzielnych oczekiwań i określanie zbiorczych akcji.

Delta Live Tables z wieloma warunkami oczekiwań fLow graph

Można grupować wiele oczekiwań i określać akcje zbiorowe przy użyciu funkcji expect_all, expect_all_or_dropi expect_all_or_fail.

Te dekoratory akceptują słownik języka Python jako argument, gdzie klucz jest nazwą oczekiwania, a wartość jest ograniczeniem oczekiwania. Możesz ponownie użyć tego samego zestawu oczekiwań w wielu zestawach danych w przepływie danych. Poniżej przedstawiono przykłady poszczególnych operatorów języka Python expect_all:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create cleaned and prepared to share the dataset