Hantera datakvalitet med pipeline-förväntningar
Använd förväntningar för att tillämpa kvalitetsbegränsningar som validerar data när de flödar via ETL-pipelines. Förväntningar ger större insikt i datakvalitetsmått och gör att du kan misslyckas med uppdateringar eller släppa poster när du identifierar ogiltiga poster.
Den här artikeln innehåller en översikt över förväntningar, inklusive syntaxexempel och beteendealternativ. Mer avancerade användningsfall och rekommenderade metodtips finns i Förväntansrekommendationer och avancerade mönster.
Vad är förväntningar?
Förväntningar är valfria satser i materialiserad vy i pipeline, strömningstabell eller vyskapandeinstruktioner för att tillämpa datakvalitetskontroller på varje post som passerar genom en fråga. Förväntningar använder sql booleska standardinstruktioner för att ange begränsningar. Du kan kombinera flera förväntningar på en enda datauppsättning och ange förväntningar för alla datauppsättningsdeklarationer i en pipeline.
I följande avsnitt beskrivs de tre komponenterna i en förväntan och innehåller syntaxexempel.
Förväntningsnamn
Varje förväntan måste ha ett namn som används som identifierare för att spåra och övervaka förväntningarna. Välj ett namn som kommunicerar de mått som verifieras. I följande exempel definieras förväntan valid_customer_age
för att bekräfta att åldern är mellan 0 och 120 år:
Viktig
Ett namn på förväntningar måste vara unikt för en viss datauppsättning. Du kan återanvända förväntningar i flera datauppsättningar i en pipeline. Se Bärbara och återanvändbara förväntningar.
Python
@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")
SQL
CREATE OR REFRESH STREAMING TABLE customers(
CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);
Villkor för att utvärdera
En begränsningsklausul är ett SQL-villkor som måste utvärderas till sant eller falskt för varje post. Villkoret innehåller den faktiska logiken för det som verifieras. När en post misslyckas med det här villkoret utlöses förväntningarna.
Begränsningar måste använda giltig SQL-syntax och får inte innehålla följande:
- Anpassade Python-funktioner
- Externa tjänstanrop
- Underfrågor som refererar till andra tabeller
Följande är exempel på begränsningar som kan läggas till i skapandeinstruktioner för datauppsättning:
Python
# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")
# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")
# CASE statements
@dlt.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")
# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")
# Complex business logic
@dlt.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)
# Complex boolean logic
@dlt.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
SQL
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)
-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)
-- CASE statements
CONSTRAINT valid_order_status EXPECT (
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
)
-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())
-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'
)
-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)
Åtgärd vid ogiltig post
Du måste ange en åtgärd för att avgöra vad som händer när en post inte klarar verifieringen. I följande tabell beskrivs tillgängliga åtgärder:
Handling | SQL-syntax | Python-syntax | Resultat |
---|---|---|---|
varna (standard) | EXPECT |
dlt.expect |
Ogiltiga poster skrivs till målet. Antalet giltiga och ogiltiga poster loggas tillsammans med andra datamängdsmått. |
släppa | EXPECT ... ON VIOLATION DROP ROW |
dlt.expect_or_drop |
Ogiltiga poster tas bort innan data skrivs till måldestinationen. Antalet borttagna poster loggas tillsammans med andra datamängdsmått. |
misslyckas | EXPECT ... ON VIOLATION FAIL UPDATE |
dlt.expect_or_fail |
Ogiltiga rader hindrar uppdateringen från att lyckas. Manuella åtgärder krävs före ombearbetning. Den här förväntan orsakar ett fel i ett enda flöde och orsakar inte att andra flöden i pipelinen misslyckas. |
Du kan också implementera avancerad logik för att placera ogiltiga poster i karantän utan att misslyckas eller ta bort data. Se Sätt ogiltiga poster i karantän.
Mått för förväntansspårning
Du kan se spårningsmått för warn
eller drop
åtgärder från pipelinegränssnittet. Eftersom fail
gör att uppdateringen misslyckas när en ogiltig post identifieras registreras inte mått.
Slutför följande steg för att visa mått för förväntningar:
- Klicka på DLT- i sidofältet.
- Klicka på Namn för din pipeline.
- Klicka på en datauppsättning med en definierad förväntan.
- Välj fliken Datakvalitet i det högra sidofältet.
Du kan visa datakvalitetsmått genom att köra frågor mot DLT-händelseloggen. Se Frågedatakvalitet från händelseloggen.
Behåll ogiltiga poster
Att behålla ogiltiga poster är standardbeteendet för förväntningar. Använd operatorn expect
när du vill behålla poster som strider mot förväntningarna men ändå samla in mått på hur många poster som uppfyller eller inte uppfyller en begränsning. Poster som inte uppfyller förväntningarna läggs till i måldatauppsättningen, tillsammans med de giltiga posterna.
Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Ta bort ogiltiga poster
Använd operatorn expect_or_drop
för att förhindra ytterligare bearbetning av ogiltiga poster. Poster som bryter mot förväntningarna tas bort från måldatasetet.
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Misslyckas vid ogiltiga poster
När ogiltiga poster är oacceptabla, använd operatorn expect_or_fail
för att omedelbart stoppa körningen när en post misslyckas med valideringen. Om åtgärden är en tabelluppdatering återställer systemet atomiskt transaktionen:
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Viktig
Om du har flera parallella flöden som definierats i en pipeline leder fel i ett enda flöde inte till att andra flöden misslyckas.
Felsöka misslyckade uppdateringar som inte uppfyllde förväntningarna
När en pipeline misslyckas på grund av en förväntansöverträdelse måste du åtgärda pipelinekoden för att hantera ogiltiga data korrekt innan du kör pipelinen igen.
Förväntningar som är konfigurerade för att orsaka pipeline-fel ändrar Spark-frågeplanen för dina omvandlingar så att den kravställda informationen kan spåras vilket krävs för att upptäcka och rapportera överträdelser. Du kan använda den här informationen för att identifiera vilken indatapost som resulterade i överträdelsen för många frågor. Följande är ett exempel på förväntan:
Expectation Violated:
{
"flowName": "sensor-pipeline",
"verboseInfo": {
"expectationsViolated": [
"temperature_in_valid_range"
],
"inputData": {
"id": "TEMP_001",
"temperature": -500,
"timestamp_ms": "1710498600"
},
"outputRecord": {
"sensor_id": "TEMP_001",
"temperature": -500,
"change_time": "2024-03-15 10:30:00"
},
"missingInputData": false
}
}
Hantering av flera förväntningar
Not
Både SQL och Python har stöd för flera förväntningar i en enda datauppsättning, men endast Python låter dig gruppera flera separata förväntningar och ange kollektiva åtgärder.
Du kan gruppera flera förväntningar och ange kollektiva åtgärder med hjälp av funktionerna expect_all
, expect_all_or_drop
och expect_all_or_fail
.
De här dekoratörerna accepterar en Python-ordlista som argument, där nyckeln är förväntansnamnet och värdet är förväntansbegränsningen. Du kan återanvända samma uppsättning förväntningar i flera datauppsättningar i din pipeline. Följande visar exempel på var och en av de expect_all
Python-operatorerna:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create a raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset
@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset