Dela via


Hantera datakvalitet med pipeline-förväntningar

Använd förväntningar för att tillämpa kvalitetsbegränsningar som validerar data när de flödar via ETL-pipelines. Förväntningar ger större insikt i datakvalitetsmått och gör att du kan misslyckas med uppdateringar eller släppa poster när du identifierar ogiltiga poster.

Den här artikeln innehåller en översikt över förväntningar, inklusive syntaxexempel och beteendealternativ. Mer avancerade användningsfall och rekommenderade metodtips finns i Förväntansrekommendationer och avancerade mönster.

flödesdiagram för DLT-förväntningar

Vad är förväntningar?

Förväntningar är valfria satser i materialiserad vy i pipeline, strömningstabell eller vyskapandeinstruktioner för att tillämpa datakvalitetskontroller på varje post som passerar genom en fråga. Förväntningar använder sql booleska standardinstruktioner för att ange begränsningar. Du kan kombinera flera förväntningar på en enda datauppsättning och ange förväntningar för alla datauppsättningsdeklarationer i en pipeline.

I följande avsnitt beskrivs de tre komponenterna i en förväntan och innehåller syntaxexempel.

Förväntningsnamn

Varje förväntan måste ha ett namn som används som identifierare för att spåra och övervaka förväntningarna. Välj ett namn som kommunicerar de mått som verifieras. I följande exempel definieras förväntan valid_customer_age för att bekräfta att åldern är mellan 0 och 120 år:

Viktig

Ett namn på förväntningar måste vara unikt för en viss datauppsättning. Du kan återanvända förväntningar i flera datauppsättningar i en pipeline. Se Bärbara och återanvändbara förväntningar.

Python

@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")

SQL

CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

Villkor för att utvärdera

En begränsningsklausul är ett SQL-villkor som måste utvärderas till sant eller falskt för varje post. Villkoret innehåller den faktiska logiken för det som verifieras. När en post misslyckas med det här villkoret utlöses förväntningarna.

Begränsningar måste använda giltig SQL-syntax och får inte innehålla följande:

  • Anpassade Python-funktioner
  • Externa tjänstanrop
  • Underfrågor som refererar till andra tabeller

Följande är exempel på begränsningar som kan läggas till i skapandeinstruktioner för datauppsättning:

Python

# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

SQL

-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

Åtgärd vid ogiltig post

Du måste ange en åtgärd för att avgöra vad som händer när en post inte klarar verifieringen. I följande tabell beskrivs tillgängliga åtgärder:

Handling SQL-syntax Python-syntax Resultat
varna (standard) EXPECT dlt.expect Ogiltiga poster skrivs till målet. Antalet giltiga och ogiltiga poster loggas tillsammans med andra datamängdsmått.
släppa EXPECT ... ON VIOLATION DROP ROW dlt.expect_or_drop Ogiltiga poster tas bort innan data skrivs till måldestinationen. Antalet borttagna poster loggas tillsammans med andra datamängdsmått.
misslyckas EXPECT ... ON VIOLATION FAIL UPDATE dlt.expect_or_fail Ogiltiga rader hindrar uppdateringen från att lyckas. Manuella åtgärder krävs före ombearbetning. Den här förväntan orsakar ett fel i ett enda flöde och orsakar inte att andra flöden i pipelinen misslyckas.

Du kan också implementera avancerad logik för att placera ogiltiga poster i karantän utan att misslyckas eller ta bort data. Se Sätt ogiltiga poster i karantän.

Mått för förväntansspårning

Du kan se spårningsmått för warn eller drop åtgärder från pipelinegränssnittet. Eftersom fail gör att uppdateringen misslyckas när en ogiltig post identifieras registreras inte mått.

Slutför följande steg för att visa mått för förväntningar:

  1. Klicka på DLT- i sidofältet.
  2. Klicka på Namn för din pipeline.
  3. Klicka på en datauppsättning med en definierad förväntan.
  4. Välj fliken Datakvalitet i det högra sidofältet.

Du kan visa datakvalitetsmått genom att köra frågor mot DLT-händelseloggen. Se Frågedatakvalitet från händelseloggen.

Behåll ogiltiga poster

Att behålla ogiltiga poster är standardbeteendet för förväntningar. Använd operatorn expect när du vill behålla poster som strider mot förväntningarna men ändå samla in mått på hur många poster som uppfyller eller inte uppfyller en begränsning. Poster som inte uppfyller förväntningarna läggs till i måldatauppsättningen, tillsammans med de giltiga posterna.

Python

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Ta bort ogiltiga poster

Använd operatorn expect_or_drop för att förhindra ytterligare bearbetning av ogiltiga poster. Poster som bryter mot förväntningarna tas bort från måldatasetet.

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Misslyckas vid ogiltiga poster

När ogiltiga poster är oacceptabla, använd operatorn expect_or_fail för att omedelbart stoppa körningen när en post misslyckas med valideringen. Om åtgärden är en tabelluppdatering återställer systemet atomiskt transaktionen:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Viktig

Om du har flera parallella flöden som definierats i en pipeline leder fel i ett enda flöde inte till att andra flöden misslyckas.

förklaringsdiagram för DLT-flödesfel

Felsöka misslyckade uppdateringar som inte uppfyllde förväntningarna

När en pipeline misslyckas på grund av en förväntansöverträdelse måste du åtgärda pipelinekoden för att hantera ogiltiga data korrekt innan du kör pipelinen igen.

Förväntningar som är konfigurerade för att orsaka pipeline-fel ändrar Spark-frågeplanen för dina omvandlingar så att den kravställda informationen kan spåras vilket krävs för att upptäcka och rapportera överträdelser. Du kan använda den här informationen för att identifiera vilken indatapost som resulterade i överträdelsen för många frågor. Följande är ett exempel på förväntan:

Expectation Violated:
{
  "flowName": "sensor-pipeline",
  "verboseInfo": {
    "expectationsViolated": [
      "temperature_in_valid_range"
    ],
    "inputData": {
      "id": "TEMP_001",
      "temperature": -500,
      "timestamp_ms": "1710498600"
    },
    "outputRecord": {
      "sensor_id": "TEMP_001",
      "temperature": -500,
      "change_time": "2024-03-15 10:30:00"
    },
    "missingInputData": false
  }
}

Hantering av flera förväntningar

Not

Både SQL och Python har stöd för flera förväntningar i en enda datauppsättning, men endast Python låter dig gruppera flera separata förväntningar och ange kollektiva åtgärder.

DLT med flera förväntningar fLow graph

Du kan gruppera flera förväntningar och ange kollektiva åtgärder med hjälp av funktionerna expect_all, expect_all_or_dropoch expect_all_or_fail.

De här dekoratörerna accepterar en Python-ordlista som argument, där nyckeln är förväntansnamnet och värdet är förväntansbegränsningen. Du kan återanvända samma uppsättning förväntningar i flera datauppsättningar i din pipeline. Följande visar exempel på var och en av de expect_all Python-operatorerna:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create cleaned and prepared to share the dataset