Sdílet prostřednictvím


Spravujte kvalitu dat pomocí očekávání datového toku

Pomocí očekávání použijte omezení kvality, která ověřují data při jejich toku prostřednictvím kanálů ETL. Očekávání poskytují lepší přehled o metrikách kvality dat a umožňují selhání aktualizací nebo vyřazení záznamů při zjišťování neplatných záznamů.

Tento článek obsahuje přehled očekávání, včetně příkladů syntaxe a možností chování. Pokročilejší případy použití a doporučené osvědčené postupy najdete v tématu Doporučení k očekávání a pokročilé vzory.

graf toku očekávání DLT

Co jsou očekávání?

Očekávání jsou volitelné klauzule v materializovaném zobrazení v kanálu, streamovací tabulce nebo příkazech pro vytváření zobrazení, které u každého záznamu procházejícího dotazem provádějí kontroly kvality dat. Očekávání používají standardní logické příkazy SQL k určení omezení. Můžete kombinovat více očekávání pro jednu datovou sadu a nastavit očekávání napříč všemi deklaracemi datových sad v kanálu.

Následující části představují tři komponenty očekávání a poskytují příklady syntaxe.

Název očekávání

Každé očekávání musí mít název, který se používá jako identifikátor ke sledování a monitorování očekávání. Zvolte název, který oznamuje ověřované metriky. Následující příklad definuje očekávání valid_customer_age k potvrzení, že věk je mezi 0 a 120 lety:

Důležitý

Pro danou datovou sadu musí být jedinečný název očekávání. Očekávání můžete znovu použít napříč několika datovými sadami v pipeline. Viz Přenosné a opakovaně použitelné očekávání.

Python

@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")

SQL

CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

Omezení k vyhodnocení

Klauzule constraint je podmíněný příkaz SQL, který musí být vyhodnocen jako true nebo false pro každý záznam. Omezení obsahuje skutečnou logiku toho, co se ověřuje. Pokud záznam selže s touto podmínkou, aktivuje se očekávání.

Omezení musí používat platnou syntaxi SQL a nesmí obsahovat následující:

  • Vlastní funkce Pythonu
  • Volání externích služeb
  • Poddotazy odkazující na jiné tabulky

Tady jsou příklady omezení, která je možné přidat do příkazů pro vytváření datových sad:

Python

# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

SQL

-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

Akce při neplatném záznamu

Je nutné zadat akci, která určí, co se stane, když záznam selže při kontrole ověření. Následující tabulka popisuje dostupné akce:

Akce Syntaxe SQL Syntaxe Pythonu Výsledek
varovat (výchozí) EXPECT dlt.expect Do cíle se zapisují neplatné záznamy. Počet platných a neplatných záznamů se protokoluje spolu s dalšími metrikami datové sady.
přetáhnout EXPECT ... ON VIOLATION DROP ROW dlt.expect_or_drop Neplatné záznamy se zahodí před zápisem dat do cíle. Počet vynechaných záznamů se protokoluje spolu s dalšími metrikami datové sady.
selhání EXPECT ... ON VIOLATION FAIL UPDATE dlt.expect_or_fail Neplatné záznamy brání úspěšné aktualizaci. Před opětovnou úpravou je vyžadován ruční zásah. Toto očekávání způsobí selhání jednoho toku a nezpůsobí selhání jiných toků ve vašem kanálu.

Můžete také implementovat pokročilou logiku pro karanténu neplatných záznamů bez selhání nebo vyřazení dat. Viz Karanténa neplatných záznamů.

Metriky sledování očekávání

Z uživatelského rozhraní datového toku můžete zobrazit metriky sledování warn nebo drop akcí. Protože fail způsobí selhání aktualizace při zjištění neplatného záznamu, metriky se nezaznamenávají.

Pokud chcete zobrazit očekávané metriky, proveďte následující kroky:

  1. Na bočním panelu klikněte na DLT.
  2. Klikněte na název vaší pipeline.
  3. Klikněte na datovou sadu s definovaným očekáváním.
  4. Na pravém bočním panelu vyberte kartu Kvalita dat.

Metriky kvality dat můžete zobrazit dotazováním protokolu událostí DLT. Viz Dotaz na kvalitu dat z protokolu událostí.

Zachovat neplatné záznamy

Zachování neplatných záznamů je výchozím chováním pro očekávání. Operátor expect použijte, pokud chcete uchovávat záznamy, které porušují očekávání, ale zároveň shromažďovat metriky o tom, kolik záznamů splňuje nebo nesplňuje podmínku. Záznamy, které porušují očekávání, se přidají do cílové datové sady spolu s platnými záznamy:

Python

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Přetažení neplatných záznamů

Pomocí operátoru expect_or_drop zabráníte dalšímu zpracování neplatných záznamů. Záznamy, které porušují očekávání, se z cílové datové sady zahodí:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

selhání pro neplatné záznamy

Pokud jsou neplatné záznamy nepřijatelné, pomocí operátoru expect_or_fail ukončete provádění okamžitě, když se ověření záznamu nezdaří. Pokud je operace aktualizace tabulky, systém atomicky vrátí zpět transakci:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Důležitý

Pokud máte v kanálu definovaných více paralelních toků, selhání jednoho toku nezpůsobí selhání jiných toků.

Graf vysvětlení selhání toku DLT

Řešení potíží s neúspěšnými aktualizacemi z očekávání

Pokud kanál selže z důvodu porušení očekávání, musíte před opětovným spuštěním kanálu opravit kód kanálu, který zpracuje neplatná data správně.

Očekávání nakonfigurovaná pro selhávání kanálů upravují dotazovací plán vašich transformací v Sparku, aby sledovaly informace potřebné k identifikaci a hlášení porušení. Tyto informace můžete použít k identifikaci toho, který vstupní záznam způsobil porušení mnoha dotazů. Následuje příklad očekávání:

Expectation Violated:
{
  "flowName": "sensor-pipeline",
  "verboseInfo": {
    "expectationsViolated": [
      "temperature_in_valid_range"
    ],
    "inputData": {
      "id": "TEMP_001",
      "temperature": -500,
      "timestamp_ms": "1710498600"
    },
    "outputRecord": {
      "sensor_id": "TEMP_001",
      "temperature": -500,
      "change_time": "2024-03-15 10:30:00"
    },
    "missingInputData": false
  }
}

Správa více očekávání

Poznámka

I když SQL i Python podporují více očekávání v rámci jedné datové sady, umožňuje pouze Python seskupit několik samostatných očekávání a zadat kolektivní akce.

DLT s více očekáváními fLow graf

Pomocí funkcí expect_all, expect_all_or_dropa expect_all_or_failmůžete seskupit více očekávání a určit kolektivní akce.

Tyto dekorátory přijímají slovník Pythonu jako argument, kde klíč je očekávaný název a hodnota je omezení očekávání. Stejnou sadu očekávání můžete použít v několika datových sadách ve vašem pracovním toku. Následující příklady ukazují jednotlivé operátory expect_all Pythonu:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create cleaned and prepared to share the dataset