教程:使用 SemPy 和 Great Expectations (GX) 验证数据
本教程介绍如何将 SemPy 与 Great Expectations (GX) 配合使用来对 Power BI 语义模型执行数据验证。
本教程演示如何:
- 使用 Great Expectations 的 Fabric 数据源(基于语义链接)验证 Fabric 工作区中数据集的约束。
- 配置 GX 数据上下文、数据资产和期望。
- 使用 GX 检查点查看验证结果。
- 使用语义链接分析原始数据。
先决条件
获取 Microsoft Fabric 订阅。 或者注册免费的 Microsoft Fabric 试用版。
登录 Microsoft Fabric。
使用主页左侧的体验切换器切换到 Synapse 数据科学体验。
- 选择左侧导航窗格中的“工作区”,找到并选中自己的工作区。 此工作区将成为当前工作区。
- 下载零售分析示例 PBIX.pbix 文件。
- 在工作区中,使用“上传”按钮将零售分析示例 PBIX.pbix 文件上传到工作区。
在笔记本中继续操作
great_expectations_tutorial.ipynb 是本教程随附的笔记本。
若要打开本教程随附的笔记本,请按照让系统为数据科学做好准备教程中的说明操作,将该笔记本导入到工作区。
或者,如果要从此页面复制并粘贴代码,则可以创建新的笔记本。
在开始运行代码之前,请务必将湖屋连接到笔记本。
设置笔记本
在本部分中,你将使用必要的模块和数据设置笔记本环境。
- 使用笔记本中的
%pip
内联安装功能从 PyPI 安装SemPy
和相关Great Expectations
库。
# install libraries
%pip install semantic-link great-expectations great_expectations_experimental great_expectations_zipcode_expectations
# load %%dax cell magic
%load_ext sempy
- 导入稍后会用到的模块:
import great_expectations as gx
from great_expectations.expectations.expectation import ExpectationConfiguration
from great_expectations_zipcode_expectations.expectations import expect_column_values_to_be_valid_zip5
设置 GX 数据上下文和数据源
为了开始使用 Great Expectations,首先必须设置 GX 数据上下文。 上下文充当 GX 操作的入口点,并保存所有相关配置。
context = gx.get_context()
现在可以将 Fabric 数据集作为数据源添加到此上下文中,以开始与数据进行交互。 本教程使用标准 Power BI 示例语义模型零售分析示例 .pbix 文件。
ds = context.sources.add_fabric_powerbi("Retail Analysis Data Source", dataset="Retail Analysis Sample PBIX")
指定数据资产
定义“数据资产”以指定要处理的数据子集。 资产可以像完整表一样简单,也可以像自定义数据分析表达式 (DAX) 查询一样复杂。
在这里,你将添加多个资产:
- Power BI 表
- Power BI 度量值
- 自定义 DAX 查询
- 动态管理视图 (DMV) 查询
Power BI 表
将 Power BI 表添加为数据资产。
ds.add_powerbi_table_asset("Store Asset", table="Store")
Power BI 度量值
如果数据集包含预配置的度量值,则可按照与 SemPy 的 evaluate_measure
类似的 API 将度量值添加为资产。
ds.add_powerbi_measure_asset(
"Total Units Asset",
measure="TotalUnits",
groupby_columns=["Time[FiscalYear]", "Time[FiscalMonth]"]
)
DAX
如果要定义自己的度量值或更好地控制特定行,则可以使用自定义 DAX 查询添加 DAX 资产。 在这里,我们通过将两个现有度量值相除来定义 Total Units Ratio
度量值。
ds.add_powerbi_dax_asset(
"Total Units YoY Asset",
dax_string=
"""
EVALUATE SUMMARIZECOLUMNS(
'Time'[FiscalYear],
'Time'[FiscalMonth],
"Total Units Ratio", DIVIDE([Total Units This Year], [Total Units Last Year])
)
"""
)
DMV 查询
在某些情况下,在数据验证过程中使用动态管理视图 (DMV) 计算可能很有帮助。 例如,可以跟踪数据集中引用完整性冲突的数量。 有关详细信息,请参阅“清理数据 = 更快的报表”。
ds.add_powerbi_dax_asset(
"Referential Integrity Violation",
dax_string=
"""
SELECT
[Database_name],
[Dimension_Name],
[RIVIOLATION_COUNT]
FROM $SYSTEM.DISCOVER_STORAGE_TABLES
"""
)
预期
若要向资产添加特定约束,首先必须配置期望套件。 将单个期望添加到每个套件后,可以使用新套件更新在开始时设置的数据上下文。 有关可用预期的完整列表,请参阅 GX 期望库。
首先添加具有两个期望的“零售商店套件”:
- 有效的邮政编码
- 行计数介于 80 到 200 之间的表
suite_store = context.add_expectation_suite("Retail Store Suite")
suite_store.add_expectation(ExpectationConfiguration("expect_column_values_to_be_valid_zip5", { "column": "PostalCode" }))
suite_store.add_expectation(ExpectationConfiguration("expect_table_row_count_to_be_between", { "min_value": 80, "max_value": 200 }))
context.add_or_update_expectation_suite(expectation_suite=suite_store)
TotalUnits
度量值
添加具有一个期望的“零售度量值套件”:
- 列值应大于 50,000
suite_measure = context.add_expectation_suite("Retail Measure Suite")
suite_measure.add_expectation(ExpectationConfiguration(
"expect_column_values_to_be_between",
{
"column": "TotalUnits",
"min_value": 50000
}
))
context.add_or_update_expectation_suite(expectation_suite=suite_measure)
Total Units Ratio
DAX
添加具有一个期望的“零售 DAX 套件”:
- 总单位比率的列值应介于 0.8 和 1.5 之间
suite_dax = context.add_expectation_suite("Retail DAX Suite")
suite_dax.add_expectation(ExpectationConfiguration(
"expect_column_values_to_be_between",
{
"column": "[Total Units Ratio]",
"min_value": 0.8,
"max_value": 1.5
}
))
context.add_or_update_expectation_suite(expectation_suite=suite_dax)
引用完整性冲突 (DMV)
添加具有一个期望的“零售 DMV 套件”:
- RIVIOLATION_COUNT 应为 0
suite_dmv = context.add_expectation_suite("Retail DMV Suite")
# There should be no RI violations
suite_dmv.add_expectation(ExpectationConfiguration(
"expect_column_values_to_be_in_set",
{
"column": "RIVIOLATION_COUNT",
"value_set": [0]
}
))
context.add_or_update_expectation_suite(expectation_suite=suite_dmv)
验证
若要实际针对数据运行指定的预期,请先创建一个检查点并将其添加到上下文中。 有关检查点配置的详细信息,请参阅数据验证工作流。
checkpoint_config = {
"name": f"Retail Analysis Checkpoint",
"validations": [
{
"expectation_suite_name": "Retail Store Suite",
"batch_request": {
"datasource_name": "Retail Analysis Data Source",
"data_asset_name": "Store Asset",
},
},
{
"expectation_suite_name": "Retail Measure Suite",
"batch_request": {
"datasource_name": "Retail Analysis Data Source",
"data_asset_name": "Total Units Asset",
},
},
{
"expectation_suite_name": "Retail DAX Suite",
"batch_request": {
"datasource_name": "Retail Analysis Data Source",
"data_asset_name": "Total Units YoY Asset",
},
},
{
"expectation_suite_name": "Retail DMV Suite",
"batch_request": {
"datasource_name": "Retail Analysis Data Source",
"data_asset_name": "Referential Integrity Violation",
},
},
],
}
checkpoint = context.add_checkpoint(
**checkpoint_config
)
现在,运行检查点并将结果提取为 pandas 数据帧,以便进行简单的格式设置。
result = checkpoint.run()
处理并打印结果。
import pandas as pd
data = []
for run_result in result.run_results:
for validation_result in result.run_results[run_result]["validation_result"]["results"]:
row = {
"Batch ID": run_result.batch_identifier,
"type": validation_result.expectation_config.expectation_type,
"success": validation_result.success
}
row.update(dict(validation_result.result))
data.append(row)
result_df = pd.DataFrame.from_records(data)
result_df[["Batch ID", "type", "success", "element_count", "unexpected_count", "partial_unexpected_list"]]
从这些结果中可以看到,除了通过自定义 DAX 查询定义的“总单位年初至今资产”之外,所有期望都通过了验证。
诊断
使用语义链接,可以提取源数据,以了解哪些确切年份超出了范围。 语义链接提供了用于执行 DAX 查询的内联 magic。 使用语义链接执行传入 GX 数据资产的同一查询,并可视化生成的值。
%%dax "Retail Analysis Sample PBIX"
EVALUATE SUMMARIZECOLUMNS(
'Time'[FiscalYear],
'Time'[FiscalMonth],
"Total Units Ratio", DIVIDE([Total Units This Year], [Total Units Last Year])
)
将这些结果保存在 DataFrame 中。
df = _
绘制结果。
import matplotlib.pyplot as plt
df["Total Units % Change YoY"] = (df["[Total Units Ratio]"] - 1)
df.set_index(["Time[FiscalYear]", "Time[FiscalMonth]"]).plot.bar(y="Total Units % Change YoY")
plt.axhline(0)
plt.axhline(-0.2, color="red", linestyle="dotted")
plt.axhline( 0.5, color="red", linestyle="dotted")
None
从绘图中可以看到,4 月和 7 月略高于范围,然后可以采取进一步的步骤进行调查。
存储 GX 配置
随着数据集中的数据随时间变化,可能需要重新运行刚刚执行的 GX 验证。 目前,数据上下文(包含连接的数据资产、期望套件和检查点)暂时存在,但可以转换为文件上下文以供将来使用。 或者,可以实例化文件上下文(请参阅实例化数据上下文)。
context = context.convert_to_file_context()
保存上下文后,请将 gx
目录复制到湖屋。
重要
此单元格假定你向笔记本添加了湖屋。 如果没有附加湖屋,则你不会看到错误,但以后也无法获取上下文。 如果现在添加湖屋,内核将重启,因此必须重新运行整个笔记本才能返回到这一点。
# copy GX directory to attached lakehouse
!cp -r gx/ /lakehouse/default/Files/gx
现在,可以使用 context = gx.get_context(project_root_dir="<your path here>")
创建将来的上下文,以使用本教程中的所有配置。
例如,在新笔记本中,附加相同的湖屋并使用 context = gx.get_context(project_root_dir="/lakehouse/default/Files/gx")
来检索上下文。
相关内容
查看有关语义链接/SemPy 的其他教程: