使用 Delta Live Tables 管理数据质量
使用预期来定义对数据集内容的数据质量约束。 期望使你能够保证进入表中的数据满足数据质量要求,并为每项管道更新提供数据质量的见解。 使用 Python 修饰器或 SQL 约束子句将预期应用于查询。
什么是增量实时表期望?
期望是添加到增量实时表数据集声明的可选子句,这些子句对通过查询的每条记录应用数据质量检查。
期望由三个元素组成:
- 一段说明,它充当唯一标识符,并可用于跟踪约束的指标。
- 一个布尔语句,它始终根据某个规定的条件返回 true 或 false。
- 当记录不满足期望(即布尔语句返回 false)时要执行的操作。
以下矩阵显示了可应用于无效记录的三个操作:
操作 | 结果 |
---|---|
警告(默认值) | 将无效记录写入目标;将失败报告为数据集的指标。 |
删除 | 在将数据写入目标之前删除无效记录;将失败报告为数据集的指标。 |
失败 | 无效记录会阻止更新成功完成。 在重新处理之前需要手动干预。 |
你可以通过查询增量实时表事件日志来查看数据质量指标,例如违反预期的记录数。 请参阅监视增量实时表管道。
有关增量实时表数据集声明语法的完整参考,请参阅增量实时表 Python 语言参考或增量实时表 SQL 语言参考。
注意
- 虽然可以在任一期望中包含多个子句,但只有 Python 支持基于多个期望定义操作。 请参阅多个期望。
- 必须使用 SQL 表达式来定义预期。 定义预期时,不能使用非 SQL 语法(如 Python 函数)。
保留无效记录
当你想要保留不符合预期的记录时,请使用 expect
运算符。 不符合预期的记录将与有效记录一起添加到目标数据集中:
Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
删除无效记录
使用 expect or drop
运算符可防止进一步处理无效记录。 不符合预期的记录将从目标数据集中删除:
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
出现无效记录时失败
当无效记录不可接受时,使用 expect or fail
运算符在记录未通过验证时立即停止执行。 如果这项操作是表更新,系统会自动回滚事务:
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
重要
如果在管道中定义了多个并行流,则单个流的失败不会导致其他流失败。
当管道因预期违规而失败时,必须在重新运行管道之前修复管道代码,以正确处理无效数据。
预期违规会修改转换的 Spark 查询计划,以跟踪检测和报告违规所需的信息。 对于许多查询,可以使用此信息来识别导致违规的输入记录。 下面是一个异常示例:
Expectation Violated:
{
"flowName": "a-b",
"verboseInfo": {
"expectationsViolated": [
"x1 is negative"
],
"inputData": {
"a": {"x1": 1,"y1": "a },
"b": {
"x2": 1,
"y2": "aa"
}
},
"outputRecord": {
"x1": 1,
"y1": "a",
"x2": 1,
"y2": "aa"
},
"missingInputData": false
}
}
多个预期
可以在 Python 管道中定义具有一个或多个数据质量约束的期望。 这些修饰器接受 Python 字典作为参数,其中键是预期名称,值是预期约束。
当未通过验证的记录应包含在目标数据集中时,请使用 expect_all
指定多个数据质量约束:
@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
当未通过验证的记录应从目标数据集中删除时,请使用 expect_all_or_drop
指定多个数据质量约束:
@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
当未通过验证的记录应停止管道执行时,请使用 expect_all_or_fail
指定多个数据质量约束:
@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
你还可以将一组预期定义为变量,并将其传递给管道中的一个或多个查询:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create cleaned and prepared dataset
隔离无效数据
以下示例将期望与临时表和视图结合使用。 此模式提供在管道更新期间通过期望检查的记录的指标,并提供一种通过不同下游路径处理有效和无效记录的方式。
注意
此示例读取 Databricks 数据集中包含的示例数据。 由于发布到 Unity Catalog 的管道不支持 Databricks 数据集,因此此示例仅适用于配置为发布到 Hive 元存储的管道。 不过,此模式也适用于已启用 Unity Catalog 的管道,但你必须从外部位置读取数据。 若要详细了解如何将 Unity Catalog 与 Delta Live Tables 配合使用,请参阅将 Unity Catalog 与 Delta Live Tables 管道配合使用。
import dlt
from pyspark.sql.functions import expr
rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))
@dlt.table(
name="raw_farmers_market"
)
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="farmers_market_quarantine",
temporary=True,
partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
return (
spark.read.table("LIVE.raw_farmers_market")
.select("MarketName", "Website", "Location", "State",
"Facebook", "Twitter", "Youtube", "Organic", "updateTime")
.withColumn("is_quarantined", expr(quarantine_rules))
)
@dlt.view(
name="valid_farmers_market"
)
def get_valid_farmers_market():
return (
spark.read.table("LIVE.farmers_market_quarantine")
.filter("is_quarantined=false")
)
@dlt.view(
name="invalid_farmers_market"
)
def get_invalid_farmers_market():
return (
spark.read.table("LIVE.farmers_market_quarantine")
.filter("is_quarantined=true")
)
验证跨表的行计数
可以向管道添加一个附加表,用于定义比较两个具体化视图或流式处理表之间的行计数的预期。 该预期的结果显示在事件日志和增量实时表 UI 中。 以下示例验证 tbla
和 tblb
表之间的行计数是否相等:
CREATE OR REFRESH MATERIALIZED VIEW count_verification(
CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
(SELECT COUNT(*) AS a_count FROM LIVE.tbla),
(SELECT COUNT(*) AS b_count FROM LIVE.tblb)
根据 Delta Live Tables 预期执行高级验证
可使用聚合和联接查询定义具体化视图,并将这些查询的结果用作预期检查的一部分。 如果你想要执行复杂的数据质量检查,从而确保派生表包含源表中的所有记录或保证表中数值列的相等性,则此功能非常有用。
以下示例验证 report
表中是否存在所有预期记录:
CREATE MATERIALIZED VIEW report_compare_tests(
CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key
以下示例使用聚合来确保主键的唯一性:
CREATE MATERIALIZED VIEW report_pk_tests(
CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk
使期望可移植且可重用
可以将数据质量规则与管道实现分开维护。
Databricks 建议将规则存储在 Delta 表中,每个规则按标记分类。 在数据集定义中使用此标记来确定要应用的规则。
以下示例创建一个名为 rules
的表来维护规则:
CREATE OR REPLACE TABLE
rules
AS SELECT
col1 AS name,
col2 AS constraint,
col3 AS tag
FROM (
VALUES
("website_not_null","Website IS NOT NULL","validity"),
("location_not_null","Location IS NOT NULL","validity"),
("state_not_null","State IS NOT NULL","validity"),
("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)
以下 Python 示例基于 rules
表中存储的规则定义数据质量期望。 get_rules()
函数从 rules
表读取规则并返回一个 Python 字典,其中包含与传递给该函数的 tag
参数匹配的规则。 该字典应用于 @dlt.expect_all_*()
修饰器以强制实施数据质量约束。 例如,将从 raw_farmers_market
表中删除任何与使用 validity
标记的规则不符的记录:
注意
此示例读取 Databricks 数据集中包含的示例数据。 由于发布到 Unity Catalog 的管道不支持 Databricks 数据集,因此此示例仅适用于配置为发布到 Hive 元存储的管道。 不过,此模式也适用于已启用 Unity Catalog 的管道,但你必须从外部位置读取数据。 若要详细了解如何将 Unity Catalog 与 Delta Live Tables 配合使用,请参阅将 Unity Catalog 与 Delta Live Tables 管道配合使用。
import dlt
from pyspark.sql.functions import expr, col
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
df = spark.read.table("rules")
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
spark.read.table("LIVE.raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)
可以将 Python 模块创建到主规则中,例如,在笔记本所在的同一文件夹中名为 rules_module.py
的文件中创建 Python 模块,而不是创建名为 rules
的表来维护规则:
def get_rules_as_list_of_dict():
return [
{
"name": "website_not_null",
"constraint": "Website IS NOT NULL",
"tag": "validity"
},
{
"name": "location_not_null",
"constraint": "Location IS NOT NULL",
"tag": "validity"
},
{
"name": "state_not_null",
"constraint": "State IS NOT NULL",
"tag": "validity"
},
{
"name": "fresh_data",
"constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
"tag": "maintained"
},
{
"name": "social_media_access",
"constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
"tag": "maintained"
}
]
然后,通过导入模块并将 get_rules()
函数更改为从模块中读取,而不是从 rules
表中读取来修改前面的笔记本:
import dlt
from rules_module import *
from pyspark.sql.functions import expr, col
df = spark.createDataFrame(get_rules_as_list_of_dict())
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
spark.read.table("LIVE.raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)