Gestire la qualità dei dati con Delta Live Tables
Si usano le aspettative per definire i vincoli di qualità dei dati sul contenuto di un set di dati. Le aspettative consentono di garantire che i dati in arrivo nelle tabelle soddisfino i requisiti di qualità dei dati e forniscano informazioni dettagliate sulla qualità dei dati per ogni aggiornamento della pipeline. Si applicano aspettative alle query usando elementi decorator Python o clausole di vincolo SQL.
Quali sono le aspettative di Delta Live Tables?
Le aspettative DLT sono dichiarazioni di set di dati che applicano controlli di qualità dei dati per ogni record che passa attraverso una query.
Un'aspettativa è costituita da tre cose:
- Descrizione, che funge da identificatore univoco e consente di tenere traccia delle metriche per il vincolo.
- Istruzione booleana che restituisce sempre true o false in base a una condizione dichiarata.
- Azione da eseguire quando un record non soddisfa le aspettative, ovvero il valore booleano restituisce false.
La matrice seguente mostra le tre azioni che è possibile applicare ai record non validi:
Azione | Risultato |
---|---|
warn (impostazione predefinita) | I record non validi vengono scritti nella destinazione; l'errore viene segnalato come metrica per il set di dati. |
drop | I record non validi vengono eliminati prima che i dati vengano scritti nella destinazione; l'errore viene segnalato come metriche per il set di dati. |
fail | I record non validi impediscono l'esito positivo dell'aggiornamento. L'intervento manuale è necessario prima della rielaborazione. |
È possibile visualizzare le metriche relative alla qualità dei dati, ad esempio il numero di record che violano le aspettative eseguendo una query sul registro eventi di Delta Live Tables. Vedere Monitorare le pipeline Delta Live Tables.
Per un riferimento completo alla sintassi della dichiarazione del set di dati Delta Live Tables, vedere Informazioni di riferimento sul linguaggio Python per tabelle live Delta o informazioni di riferimento sul linguaggio SQL per le tabelle live Delta.
Nota
- Sebbene sia possibile includere più clausole in qualsiasi previsione, solo Python supporta la definizione di azioni in base a più aspettative. Vedere Aspettative multiple.
- Le aspettative devono essere definite usando espressioni SQL. Non è possibile usare la sintassi non SQL (ad esempio, le funzioni Python) quando si definisce un'aspettativa.
Conservare record non validi
Usare l'operatore expect
quando si desidera mantenere i record che violano le aspettative. I record che violano le aspettative vengono aggiunti al set di dati di destinazione insieme ai record validi:
Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Eliminare record non validi
Utilizzare l'operatore expect or drop
per impedire un'ulteriore elaborazione di record non validi. I record che violano le aspettative vengono eliminati dal set di dati di destinazione:
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Errore nei record non validi
Se i record non validi non sono accettabili, usare l'operatore per arrestare immediatamente l'esecuzione expect or fail
quando un record non riesce la convalida. Se l'operazione è un aggiornamento della tabella, il sistema esegue il rollback della transazione in modo atomico:
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Importante
Se in una pipeline sono definiti più flussi paralleli, l'errore di un singolo flusso non causa l'esito negativo di altri flussi.
Quando una pipeline ha esito negativo a causa di una violazione delle aspettative, è necessario correggere il codice della pipeline per gestire correttamente i dati non validi prima di eseguire nuovamente la pipeline.
Le aspettative di errore modificano il piano di query Spark delle trasformazioni per tenere traccia delle informazioni necessarie per rilevare e segnalare violazioni. Per molte query, è possibile usare queste informazioni per identificare il record di input che ha generato la violazione. Di seguito è riportata un’immagine di esempio:
Expectation Violated:
{
"flowName": "a-b",
"verboseInfo": {
"expectationsViolated": [
"x1 is negative"
],
"inputData": {
"a": {"x1": 1,"y1": "a },
"b": {
"x2": 1,
"y2": "aa"
}
},
"outputRecord": {
"x1": 1,
"y1": "a",
"x2": 1,
"y2": "aa"
},
"missingInputData": false
}
}
Aspettative multiple
È possibile definire le aspettative con uno o più vincoli di qualità dei dati nelle pipeline Python. Questi elementi decorator accettano un dizionario Python come argomento, dove la chiave è il nome previsto e il valore è il vincolo di attesa.
Usare expect_all
per specificare più vincoli di qualità dei dati quando i record che non superano la convalida devono essere inclusi nel set di dati di destinazione:
@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Usare expect_all_or_drop
per specificare più vincoli di qualità dei dati quando i record che non superano la convalida devono essere eliminati dal set di dati di destinazione:
@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Usare expect_all_or_fail
per specificare più vincoli di qualità dei dati quando i record che non superano la convalida devono interrompere l'esecuzione della pipeline:
@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
È anche possibile definire una raccolta di aspettative come variabile e passarla a una o più query nella pipeline:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create cleaned and prepared dataset
Mettere in quarantena dati non validi
Nell'esempio seguente vengono utilizzate le aspettative in combinazione con tabelle e viste temporanee. Questo modello fornisce metriche per i record che superano i controlli delle aspettative durante gli aggiornamenti della pipeline e consente di elaborare record validi e non validi tramite percorsi downstream diversi.
Nota
Questo esempio legge i dati di esempio inclusi nei set di dati di Databricks. Poiché i set di dati di Databricks non sono supportati con una pipeline che pubblica in Unity Catalog, questo esempio funziona solo con una pipeline configurata per la pubblicazione nel metastore Hive. Tuttavia, questo modello funziona anche con le pipeline abilitate per Unity Catalog, ma è necessario leggere i dati da posizioni esterne. Per altre informazioni sull'uso del catalogo Unity con Delta Live Tables, vedere Usare il catalogo Unity con le pipeline Delta Live Tables.
import dlt
from pyspark.sql.functions import expr
rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))
@dlt.table(
name="raw_farmers_market"
)
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="farmers_market_quarantine",
temporary=True,
partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
return (
spark.read.table("LIVE.raw_farmers_market")
.select("MarketName", "Website", "Location", "State",
"Facebook", "Twitter", "Youtube", "Organic", "updateTime")
.withColumn("is_quarantined", expr(quarantine_rules))
)
@dlt.view(
name="valid_farmers_market"
)
def get_valid_farmers_market():
return (
spark.read.table("LIVE.farmers_market_quarantine")
.filter("is_quarantined=false")
)
@dlt.view(
name="invalid_farmers_market"
)
def get_invalid_farmers_market():
return (
spark.read.table("LIVE.farmers_market_quarantine")
.filter("is_quarantined=true")
)
Convalidare i conteggi delle righe tra tabelle
È possibile aggiungere una tabella aggiuntiva alla pipeline che definisce un'aspettativa per confrontare i conteggi delle righe tra due viste materializzate o tabelle di streaming. I risultati di questa aspettativa vengono visualizzati nel registro eventi e nell'interfaccia utente di Delta Live Tables. Nell'esempio seguente viene convalidato il numero di righe uguali tra le tabelle tbla
e tblb
:
CREATE OR REFRESH MATERIALIZED VIEW count_verification(
CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
(SELECT COUNT(*) AS a_count FROM LIVE.tbla),
(SELECT COUNT(*) AS b_count FROM LIVE.tblb)
Eseguire la convalida avanzata con le aspettative di Delta Live Tables
È possibile definire viste materializzate usando query di aggregazione e partecipazione e usare i risultati di tali query come parte del controllo delle aspettative. Ciò è utile se si desidera eseguire controlli di qualità dei dati complessi, ad esempio assicurando che una tabella derivata contenga tutti i record della tabella di origine o garantendo l'uguaglianza di una colonna numerica tra le tabelle.
Nell'esempio seguente viene verificato che nella report
tabella siano presenti tutti i record previsti:
CREATE MATERIALIZED VIEW report_compare_tests(
CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key
Nell'esempio seguente viene usata un'aggregazione per garantire l'univocità di una chiave primaria:
CREATE MATERIALIZED VIEW report_pk_tests(
CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk
Rendere le aspettative portabili e riutilizzabili
È possibile gestire le regole di qualità dei dati separatamente dalle implementazioni della pipeline.
Databricks consiglia di archiviare le regole in una tabella Delta con ogni regola categorizzata da un tag. Questo tag viene usato nelle definizioni dei set di dati per determinare le regole da applicare.
L’esempio seguente crea una tabella denominata rules
per mantenere le regole:
CREATE OR REPLACE TABLE
rules
AS SELECT
col1 AS name,
col2 AS constraint,
col3 AS tag
FROM (
VALUES
("website_not_null","Website IS NOT NULL","validity"),
("location_not_null","Location IS NOT NULL","validity"),
("state_not_null","State IS NOT NULL","validity"),
("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)
L'esempio Python seguente definisce le aspettative sulla qualità dei dati in base alle regole archiviate nella rules
tabella. La get_rules()
funzione legge le regole dalla rules
tabella e restituisce un dizionario Python contenente regole corrispondenti all'argomento tag
passato alla funzione. Il dizionario viene applicato negli @dlt.expect_all_*()
elementi Decorator per applicare vincoli di qualità dei dati. Ad esempio, tutti i record con cui si verificano errori nelle regole contrassegnate validity
verranno eliminati dalla raw_farmers_market
tabella:
Nota
Questo esempio legge i dati di esempio inclusi nei set di dati di Databricks. Poiché i set di dati di Databricks non sono supportati con una pipeline che pubblica in Unity Catalog, questo esempio funziona solo con una pipeline configurata per la pubblicazione nel metastore Hive. Tuttavia, questo modello funziona anche con le pipeline abilitate per Unity Catalog, ma è necessario leggere i dati da posizioni esterne. Per altre informazioni sull'uso del catalogo Unity con Delta Live Tables, vedere Usare il catalogo Unity con le pipeline Delta Live Tables.
import dlt
from pyspark.sql.functions import expr, col
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
df = spark.read.table("rules")
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
spark.read.table("LIVE.raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)
Anziché creare una tabella denominata rules
per gestire le regole, è possibile creare un modulo Python per le regole principali, ad esempio in un file denominato rules_module.py
nella stessa cartella del notebook:
def get_rules_as_list_of_dict():
return [
{
"name": "website_not_null",
"constraint": "Website IS NOT NULL",
"tag": "validity"
},
{
"name": "location_not_null",
"constraint": "Location IS NOT NULL",
"tag": "validity"
},
{
"name": "state_not_null",
"constraint": "State IS NOT NULL",
"tag": "validity"
},
{
"name": "fresh_data",
"constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
"tag": "maintained"
},
{
"name": "social_media_access",
"constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
"tag": "maintained"
}
]
Modificare quindi il notebook precedente importando il modulo e modificando la get_rules()
funzione per leggere dal modulo anziché dalla rules
tabella:
import dlt
from rules_module import *
from pyspark.sql.functions import expr, col
df = spark.createDataFrame(get_rules_as_list_of_dict())
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
spark.read.table("LIVE.raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)