Gegevenskwaliteit beheren met pijplijnverwachtingen
Gebruik de verwachtingen om kwaliteitsbeperkingen toe te passen die gegevens valideren terwijl deze via ETL-pijplijnen stromen. Verwachtingen bieden meer inzicht in de gegevenskwaliteitsmetriek en stellen u in staat om updates niet uit te voeren of records te verwijderen wanneer ongeldige records worden gedetecteerd.
Dit artikel bevat een overzicht van verwachtingen, waaronder syntaxisvoorbeelden en gedragsopties. Zie Aanbevelingen voor verwachtingen en geavanceerde patronenvoor meer geavanceerde gebruiksvoorbeelden en aanbevolen best practices.
Wat zijn verwachtingen?
Verwachtingen zijn optionele componenten in de gematerialiseerde weergave van de pijplijn, streamingtabel of weergavecreatiestatements die gegevenskwaliteitscontroles toepassen op elke record die door een query passeert. Verwachtingen maken gebruik van standaard SQL-booleaanse instructies om beperkingen op te geven. U kunt meerdere verwachtingen voor één gegevensset combineren en verwachtingen instellen voor alle declaraties van gegevenssets in een pijplijn.
In de volgende secties worden de drie onderdelen van een verwachting geïntroduceerd en worden syntaxisvoorbeelden weergegeven.
Naam van verwachting
Elke verwachting moet een naam hebben, die wordt gebruikt als id om de verwachting bij te houden en te bewaken. Kies een naam die de te valideren meetgegevens communiceert. In het volgende voorbeeld wordt de verwachting gedefinieerd valid_customer_age
om te bevestigen dat de leeftijd tussen 0 en 120 jaar ligt:
Belangrijk
Een verwachtingsnaam moet uniek zijn voor een bepaalde gegevensset. U kunt verwachtingen hergebruiken voor verschillende gegevenssets in een pijplijn. Zie Draagbare en herbruikbare verwachtingen.
Python
@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")
SQL
CREATE OR REFRESH STREAMING TABLE customers(
CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);
Beperking om te evalueren
De beperkingscomponent is een voorwaardelijke SQL-instructie die voor elke record waar of onwaar moet evalueren. De beperking bevat de werkelijke logica voor wat wordt gevalideerd. Wanneer een record deze voorwaarde mislukt, wordt de verwachting geactiveerd.
Beperkingen moeten geldige SQL-syntaxis gebruiken en mogen niet het volgende bevatten:
- Aangepaste Python-functies
- Externe service-aanroepen
- Subquery's die verwijzen naar andere tabellen
Hier volgen enkele voorbeelden van beperkingen die kunnen worden toegevoegd aan instructies voor het maken van gegevenssets:
Python
# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")
# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")
# CASE statements
@dlt.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")
# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")
# Complex business logic
@dlt.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)
# Complex boolean logic
@dlt.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
SQL
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)
-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)
-- CASE statements
CONSTRAINT valid_order_status EXPECT (
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
)
-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())
-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'
)
-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)
Actie bij ongeldige vermelding
U moet een actie opgeven om te bepalen wat er gebeurt wanneer een record de validatiecontrole mislukt. In de volgende tabel worden de beschikbare acties beschreven:
Handeling | SQL-syntaxis | Python-syntaxis | Resultaat |
---|---|---|---|
waarschuwen (standaard) | EXPECT |
dlt.expect |
Ongeldige records worden naar de doellocatie geschreven. Het aantal geldige en ongeldige records wordt geregistreerd naast andere metrische gegevenssets. |
laten vallen | EXPECT ... ON VIOLATION DROP ROW |
dlt.expect_or_drop |
Ongeldige records worden verwijderd voordat gegevens naar het doel worden geschreven. Het aantal verwijderde records wordt geregistreerd naast andere metrische gegevenssets. |
mislukt | EXPECT ... ON VIOLATION FAIL UPDATE |
dlt.expect_or_fail |
Ongeldige records verhinderen dat de update slaagt. Handmatige interventie is vereist voordat het opnieuw wordt verwerkt. Deze verwachting veroorzaakt een storing van één stroom en zorgt ervoor dat andere stromen in uw pijplijn niet mislukken. |
U kunt ook geavanceerde logica implementeren om ongeldige records in quarantaine te plaatsen zonder gegevens te mislukken of te verwijderen. Zie ongeldige recordsin quarantaine plaatsen.
metrische gegevens voor het bijhouden van verwachtingen
U kunt bijhoudingsstatistieken voor warn
of drop
acties bekijken vanuit de pijplijn-gebruikersinterface. Omdat fail
ervoor zorgt dat de update mislukt wanneer een ongeldige record wordt gedetecteerd, worden er geen metrische gegevens vastgelegd.
Voer de volgende stappen uit om metrische verwachtingen weer te geven:
- Klik op DLT- in de zijbalk.
- Klik op de naam van uw pijplijn.
- Klik op een gegevensset waarvoor een verwachting is gedefinieerd.
- Selecteer het tabblad Gegevenskwaliteit in de rechterzijbalk.
U kunt metrische gegevens over de kwaliteit van gegevens bekijken door een query uit te voeren op het DLT-gebeurtenislogboek. Zie Gegevenskwaliteit opvragen uit het gebeurtenislogboek.
ongeldige records behouden
Het behouden van ongeldige records is het standaardgedrag voor verwachtingen. Gebruik de operator expect
als u records wilt bewaren die in strijd zijn met de verwachting en om metrische gegevens te verzamelen over hoeveel records een voorwaarde doorstaan of niet voldoen. Records die in strijd zijn met de verwachting, worden toegevoegd aan de doelgegevensset, samen met geldige records:
Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
ongeldige records verwijderen
Gebruik de operator expect_or_drop
om verdere verwerking van ongeldige records te voorkomen. Records die in strijd zijn met de verwachting, worden verwijderd uit de doelgegevensset:
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
mislukt bij ongeldige records
Wanneer ongeldige records onacceptabel zijn, gebruikt u de operator expect_or_fail
om de uitvoering onmiddellijk te stoppen wanneer de validatie van een record mislukt. Als de bewerking een tabelupdate is, wordt de transactie door het systeem atomisch teruggedraaid:
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Belangrijk
Als u meerdere parallelle stromen hebt gedefinieerd in een pijplijn, leidt het mislukken van één stroom er niet toe dat andere stromen mislukken.
Problemen oplossen wanneer updates niet aan de verwachtingen voldoen.
Wanneer een pijplijn mislukt vanwege een schending van de verwachting, moet u de pijplijncode corrigeren om de ongeldige gegevens correct te verwerken voordat u de pijplijn opnieuw uitvoert.
Verwachtingen die zo zijn geconfigureerd dat pijplijnen mislukken, wijzigen het Spark-queryplan van uw transformaties om informatie bij te houden die nodig is voor het detecteren en rapporteren van schendingen. U kunt deze informatie gebruiken om te bepalen welke invoerrecord heeft geresulteerd in de schending van veel query's. Hier volgt een voorbeeld van een verwachting:
Expectation Violated:
{
"flowName": "sensor-pipeline",
"verboseInfo": {
"expectationsViolated": [
"temperature_in_valid_range"
],
"inputData": {
"id": "TEMP_001",
"temperature": -500,
"timestamp_ms": "1710498600"
},
"outputRecord": {
"sensor_id": "TEMP_001",
"temperature": -500,
"change_time": "2024-03-15 10:30:00"
},
"missingInputData": false
}
}
Beheer van meerdere verwachtingen
Notitie
Hoewel zowel SQL als Python meerdere verwachtingen binnen één gegevensset ondersteunen, kunt u met Alleen Python meerdere afzonderlijke verwachtingen groeperen en collectieve acties opgeven.
U kunt meerdere verwachtingen groeperen en collectieve acties opgeven met behulp van de functies expect_all
, expect_all_or_drop
en expect_all_or_fail
.
Deze decorators accepteren een Python-woordenlijst als argument, waarbij de sleutel de naam van de verwachting is en de waarde de verwachtingsbeperking is. U kunt dezelfde set verwachtingen in meerdere gegevenssets in uw pijplijn opnieuw gebruiken. Hieronder ziet u voorbeelden van elk van de expect_all
Python-operators:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create a raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset
@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset