Správa kvality dat pomocí Tables Delta Live
Pomocí očekávání definujete omezení kvality dat v obsahu datové sady. Očekávání umožňují zaručit, že data přicházející do tables splňují požadavky na kvalitu dat a poskytují přehled o kvalitě dat pro každý kanál update. Předpoklady uplatníte na dotazy pomocí dekorátorů Pythonu nebo klauzulí SQL constraint.
Jaká jsou očekávání Tables Delta Live?
Očekávání jsou volitelné klauzule, které přidáte do deklarací datové sady Delta Live Tables, které používají kontroly kvality dat u každého záznamu předávajícího dotazem.
Očekávání se skládá ze tří věcí:
- Popis, který funguje jako jedinečný identifier a umožňuje sledovat metriky pro constraint.
- Logický příkaz, který vždy vrátí hodnotu true nebo false na základě některé stavové podmínky.
- Akce, která se má provést, když záznam selže s očekáváním, což znamená, že logická hodnota vrátí hodnotu false.
Následující matice ukazuje tři akce, které můžete použít u neplatných záznamů:
Akce | Výsledek |
---|---|
upozornění (výchozí) | Do cíle jsou zapsány neplatné záznamy; selhání je hlášeno jako metrika pro datovou sadu. |
kapka | Neplatné záznamy se zahodí před zápisem dat do cíle; selhání je hlášeno jako metrika pro datovou sadu. |
selhat | Neplatné záznamy brání úspěšnému dokončení update. Před opětovným zpracováním je vyžadován ruční zásah. |
Metriky kvality dat, například počet záznamů, které porušují očekávání, můžete zobrazit dotazem na protokol událostí Delta Live Tables. Viz Monitorování kanálů Delta Live Tables.
Kompletní referenční informace o syntaxi deklarací datové sady Delta Live Tables najdete v referenční dokumentace jazyka Delta Live Tables jazyka Python nebo referenční informace k jazyku Delta Live Tables jazyka SQL.
Poznámka:
- V jakémkoli očekávání můžete zahrnout více klauzulí, ale Python podporuje definování akcí na základě více očekávání. Podívejte se na několik očekávání.
- Očekávání musí být definována pomocí výrazů SQL. Syntaxi bez SQL (například funkce Pythonu) nelze použít při definování očekávání.
Zachování neplatných záznamů
Operátor expect
použijte, pokud chcete uchovávat záznamy, které porušují očekávání. Záznamy, které porušují očekávání, se přidají do cílové datové sady spolu s platnými záznamy:
Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Odstranění neplatných záznamů
Pomocí operátoru expect or drop
můžete zabránit dalšímu zpracování neplatných záznamů. Záznamy, které porušují očekávání, se z cílové datové sady zahodí:
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Selhání u neplatných záznamů
Pokud jsou neplatné záznamy nepřijatelné, ukončete expect or fail
spuštění okamžitě, když záznam selže s ověřením. Pokud je operace tableupdate, systém atomicky vrátí zpět tuto transakci.
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Důležité
Pokud máte v kanálu definovaných více paralelních toků, selhání jednoho toku nezpůsobí selhání jiných toků.
Pokud kanál selže z důvodu porušení očekávání, musíte před opětovným spuštěním kanálu opravit kód kanálu, který zpracuje neplatná data správně.
Očekávání selhání upravují plán dotazů Sparku vašich transformací tak, aby sledovaly informace potřebné k detekci a hlášení porušení. U mnoha dotazů můžete pomocí těchto informací zjistit, který vstupní záznam způsobil narušení. Následuje příklad výjimky:
Expectation Violated:
{
"flowName": "a-b",
"verboseInfo": {
"expectationsViolated": [
"x1 is negative"
],
"inputData": {
"a": {"x1": 1,"y1": "a },
"b": {
"x2": 1,
"y2": "aa"
}
},
"outputRecord": {
"x1": 1,
"y1": "a",
"x2": 1,
"y2": "aa"
},
"missingInputData": false
}
}
Více očekávání
V kanálech Pythonu můžete definovat očekávání s jedním nebo více omezeními kvality dat. Tyto dekorátory přijímají slovník Pythonu jako argument, where klíč je název očekávání a hodnota je očekávání constraint.
Slouží expect_all
k určení více omezení kvality dat, pokud by záznamy, které selžou ověření, měly být zahrnuty do cílové datové sady:
@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Slouží expect_all_or_drop
k určení více omezení kvality dat, když se záznamy, které selžou ověření, by se měly z cílové datové sady vynechat:
@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Slouží expect_all_or_fail
k určení více omezení kvality dat, pokud by záznamy, které selžou ověření, zastavily provádění kanálu:
@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Můžete také definovat kolekci očekávání jako proměnnou a předat ji jednomu nebo více dotazům v kanálu:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create cleaned and prepared dataset
Umístit neplatná data do karantény
Následující příklad používá očekávání v kombinaci s dočasnými tables a views. Tento model poskytuje metriky pro záznamy, které během aktualizací kanálu procházejí očekávanými kontrolami, a poskytuje způsob zpracování platných a neplatných záznamů prostřednictvím různých podřízených cest.
Poznámka:
Tento příklad načte ukázková data zahrnutá v datových sadách Databricks. Vzhledem k tomu, že datové sady Databricks nejsou podporovány kanálem, který publikuje do Unity Catalog, tento příklad funguje pouze s kanálem, který je nakonfigurován pro publikování do metastoru Hive. Tento model ale funguje také s povolenými pipeline Unity Catalog, ale musíte číst data z externích umístění. Další informace o používání Catalog Unity s TablesDelta Live najdete v tématu Použití Catalog Unity s vašimi kanály Delta Live Tables.
import dlt
from pyspark.sql.functions import expr
rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))
@dlt.table(
name="raw_farmers_market"
)
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="farmers_market_quarantine",
temporary=True,
partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
return (
spark.read.table("LIVE.raw_farmers_market")
.select("MarketName", "Website", "Location", "State",
"Facebook", "Twitter", "Youtube", "Organic", "updateTime")
.withColumn("is_quarantined", expr(quarantine_rules))
)
@dlt.view(
name="valid_farmers_market"
)
def get_valid_farmers_market():
return (
spark.read.table("LIVE.farmers_market_quarantine")
.filter("is_quarantined=false")
)
@dlt.view(
name="invalid_farmers_market"
)
def get_invalid_farmers_market():
return (
spark.read.table("LIVE.farmers_market_quarantine")
.filter("is_quarantined=true")
)
Ověřte počet řádků napříč tables
Můžete přidat další table do svého kanálu, který nastavuje očekávání pro porovnání počtu řádků mezi dvěma materializovanými views nebo streamovanými tables. Výsledky tohoto očekávání se zobrazí v protokolu událostí a v uživatelském rozhraní Delta Live Tables. Následující příklad ověří počet stejných řádků mezi tbla
a tblb
tables:
CREATE OR REFRESH MATERIALIZED VIEW count_verification(
CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
(SELECT COUNT(*) AS a_count FROM LIVE.tbla),
(SELECT COUNT(*) AS b_count FROM LIVE.tblb)
Provádějte pokročilé ověřování s očekáváními Delta Live Tables.
Materializované views můžete definovat pomocí agregačních a join dotazů a v rámci kontroly očekávání použít výsledky těchto dotazů. To je užitečné, pokud chcete provádět komplexní kontroly kvality dat, například zajistit, že odvozené table obsahuje všechny záznamy ze zdrojového table, nebo zaručit rovnost číselného column napříč tables.
Následující příklad ověří, že jsou v report
tablepřítomny všechny očekávané záznamy:
CREATE MATERIALIZED VIEW report_compare_tests(
CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key
Následující příklad používá agregaci k zajištění jedinečnosti primárního klíče:
CREATE MATERIALIZED VIEW report_pk_tests(
CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk
Přenosná a opakovaně použitelná očekávání
Pravidla kvality dat můžete udržovat odděleně od implementací kanálů.
Databricks doporučuje ukládat pravidla do Delta table s jednotlivými pravidly kategorizovanými podle značky. Tuto značku použijete v definicích datové sady k určení pravidel, která se mají použít.
Následující příklad vytvoří table pojmenovanou rules
pro udržení pravidel:
CREATE OR REPLACE TABLE
rules
AS SELECT
col1 AS name,
col2 AS constraint,
col3 AS tag
FROM (
VALUES
("website_not_null","Website IS NOT NULL","validity"),
("location_not_null","Location IS NOT NULL","validity"),
("state_not_null","State IS NOT NULL","validity"),
("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)
Následující příklad Pythonu definuje očekávání kvality dat na základě pravidel uložených v rules
table. Funkce get_rules()
čte pravidla z rules
table a vrátí slovník Pythonu obsahující pravidla odpovídající argumentu tag
předaného funkci. Slovník se použije v @dlt.expect_all_*()
dekorátorech k vynucení omezení kvality dat. Například všechny záznamy, které nevyhovují pravidlům označeným validity
, se vyřadí z raw_farmers_market
table.
Poznámka:
Tento příklad načte ukázková data zahrnutá v datových sadách Databricks. Vzhledem k tomu, že datové sady Databricks nejsou podporovány kanálem, který publikuje do Unity Catalog, tento příklad funguje pouze s kanálem nakonfigurovaným pro publikování do úložiště Hive. Tento vzorec však funguje také s povolenými kanály Unity Catalog, ale musíte číst data z externích umístění . Další informace o použití Unity Catalog s Delta Live Tablesnajdete v tématu Použití Unity Catalog s kanály Delta Live Tables.
import dlt
from pyspark.sql.functions import expr, col
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
df = spark.read.table("rules")
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
spark.read.table("LIVE.raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)
Místo vytváření table pojmenovaných rules
pro údržbu pravidel můžete vytvořit modul Pythonu pro hlavní pravidla, například v souboru s názvem rules_module.py
ve stejné složce jako poznámkový blok:
def get_rules_as_list_of_dict():
return [
{
"name": "website_not_null",
"constraint": "Website IS NOT NULL",
"tag": "validity"
},
{
"name": "location_not_null",
"constraint": "Location IS NOT NULL",
"tag": "validity"
},
{
"name": "state_not_null",
"constraint": "State IS NOT NULL",
"tag": "validity"
},
{
"name": "fresh_data",
"constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
"tag": "maintained"
},
{
"name": "social_media_access",
"constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
"tag": "maintained"
}
]
Potom upravte předchozí poznámkový blok importem modulu a změnou funkce get_rules()
na čtení z modulu místo z rules
table:
import dlt
from rules_module import *
from pyspark.sql.functions import expr, col
df = spark.createDataFrame(get_rules_as_list_of_dict())
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
spark.read.table("LIVE.raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)