Zarządzanie jakością danych za pomocą tabel delta live
Oczekiwania są używane do definiowania ograniczeń jakości danych dotyczących zawartości zestawu danych. Oczekiwania pozwalają zagwarantować, że dane przychodzące w tabelach spełniają wymagania dotyczące jakości danych i zapewniają wgląd w jakość danych dla każdej aktualizacji potoku. Oczekiwania są stosowane do zapytań przy użyciu dekoratorów języka Python lub klauzul ograniczeń SQL.
Co to są oczekiwania usługi Delta Live Tables?
Oczekiwania to opcjonalne klauzule dodawane do deklaracji zestawu danych delta Live Tables, które stosują kontrole jakości danych dla każdego rekordu przechodzącego przez zapytanie.
Oczekiwanie składa się z trzech rzeczy:
- Opis, który działa jako unikatowy identyfikator i umożliwia śledzenie metryk dla ograniczenia.
- Instrukcja logiczna, która zawsze zwraca wartość true lub false na podstawie określonego warunku.
- Akcja do wykonania, gdy rekord zakończy się niepowodzeniem oczekiwania, co oznacza, że wartość logiczna zwraca wartość false.
Poniższa macierz przedstawia trzy akcje, które można zastosować do nieprawidłowych rekordów:
Akcja | Result |
---|---|
ostrzegaj (ustawienie domyślne) | Nieprawidłowe rekordy są zapisywane w obiekcie docelowym; błąd jest zgłaszany jako metryka dla zestawu danych. |
kropla | Nieprawidłowe rekordy są porzucane, zanim dane zostaną zapisane w obiekcie docelowym; błąd jest zgłaszany jako metryki dla zestawu danych. |
zawieść | Nieprawidłowe rekordy uniemożliwiają pomyślne zaktualizowanie. Interwencja ręczna jest wymagana przed ponownym przetworzeniem. |
Możesz wyświetlić metryki jakości danych, takie jak liczba rekordów, które naruszają oczekiwania, wysyłając zapytanie do dziennika zdarzeń usługi Delta Live Tables. Zobacz Monitor Delta Live Tables pipelines (Monitorowanie potoków tabel na żywo funkcji Delta).
Aby uzyskać pełną dokumentację składni deklaracji zestawu danych delta Live Tables, zobacz Dokumentacja języka Python tabel delta Live Tables lub Dokumentacja języka SQL tabel delta live tables.
Uwaga
- Chociaż można uwzględnić wiele klauzul w dowolnym oczekiwaniu, tylko język Python obsługuje definiowanie akcji na podstawie wielu oczekiwań. Zobacz Wiele oczekiwań.
- Oczekiwania muszą być zdefiniowane przy użyciu wyrażeń SQL. Nie można użyć składni innej niż SQL (na przykład funkcji języka Python) podczas definiowania oczekiwań.
Zachowaj nieprawidłowe rekordy
expect
Użyj operatora , jeśli chcesz przechowywać rekordy naruszające oczekiwania. Rekordy naruszające oczekiwania są dodawane do docelowego zestawu danych wraz z prawidłowymi rekordami:
Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Usuwanie nieprawidłowych rekordów
Użyj operatora , expect or drop
aby zapobiec dalszemu przetwarzaniu nieprawidłowych rekordów. Rekordy naruszające oczekiwania są porzucane z docelowego zestawu danych:
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Niepowodzenie w nieprawidłowych rekordach
Jeśli nieprawidłowe rekordy są niedopuszczalne, użyj expect or fail
operatora , aby zatrzymać wykonywanie natychmiast, gdy rekord zakończy się niepowodzeniem weryfikacji. Jeśli operacja jest aktualizacją tabeli, system niepodzieal cofa transakcję:
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Ważne
Jeśli masz wiele przepływów równoległych zdefiniowanych w potoku, awaria pojedynczego przepływu nie powoduje niepowodzenia innych przepływów.
Gdy potok zakończy się niepowodzeniem z powodu naruszenia oczekiwań, należy naprawić kod potoku, aby poprawnie obsłużyć nieprawidłowe dane przed ponownym uruchomieniem potoku.
Oczekiwania dotyczące niepowodzenia modyfikują plan zapytania platformy Spark przekształceń, aby śledzić informacje wymagane do wykrywania naruszeń i zgłaszania ich. W przypadku wielu zapytań można użyć tych informacji, aby określić, który rekord wejściowy spowodował naruszenie. Oto przykładowy wyjątek:
Expectation Violated:
{
"flowName": "a-b",
"verboseInfo": {
"expectationsViolated": [
"x1 is negative"
],
"inputData": {
"a": {"x1": 1,"y1": "a },
"b": {
"x2": 1,
"y2": "aa"
}
},
"outputRecord": {
"x1": 1,
"y1": "a",
"x2": 1,
"y2": "aa"
},
"missingInputData": false
}
}
Wiele oczekiwań
Oczekiwania można zdefiniować przy użyciu co najmniej jednego ograniczenia jakości danych w potokach języka Python. Te dekoratory akceptują słownik języka Python jako argument, gdzie klucz jest nazwą oczekiwania, a wartość jest ograniczeniem oczekiwania.
Służy expect_all
do określania wielu ograniczeń dotyczących jakości danych, gdy rekordy, które nie powiodły się weryfikacji, powinny być uwzględnione w docelowym zestawie danych:
@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Służy expect_all_or_drop
do określania wielu ograniczeń dotyczących jakości danych, gdy rekordy, które nie powiodły się weryfikacji, powinny zostać usunięte z docelowego zestawu danych:
@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Użyj expect_all_or_fail
polecenia , aby określić wiele ograniczeń dotyczących jakości danych, gdy rekordy, które nie powiodły się, walidacja powinna zatrzymać wykonywanie potoku:
@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Możesz również zdefiniować kolekcję oczekiwań jako zmienną i przekazać ją do co najmniej jednego zapytania w potoku:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create cleaned and prepared dataset
Kwarantanna nieprawidłowych danych
Poniższy przykład używa oczekiwań w połączeniu z tabelami tymczasowymi i widokami. Ten wzorzec zawiera metryki dla rekordów, które przechodzą testy oczekiwań podczas aktualizacji potoku, oraz umożliwiają przetwarzanie prawidłowych i nieprawidłowych rekordów za pośrednictwem różnych ścieżek podrzędnych.
Uwaga
W tym przykładzie są odczytywane przykładowe dane zawarte w zestawach danych usługi Databricks. Ponieważ zestawy danych usługi Databricks nie są obsługiwane w potoku publikowanym w wykazie aparatu Unity, ten przykład działa tylko z potokiem skonfigurowanym do publikowania w magazynie metadanych Hive. Jednak ten wzorzec działa również z potokami obsługującymi wykaz aparatu Unity, ale musisz odczytywać dane z lokalizacji zewnętrznych. Aby dowiedzieć się więcej o korzystaniu z rozwiązania Unity Catalog z tabelami Delta Live Tables, zobacz Używanie ozwiązania Unity Catalog z potokami platformy Delta Live Tables.
import dlt
from pyspark.sql.functions import expr
rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))
@dlt.table(
name="raw_farmers_market"
)
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="farmers_market_quarantine",
temporary=True,
partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
return (
spark.read.table("LIVE.raw_farmers_market")
.select("MarketName", "Website", "Location", "State",
"Facebook", "Twitter", "Youtube", "Organic", "updateTime")
.withColumn("is_quarantined", expr(quarantine_rules))
)
@dlt.view(
name="valid_farmers_market"
)
def get_valid_farmers_market():
return (
spark.read.table("LIVE.farmers_market_quarantine")
.filter("is_quarantined=false")
)
@dlt.view(
name="invalid_farmers_market"
)
def get_invalid_farmers_market():
return (
spark.read.table("LIVE.farmers_market_quarantine")
.filter("is_quarantined=true")
)
Weryfikowanie liczby wierszy w tabelach
Do potoku można dodać dodatkową tabelę, która definiuje oczekiwania dotyczące porównywania liczby wierszy między dwoma zmaterializowanymi widokami lub tabelami przesyłania strumieniowego. Wyniki tego oczekiwania pojawiają się w dzienniku zdarzeń i interfejsie użytkownika tabel delta Live Tables. Poniższy przykład weryfikuje równe liczby wierszy między tabelami tbla
i tblb
:
CREATE OR REFRESH MATERIALIZED VIEW count_verification(
CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
(SELECT COUNT(*) AS a_count FROM LIVE.tbla),
(SELECT COUNT(*) AS b_count FROM LIVE.tblb)
Przeprowadzanie zaawansowanej walidacji przy użyciu oczekiwań tabel delta Live Tables
Możesz zdefiniować zmaterializowane widoki przy użyciu zapytań agregujących i sprzężonych oraz użyć wyników tych zapytań w ramach sprawdzania oczekiwań. Jest to przydatne, jeśli chcesz wykonać złożone kontrole jakości danych, na przykład zapewnienie, że tabela pochodna zawiera wszystkie rekordy z tabeli źródłowej lub gwarantuje równość kolumny liczbowej między tabelami.
Poniższy przykład sprawdza, czy wszystkie oczekiwane rekordy znajdują się w report
tabeli:
CREATE MATERIALIZED VIEW report_compare_tests(
CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key
W poniższym przykładzie użyto agregacji w celu zapewnienia unikatowości klucza podstawowego:
CREATE MATERIALIZED VIEW report_pk_tests(
CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk
Oczekiwanie dotyczące przenoszenia i wielokrotnego użytku
Reguły jakości danych można zachować niezależnie od implementacji potoku.
Usługa Databricks zaleca przechowywanie reguł w tabeli delty z każdą regułą skategoryzowaną według tagu. Ten tag jest używany w definicjach zestawu danych, aby określić, które reguły mają być stosowane.
Poniższy przykład tworzy tabelę o nazwie rules
w celu zachowania reguł:
CREATE OR REPLACE TABLE
rules
AS SELECT
col1 AS name,
col2 AS constraint,
col3 AS tag
FROM (
VALUES
("website_not_null","Website IS NOT NULL","validity"),
("location_not_null","Location IS NOT NULL","validity"),
("state_not_null","State IS NOT NULL","validity"),
("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)
Poniższy przykład w języku Python definiuje oczekiwania dotyczące jakości danych na podstawie reguł przechowywanych w rules
tabeli. Funkcja get_rules()
odczytuje reguły z rules
tabeli i zwraca słownik języka Python zawierający reguły pasujące do argumentu tag
przekazanego do funkcji. Słownik jest stosowany w dekoratorach w @dlt.expect_all_*()
celu wymuszania ograniczeń dotyczących jakości danych. Na przykład wszystkie rekordy zakończone niepowodzeniem reguł oznaczonych tagiem validity
zostaną usunięte z raw_farmers_market
tabeli:
Uwaga
W tym przykładzie są odczytywane przykładowe dane zawarte w zestawach danych usługi Databricks. Ponieważ zestawy danych usługi Databricks nie są obsługiwane w potoku publikowanym w wykazie aparatu Unity, ten przykład działa tylko z potokiem skonfigurowanym do publikowania w magazynie metadanych Hive. Jednak ten wzorzec działa również z potokami obsługującymi wykaz aparatu Unity, ale musisz odczytywać dane z lokalizacji zewnętrznych. Aby dowiedzieć się więcej o korzystaniu z rozwiązania Unity Catalog z tabelami Delta Live Tables, zobacz Używanie ozwiązania Unity Catalog z potokami platformy Delta Live Tables.
import dlt
from pyspark.sql.functions import expr, col
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
df = spark.read.table("rules")
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
spark.read.table("LIVE.raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)
Zamiast tworzyć tabelę o nazwie rules
do obsługi reguł, można utworzyć moduł języka Python do głównych reguł, na przykład w pliku o nazwie rules_module.py
w tym samym folderze co notes:
def get_rules_as_list_of_dict():
return [
{
"name": "website_not_null",
"constraint": "Website IS NOT NULL",
"tag": "validity"
},
{
"name": "location_not_null",
"constraint": "Location IS NOT NULL",
"tag": "validity"
},
{
"name": "state_not_null",
"constraint": "State IS NOT NULL",
"tag": "validity"
},
{
"name": "fresh_data",
"constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
"tag": "maintained"
},
{
"name": "social_media_access",
"constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
"tag": "maintained"
}
]
Następnie zmodyfikuj powyższy notes, importując moduł i zmieniając get_rules()
funkcję na odczyt z modułu rules
zamiast z tabeli:
import dlt
from rules_module import *
from pyspark.sql.functions import expr, col
df = spark.createDataFrame(get_rules_as_list_of_dict())
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
spark.read.table("LIVE.raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)