Spravujte kvalitu dat pomocí očekávání datového toku
Pomocí očekávání použijte omezení kvality, která ověřují data při jejich toku prostřednictvím kanálů ETL. Očekávání poskytují lepší přehled o metrikách kvality dat a umožňují selhání aktualizací nebo vyřazení záznamů při zjišťování neplatných záznamů.
Tento článek obsahuje přehled očekávání, včetně příkladů syntaxe a možností chování. Pokročilejší případy použití a doporučené osvědčené postupy najdete v tématu Doporučení k očekávání a pokročilé vzory.
Co jsou očekávání?
Očekávání jsou volitelné klauzule v materializovaném zobrazení v kanálu, streamovací tabulce nebo příkazech pro vytváření zobrazení, které u každého záznamu procházejícího dotazem provádějí kontroly kvality dat. Očekávání používají standardní logické příkazy SQL k určení omezení. Můžete kombinovat více očekávání pro jednu datovou sadu a nastavit očekávání napříč všemi deklaracemi datových sad v kanálu.
Následující části představují tři komponenty očekávání a poskytují příklady syntaxe.
Název očekávání
Každé očekávání musí mít název, který se používá jako identifikátor ke sledování a monitorování očekávání. Zvolte název, který oznamuje ověřované metriky. Následující příklad definuje očekávání valid_customer_age
k potvrzení, že věk je mezi 0 a 120 lety:
Důležitý
Pro danou datovou sadu musí být jedinečný název očekávání. Očekávání můžete znovu použít napříč několika datovými sadami v pipeline. Viz Přenosné a opakovaně použitelné očekávání.
Python
@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")
SQL
CREATE OR REFRESH STREAMING TABLE customers(
CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);
Omezení k vyhodnocení
Klauzule constraint je podmíněný příkaz SQL, který musí být vyhodnocen jako true nebo false pro každý záznam. Omezení obsahuje skutečnou logiku toho, co se ověřuje. Pokud záznam selže s touto podmínkou, aktivuje se očekávání.
Omezení musí používat platnou syntaxi SQL a nesmí obsahovat následující:
- Vlastní funkce Pythonu
- Volání externích služeb
- Poddotazy odkazující na jiné tabulky
Tady jsou příklady omezení, která je možné přidat do příkazů pro vytváření datových sad:
Python
# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")
# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")
# CASE statements
@dlt.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")
# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")
# Complex business logic
@dlt.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)
# Complex boolean logic
@dlt.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
SQL
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)
-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)
-- CASE statements
CONSTRAINT valid_order_status EXPECT (
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
)
-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())
-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'
)
-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)
Akce při neplatném záznamu
Je nutné zadat akci, která určí, co se stane, když záznam selže při kontrole ověření. Následující tabulka popisuje dostupné akce:
Akce | Syntaxe SQL | Syntaxe Pythonu | Výsledek |
---|---|---|---|
varovat (výchozí) | EXPECT |
dlt.expect |
Do cíle se zapisují neplatné záznamy. Počet platných a neplatných záznamů se protokoluje spolu s dalšími metrikami datové sady. |
přetáhnout | EXPECT ... ON VIOLATION DROP ROW |
dlt.expect_or_drop |
Neplatné záznamy se zahodí před zápisem dat do cíle. Počet vynechaných záznamů se protokoluje spolu s dalšími metrikami datové sady. |
selhání | EXPECT ... ON VIOLATION FAIL UPDATE |
dlt.expect_or_fail |
Neplatné záznamy brání úspěšné aktualizaci. Před opětovnou úpravou je vyžadován ruční zásah. Toto očekávání způsobí selhání jednoho toku a nezpůsobí selhání jiných toků ve vašem kanálu. |
Můžete také implementovat pokročilou logiku pro karanténu neplatných záznamů bez selhání nebo vyřazení dat. Viz Karanténa neplatných záznamů.
Metriky sledování očekávání
Z uživatelského rozhraní datového toku můžete zobrazit metriky sledování warn
nebo drop
akcí. Protože fail
způsobí selhání aktualizace při zjištění neplatného záznamu, metriky se nezaznamenávají.
Pokud chcete zobrazit očekávané metriky, proveďte následující kroky:
- Na bočním panelu klikněte na DLT.
- Klikněte na název vaší pipeline.
- Klikněte na datovou sadu s definovaným očekáváním.
- Na pravém bočním panelu vyberte kartu Kvalita dat.
Metriky kvality dat můžete zobrazit dotazováním protokolu událostí DLT. Viz Dotaz na kvalitu dat z protokolu událostí.
Zachovat neplatné záznamy
Zachování neplatných záznamů je výchozím chováním pro očekávání. Operátor expect
použijte, pokud chcete uchovávat záznamy, které porušují očekávání, ale zároveň shromažďovat metriky o tom, kolik záznamů splňuje nebo nesplňuje podmínku. Záznamy, které porušují očekávání, se přidají do cílové datové sady spolu s platnými záznamy:
Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Přetažení neplatných záznamů
Pomocí operátoru expect_or_drop
zabráníte dalšímu zpracování neplatných záznamů. Záznamy, které porušují očekávání, se z cílové datové sady zahodí:
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
selhání pro neplatné záznamy
Pokud jsou neplatné záznamy nepřijatelné, pomocí operátoru expect_or_fail
ukončete provádění okamžitě, když se ověření záznamu nezdaří. Pokud je operace aktualizace tabulky, systém atomicky vrátí zpět transakci:
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Důležitý
Pokud máte v kanálu definovaných více paralelních toků, selhání jednoho toku nezpůsobí selhání jiných toků.
Řešení potíží s neúspěšnými aktualizacemi z očekávání
Pokud kanál selže z důvodu porušení očekávání, musíte před opětovným spuštěním kanálu opravit kód kanálu, který zpracuje neplatná data správně.
Očekávání nakonfigurovaná pro selhávání kanálů upravují dotazovací plán vašich transformací v Sparku, aby sledovaly informace potřebné k identifikaci a hlášení porušení. Tyto informace můžete použít k identifikaci toho, který vstupní záznam způsobil porušení mnoha dotazů. Následuje příklad očekávání:
Expectation Violated:
{
"flowName": "sensor-pipeline",
"verboseInfo": {
"expectationsViolated": [
"temperature_in_valid_range"
],
"inputData": {
"id": "TEMP_001",
"temperature": -500,
"timestamp_ms": "1710498600"
},
"outputRecord": {
"sensor_id": "TEMP_001",
"temperature": -500,
"change_time": "2024-03-15 10:30:00"
},
"missingInputData": false
}
}
Správa více očekávání
Poznámka
I když SQL i Python podporují více očekávání v rámci jedné datové sady, umožňuje pouze Python seskupit několik samostatných očekávání a zadat kolektivní akce.
Pomocí funkcí expect_all
, expect_all_or_drop
a expect_all_or_fail
můžete seskupit více očekávání a určit kolektivní akce.
Tyto dekorátory přijímají slovník Pythonu jako argument, kde klíč je očekávaný název a hodnota je omezení očekávání. Stejnou sadu očekávání můžete použít v několika datových sadách ve vašem pracovním toku. Následující příklady ukazují jednotlivé operátory expect_all
Pythonu:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create a raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset
@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset