教程:使用 SemPy 和 Great Expectations (GX) 验证数据

本教程介绍如何将 SemPy 与 Great Expectations (GX) 结合使用,对 Power BI 语义模型执行数据验证。

本教程介绍如何:

  • 使用 Great Expectations 的 Fabric 数据源(基于语义链接)验证 Fabric 工作区中数据集的约束。
    • 配置 GX 数据上下文、数据资产和期望。
    • 使用 GX 检查点查看验证结果。
  • 使用语义链接分析原始数据。

先决条件

  • 从左侧导航窗格中选择 工作区 以查找并选择工作区。 此工作区是您的当前工作区。
  • 下载 零售分析示例 PBIX.pbix 文件。
  • 在工作区中,选择“导入”>“报表或分页报表”>“从此计算机”,将“Retail Analysis Sample PBIX.pbix”文件上传到工作区

在笔记本中继续操作

great_expectations_tutorial.ipynb 是本教程随附的笔记本。

设置笔记本

在本部分中,你将使用必要的模块和数据设置笔记本环境。

  1. 使用笔记本中的 %pip 内联安装功能从 PyPI 安装 SemPy 和相关 Great Expectations 库。
# install libraries
%pip install semantic-link 'great-expectations<1.0' great_expectations_experimental great_expectations_zipcode_expectations

# load %%dax cell magic
%load_ext sempy
  1. 执行必要的模块导入,以备后用:
import great_expectations as gx
from great_expectations.expectations.expectation import ExpectationConfiguration
from great_expectations_zipcode_expectations.expectations import expect_column_values_to_be_valid_zip5

设置 GX 数据上下文和数据源

若要开始使用 Great Expectations,首先必须设置 GX 数据上下文。 上下文充当 GX 操作的入口点,并保存所有相关配置。

context = gx.get_context()

现在可以将 Fabric 数据集作为 数据源 添加到此上下文中,开始与数据交互。 本教程使用标准 Power BI 示例语义模型 零售分析示例 .pbix 文件

ds = context.sources.add_fabric_powerbi("Retail Analysis Data Source", dataset="Retail Analysis Sample PBIX")

指定数据资产

定义 数据资产 以指定要处理的数据子集。 资产可以像完整表一样简单,也可以像自定义数据分析表达式(DAX)查询一样复杂。

在这里,你将添加多个资产:

Power BI 表

将 Power BI 表添加为数据资产。

ds.add_powerbi_table_asset("Store Asset", table="Store")

Power BI 度量值

如果数据集包含预配置的度量值,则按照与 SemPy 的 evaluate_measure类似的 API 将度量值添加为资产。

ds.add_powerbi_measure_asset(
    "Total Units Asset",
    measure="TotalUnits",
    groupby_columns=["Time[FiscalYear]", "Time[FiscalMonth]"]
)

DAX

如果要定义自己的度量值或更好地控制特定行,可以使用自定义 DAX 查询添加 DAX 资产。 在这里,我们通过将两个现有度量值除以定义 Total Units Ratio 度量值。

ds.add_powerbi_dax_asset(
    "Total Units YoY Asset",
    dax_string=
    """
    EVALUATE SUMMARIZECOLUMNS(
        'Time'[FiscalYear],
        'Time'[FiscalMonth],
        "Total Units Ratio", DIVIDE([Total Units This Year], [Total Units Last Year])
    )    
    """
)

DMV 查询

在某些情况下,使用 动态管理视图(DMV)计算作为数据验证过程的一部分可能很有帮助。 例如,可以跟踪数据集中引用完整性冲突的数量。 有关详细信息,请参阅 清理数据 = 更快的报表

ds.add_powerbi_dax_asset(
    "Referential Integrity Violation",
    dax_string=
    """
    SELECT
        [Database_name],
        [Dimension_Name],
        [RIVIOLATION_COUNT]
    FROM $SYSTEM.DISCOVER_STORAGE_TABLES
    """
)

期望值

若要向资产添加特定约束,首先必须配置 期望套件。 在将单个 期望 添加到每个套件后,您可以用新套件更新最初设置的数据上下文。 有关可用预期的完整列表,请参阅 GX 期望库

首先添加具有两个期望的“零售商店套件”:

  • 有效的邮政编码
  • 行计数介于 80 到 200 之间的表
suite_store = context.add_expectation_suite("Retail Store Suite")

suite_store.add_expectation(ExpectationConfiguration("expect_column_values_to_be_valid_zip5", { "column": "PostalCode" }))
suite_store.add_expectation(ExpectationConfiguration("expect_table_row_count_to_be_between", { "min_value": 80, "max_value": 200 }))

context.add_or_update_expectation_suite(expectation_suite=suite_store)

TotalUnits 度量值

添加具有一个期望的“零售度量值套件”:

  • 列值应大于 50,000
suite_measure = context.add_expectation_suite("Retail Measure Suite")
suite_measure.add_expectation(ExpectationConfiguration(
    "expect_column_values_to_be_between", 
    {
        "column": "TotalUnits",
        "min_value": 50000
    }
))

context.add_or_update_expectation_suite(expectation_suite=suite_measure)

Total Units Ratio DAX

添加具有一个期望的“零售 DAX 套件”:

  • 总单位比率的列值应介于 0.8 和 1.5 之间
suite_dax = context.add_expectation_suite("Retail DAX Suite")
suite_dax.add_expectation(ExpectationConfiguration(
    "expect_column_values_to_be_between", 
    {
        "column": "[Total Units Ratio]",
        "min_value": 0.8,
        "max_value": 1.5
    }
))

context.add_or_update_expectation_suite(expectation_suite=suite_dax)

引用完整性冲突 (DMV)

添加具有一个期望的“零售 DMV 套件”:

  • RIVIOLATION_COUNT 的值应为 0
suite_dmv = context.add_expectation_suite("Retail DMV Suite")
# There should be no RI violations
suite_dmv.add_expectation(ExpectationConfiguration(
    "expect_column_values_to_be_in_set", 
    {
        "column": "RIVIOLATION_COUNT",
        "value_set": [0]
    }
))
context.add_or_update_expectation_suite(expectation_suite=suite_dmv)

验证

若要实际针对数据运行指定的预期,请先创建一个 检查点 并将其添加到上下文中。 有关检查点配置的详细信息,请参阅 数据验证工作流

checkpoint_config = {
    "name": f"Retail Analysis Checkpoint",
    "validations": [
        {
            "expectation_suite_name": "Retail Store Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Store Asset",
            },
        },
        {
            "expectation_suite_name": "Retail Measure Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Total Units Asset",
            },
        },
        {
            "expectation_suite_name": "Retail DAX Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Total Units YoY Asset",
            },
        },
        {
            "expectation_suite_name": "Retail DMV Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Referential Integrity Violation",
            },
        },
    ],
}
checkpoint = context.add_checkpoint(
    **checkpoint_config
)

现在运行检查点,将结果提取为 pandas 数据帧,以便简单地进行格式化。

result = checkpoint.run()

处理并打印结果。

import pandas as pd

data = []

for run_result in result.run_results:
    for validation_result in result.run_results[run_result]["validation_result"]["results"]:
        row = {
            "Batch ID": run_result.batch_identifier,
            "type": validation_result.expectation_config.expectation_type,
            "success": validation_result.success
        }

        row.update(dict(validation_result.result))
        
        data.append(row)

result_df = pd.DataFrame.from_records(data)    

result_df[["Batch ID", "type", "success", "element_count", "unexpected_count", "partial_unexpected_list"]]

表显示验证结果。

从这些结果中可以看到,除了通过自定义 DAX 查询定义的“总单位 YoY 资产”之外,所有期望都通过了验证。

诊断

使用语义链接,可以提取源数据,以了解哪些具体年份超出范围。 语义链接提供了用于执行 DAX 查询的内联 magic。 使用语义链接执行传入 GX 数据资产的同一查询,并可视化生成的值。

%%dax "Retail Analysis Sample PBIX"

EVALUATE SUMMARIZECOLUMNS(
    'Time'[FiscalYear],
    'Time'[FiscalMonth],
    "Total Units Ratio", DIVIDE([Total Units This Year], [Total Units Last Year])
)

表显示 DAX 查询汇总的结果。

将这些结果保存在 DataFrame 中。

df = _

绘制结果。

import matplotlib.pyplot as plt

df["Total Units % Change YoY"] = (df["[Total Units Ratio]"] - 1)

df.set_index(["Time[FiscalYear]", "Time[FiscalMonth]"]).plot.bar(y="Total Units % Change YoY")

plt.axhline(0)

plt.axhline(-0.2, color="red", linestyle="dotted")
plt.axhline( 0.5, color="red", linestyle="dotted")

None

图表显示 DAX 查询总结的结果。

从绘图中可以看到,4 月和 7 月略高于范围,然后可以采取进一步的步骤进行调查。

存储 GX 配置

随着数据集中的数据随时间变化,可能需要重新运行刚刚执行的 GX 验证。 目前,数据上下文(包含连接的数据资产、期望套件和检查点)暂时存在,但可以转换为文件上下文以供将来使用。 或者,可以实例化文件上下文(请参阅 实例化数据上下文)。

context = context.convert_to_file_context()

保存上下文后,请将 gx 目录复制到湖屋。

重要

此单元格假定你向笔记本添加了湖屋 如果没有附加湖屋,则你不会看到错误,但以后也无法获取上下文。 如果现在添加湖屋,内核将重启,因此必须重新运行整个笔记本才能返回到这一点。

# copy GX directory to attached lakehouse
!cp -r gx/ /lakehouse/default/Files/gx

现在,可以使用 context = gx.get_context(project_root_dir="<your path here>") 创建将来的上下文,以使用本教程中的所有配置。

例如,在新笔记本中,附加相同的湖屋并使用 context = gx.get_context(project_root_dir="/lakehouse/default/Files/gx") 来检索上下文。

查看有关语义链接/SemPy 的其他教程: