Gegevenskwaliteit beheren met Delta Live Tables
U gebruikt verwachtingen om beperkingen voor gegevenskwaliteit te definiëren voor de inhoud van een gegevensset. Met verwachtingen kunt u garanderen dat gegevens die binnenkomen in tables voldoen aan de vereisten voor gegevenskwaliteit en bieden u inzicht in de gegevenskwaliteit voor elke pijplijn update. U past verwachtingen toe op query's met behulp van Python-decorators of SQL constraint-componenten.
Wat zijn de verwachtingen van Delta Live Tables?
Verwachtingen zijn optionele componenten die u aan Delta Live toevoegt Tables gegevenssetdeclaraties die gegevenskwaliteitscontroles toepassen op elke record die via een query wordt doorgegeven.
Een verwachting bestaat uit drie dingen:
- Een beschrijving, die fungeert als een unieke identifier en waarmee u metrische gegevens voor de constraintkunt bijhouden.
- Een Booleaanse instructie die altijd waar of onwaar retourneert op basis van een bepaalde voorwaarde.
- Een actie die moet worden uitgevoerd wanneer een record de verwachting mislukt, wat betekent dat de Booleaanse waarde onwaar retourneert.
In de volgende matrix ziet u de drie acties die u kunt toepassen op ongeldige records:
Actie | Result |
---|---|
waarschuwen (standaard) | Ongeldige records worden naar het doel geschreven; fout wordt gerapporteerd als een metrische waarde voor de gegevensset. |
druppel | Ongeldige records worden verwijderd voordat gegevens naar het doel worden geschreven; fout wordt gerapporteerd als metrische gegevens voor de gegevensset. |
mislukken | Ongeldige records verhinderen het slagen van de update. Handmatige interventie is vereist voordat de verwerking opnieuw wordt uitgevoerd. |
U kunt metrische gegevens over gegevenskwaliteit bekijken, zoals het aantal records dat een verwachting schendt door een query uit te voeren op het Delta Live-Tables gebeurtenislogboek. Zie Delta Live-Tables-pijplijnen bewaken.
Zie Delta Live Tables Python-taalreferentie of Delta Live Tables SQL-taalreferentievoor een volledig overzicht van de declaratiesyntaxis van Delta Live Tables gegevensset.
Notitie
- Hoewel u meerdere componenten in elke verwachting kunt opnemen, ondersteunt alleen Python het definiëren van acties op basis van meerdere verwachtingen. Bekijk meerdere verwachtingen.
- Verwachtingen moeten worden gedefinieerd met behulp van SQL-expressies. U kunt geen niet-SQL-syntaxis (bijvoorbeeld Python-functies) gebruiken bij het definiëren van een verwachting.
Ongeldige records behouden
Gebruik de expect
operator als u records wilt bewaren die in strijd zijn met de verwachting. Records die in strijd zijn met de verwachting, worden toegevoegd aan de doelgegevensset, samen met geldige records:
Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Ongeldige records verwijderen
Gebruik de expect or drop
operator om verdere verwerking van ongeldige records te voorkomen. Records die in strijd zijn met de verwachting, worden verwijderd uit de doelgegevensset:
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Mislukt bij ongeldige records
Wanneer ongeldige records onacceptabel zijn, gebruikt u de operator om de expect or fail
uitvoering onmiddellijk te stoppen wanneer de validatie van een record mislukt. Als de bewerking een tableupdateis, wordt de transactie atomisch teruggedraaid door het systeem:
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Belangrijk
Als u meerdere parallelle stromen hebt gedefinieerd in een pijplijn, leidt het mislukken van één stroom er niet toe dat andere stromen mislukken.
Wanneer een pijplijn mislukt vanwege een schending van de verwachting, moet u de pijplijncode corrigeren om de ongeldige gegevens correct te verwerken voordat u de pijplijn opnieuw uitvoert.
Bij het mislukken van de verwachtingen wordt het Spark-queryplan van uw transformaties gewijzigd om informatie bij te houden die nodig is om schendingen te detecteren en te rapporteren. Voor veel query's kunt u deze informatie gebruiken om te bepalen welke invoerrecord heeft geleid tot de schending. Hier volgt een voorbeeld van een uitzondering:
Expectation Violated:
{
"flowName": "a-b",
"verboseInfo": {
"expectationsViolated": [
"x1 is negative"
],
"inputData": {
"a": {"x1": 1,"y1": "a },
"b": {
"x2": 1,
"y2": "aa"
}
},
"outputRecord": {
"x1": 1,
"y1": "a",
"x2": 1,
"y2": "aa"
},
"missingInputData": false
}
}
Meerdere verwachtingen
U kunt verwachtingen definiëren met een of meer beperkingen voor gegevenskwaliteit in Python-pijplijnen. Deze decorators accepteren een Python-woordenlijst als argument, where waarbij de sleutel de naam van de verwachting is en de waarde de verwachting is constraint.
Gebruik expect_all
dit om meerdere beperkingen voor gegevenskwaliteit op te geven wanneer records die niet kunnen worden gevalideerd, moeten worden opgenomen in de doelgegevensset:
@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Gebruik expect_all_or_drop
dit om meerdere beperkingen voor gegevenskwaliteit op te geven wanneer records die niet kunnen worden gevalideerd, moeten worden verwijderd uit de doelgegevensset:
@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Gebruik expect_all_or_fail
dit om meerdere beperkingen voor gegevenskwaliteit op te geven wanneer records die mislukken bij validatie, de uitvoering van pijplijnen moeten stoppen:
@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
U kunt ook een verzameling verwachtingen definiëren als een variabele en deze doorgeven aan een of meer query's in uw pijplijn:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create cleaned and prepared dataset
Ongeldige gegevens in quarantaine plaatsen
In het volgende voorbeeld worden verwachtingen gebruikt in combinatie met tijdelijke tables en views. Dit patroon biedt u metrische gegevens voor records die verwachtingen tijdens pijplijnupdates doorgeven en biedt een manier om geldige en ongeldige records via verschillende downstreampaden te verwerken.
Notitie
In dit voorbeeld worden voorbeeldgegevens gelezen die zijn opgenomen in de Databricks-gegevenssets. Omdat de Databricks-gegevenssets niet worden ondersteund met een pijplijn die naar Unity Catalogpubliceert, werkt dit voorbeeld alleen met een pijplijn die is geconfigureerd om te publiceren naar de Hive-metastore. Dit patroon werkt echter ook met Unity Catalog ingeschakelde pijplijnen, maar je moet gegevens lezen van externe locaties. Voor nadere informatie over het gebruik van Unity Catalog met Delta Live Tables, zie Unity Catalog gebruiken met uw Delta Live Tables-pijplijnen.
import dlt
from pyspark.sql.functions import expr
rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))
@dlt.table(
name="raw_farmers_market"
)
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="farmers_market_quarantine",
temporary=True,
partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
return (
spark.read.table("LIVE.raw_farmers_market")
.select("MarketName", "Website", "Location", "State",
"Facebook", "Twitter", "Youtube", "Organic", "updateTime")
.withColumn("is_quarantined", expr(quarantine_rules))
)
@dlt.view(
name="valid_farmers_market"
)
def get_valid_farmers_market():
return (
spark.read.table("LIVE.farmers_market_quarantine")
.filter("is_quarantined=false")
)
@dlt.view(
name="invalid_farmers_market"
)
def get_invalid_farmers_market():
return (
spark.read.table("LIVE.farmers_market_quarantine")
.filter("is_quarantined=true")
)
Het aantal rijen valideren in tables
U kunt een extra table toevoegen aan uw pijplijn die een verwachting definieert voor het vergelijken van het aantal rijen tussen twee gerealiseerde views of streaming tables. De resultaten van deze verwachting worden weergegeven in het gebeurtenislogboek en de Gebruikersinterface van Delta Live Tables. In het volgende voorbeeld wordt het gelijke aantal rijen tussen tbla
en tblb
tablesgevalideerd.
CREATE OR REFRESH MATERIALIZED VIEW count_verification(
CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
(SELECT COUNT(*) AS a_count FROM LIVE.tbla),
(SELECT COUNT(*) AS b_count FROM LIVE.tblb)
Geavanceerde validatie uitvoeren met de verwachtingen van Delta Live Tables
U kunt materialized views definiëren met behulp van aggregaten en join-query's en de resultaten van deze query's gebruiken als onderdeel van uw verwachtingsverificatie. Dit is handig als u bijvoorbeeld complexe gegevenskwaliteitscontroles wilt uitvoeren, zodat een afgeleide table alle records uit de bron-table bevat of de gelijkheid van een numerieke column tussen tablesgarandeert.
In het volgende voorbeeld wordt gevalideerd of alle verwachte records aanwezig zijn in de report
table:
CREATE MATERIALIZED VIEW report_compare_tests(
CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key
In het volgende voorbeeld wordt een statistische functie gebruikt om de uniekheid van een primaire sleutel te garanderen:
CREATE MATERIALIZED VIEW report_pk_tests(
CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk
Verwachtingen draagbaar en herbruikbaar maken
U kunt regels voor gegevenskwaliteit afzonderlijk van uw pijplijn-implementaties onderhouden.
Databricks raadt aan de regels op te slaan in een Delta-table waarbij elke regel wordt gecategoriseerd door een tag. U gebruikt deze tag in gegevenssetdefinities om te bepalen welke regels moeten worden toegepast.
In het volgende voorbeeld wordt een table met de naam rules
gemaakt om regels te onderhouden:
CREATE OR REPLACE TABLE
rules
AS SELECT
col1 AS name,
col2 AS constraint,
col3 AS tag
FROM (
VALUES
("website_not_null","Website IS NOT NULL","validity"),
("location_not_null","Location IS NOT NULL","validity"),
("state_not_null","State IS NOT NULL","validity"),
("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)
In het volgende Python-voorbeeld worden de verwachtingen van gegevenskwaliteit gedefinieerd op basis van de regels die zijn opgeslagen in de rules
table. De get_rules()
-functie leest de regels uit de rules
table en retourneert een Python-woordenlijst met regels die overeenkomen met het tag
argument dat aan de functie is doorgegeven. De woordenlijst wordt toegepast in de @dlt.expect_all_*()
decorators om beperkingen voor gegevenskwaliteit af te dwingen. Records die niet voldoen aan de regels die zijn getagd met validity
, worden bijvoorbeeld verwijderd uit de raw_farmers_market
table:
Notitie
In dit voorbeeld worden voorbeeldgegevens gelezen die zijn opgenomen in de Databricks-gegevenssets. Omdat de Databricks-gegevenssets niet worden ondersteund met een pijplijn die naar Unity Catalogpubliceert, werkt dit voorbeeld alleen met een pijplijn die is geconfigureerd om te publiceren naar de Hive-metastore. Dit patroon werkt echter ook met Unity Catalog ingeschakelde pijplijnen, maar u moet gegevens lezen van externe locaties . Voor meer informatie over het gebruik van Unity Catalog met Delta Live Tables, zie Gebruik Unity Catalog met uw Delta Live Tables-pijplijnen.
import dlt
from pyspark.sql.functions import expr, col
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
df = spark.read.table("rules")
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
spark.read.table("LIVE.raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)
In plaats van een table met de naam rules
te maken om regels te onderhouden, kunt u een Python-module maken voor hoofdregels, bijvoorbeeld in een bestand met de naam rules_module.py
in dezelfde map als het notebook:
def get_rules_as_list_of_dict():
return [
{
"name": "website_not_null",
"constraint": "Website IS NOT NULL",
"tag": "validity"
},
{
"name": "location_not_null",
"constraint": "Location IS NOT NULL",
"tag": "validity"
},
{
"name": "state_not_null",
"constraint": "State IS NOT NULL",
"tag": "validity"
},
{
"name": "fresh_data",
"constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
"tag": "maintained"
},
{
"name": "social_media_access",
"constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
"tag": "maintained"
}
]
Wijzig vervolgens het voorgaande notebook door de module te importeren en de functie get_rules()
te wijzigen zodat deze uit de module wordt gelezen in plaats van uit de rules
table:
import dlt
from rules_module import *
from pyspark.sql.functions import expr, col
df = spark.createDataFrame(get_rules_as_list_of_dict())
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
spark.read.table("LIVE.raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)