教程:使用 SemPy 和 Great Expectations (GX) 验证数据

本教程介绍如何将 SemPy 与 Great Expectations (GX) 配合使用来对 Power BI 语义模型执行数据验证。

本教程演示如何:

  • 使用 Great Expectations 的 Fabric 数据源(基于语义链接)验证 Fabric 工作区中数据集的约束。
    • 配置 GX 数据上下文、数据资产和期望。
    • 使用 GX 检查点查看验证结果。
  • 使用语义链接分析原始数据。

先决条件

  • 选择左侧导航窗格中的“工作区”,找到并选中自己的工作区。 此工作区将成为当前工作区。
  • 下载零售分析示例 PBIX.pbix 文件。
  • 在工作区中,使用“上传”按钮将零售分析示例 PBIX.pbix 文件上传到工作区。

在笔记本中继续操作

great_expectations_tutorial.ipynb 是本教程随附的笔记本。

若要打开本教程随附的笔记本,请按照让系统为数据科学做好准备教程中的说明操作,将该笔记本导入到工作区。

或者,如果要从此页面复制并粘贴代码,则可以创建新的笔记本

在开始运行代码之前,请务必将湖屋连接到笔记本

设置笔记本

在本部分中,你将使用必要的模块和数据设置笔记本环境。

  1. 使用笔记本中的 %pip 内联安装功能从 PyPI 安装 SemPy 和相关 Great Expectations 库。
# install libraries
%pip install semantic-link great-expectations great_expectations_experimental great_expectations_zipcode_expectations

# load %%dax cell magic
%load_ext sempy
  1. 导入稍后会用到的模块:
import great_expectations as gx
from great_expectations.expectations.expectation import ExpectationConfiguration
from great_expectations_zipcode_expectations.expectations import expect_column_values_to_be_valid_zip5

设置 GX 数据上下文和数据源

为了开始使用 Great Expectations,首先必须设置 GX 数据上下文。 上下文充当 GX 操作的入口点,并保存所有相关配置。

context = gx.get_context()

现在可以将 Fabric 数据集作为数据源添加到此上下文中,以开始与数据进行交互。 本教程使用标准 Power BI 示例语义模型零售分析示例 .pbix 文件

ds = context.sources.add_fabric_powerbi("Retail Analysis Data Source", dataset="Retail Analysis Sample PBIX")

指定数据资产

定义“数据资产”以指定要处理的数据子集。 资产可以像完整表一样简单,也可以像自定义数据分析表达式 (DAX) 查询一样复杂。

在这里,你将添加多个资产:

Power BI 表

将 Power BI 表添加为数据资产。

ds.add_powerbi_table_asset("Store Asset", table="Store")

Power BI 度量值

如果数据集包含预配置的度量值,则可按照与 SemPy 的 evaluate_measure 类似的 API 将度量值添加为资产。

ds.add_powerbi_measure_asset(
    "Total Units Asset",
    measure="TotalUnits",
    groupby_columns=["Time[FiscalYear]", "Time[FiscalMonth]"]
)

DAX

如果要定义自己的度量值或更好地控制特定行,则可以使用自定义 DAX 查询添加 DAX 资产。 在这里,我们通过将两个现有度量值相除来定义 Total Units Ratio 度量值。

ds.add_powerbi_dax_asset(
    "Total Units YoY Asset",
    dax_string=
    """
    EVALUATE SUMMARIZECOLUMNS(
        'Time'[FiscalYear],
        'Time'[FiscalMonth],
        "Total Units Ratio", DIVIDE([Total Units This Year], [Total Units Last Year])
    )    
    """
)

DMV 查询

在某些情况下,在数据验证过程中使用动态管理视图 (DMV) 计算可能很有帮助。 例如,可以跟踪数据集中引用完整性冲突的数量。 有关详细信息,请参阅“清理数据 = 更快的报表”。

ds.add_powerbi_dax_asset(
    "Referential Integrity Violation",
    dax_string=
    """
    SELECT
        [Database_name],
        [Dimension_Name],
        [RIVIOLATION_COUNT]
    FROM $SYSTEM.DISCOVER_STORAGE_TABLES
    """
)

预期

若要向资产添加特定约束,首先必须配置期望套件。 将单个期望添加到每个套件后,可以使用新套件更新在开始时设置的数据上下文。 有关可用预期的完整列表,请参阅 GX 期望库

首先添加具有两个期望的“零售商店套件”:

  • 有效的邮政编码
  • 行计数介于 80 到 200 之间的表
suite_store = context.add_expectation_suite("Retail Store Suite")

suite_store.add_expectation(ExpectationConfiguration("expect_column_values_to_be_valid_zip5", { "column": "PostalCode" }))
suite_store.add_expectation(ExpectationConfiguration("expect_table_row_count_to_be_between", { "min_value": 80, "max_value": 200 }))

context.add_or_update_expectation_suite(expectation_suite=suite_store)

TotalUnits 度量值

添加具有一个期望的“零售度量值套件”:

  • 列值应大于 50,000
suite_measure = context.add_expectation_suite("Retail Measure Suite")
suite_measure.add_expectation(ExpectationConfiguration(
    "expect_column_values_to_be_between", 
    {
        "column": "TotalUnits",
        "min_value": 50000
    }
))

context.add_or_update_expectation_suite(expectation_suite=suite_measure)

Total Units Ratio DAX

添加具有一个期望的“零售 DAX 套件”:

  • 总单位比率的列值应介于 0.8 和 1.5 之间
suite_dax = context.add_expectation_suite("Retail DAX Suite")
suite_dax.add_expectation(ExpectationConfiguration(
    "expect_column_values_to_be_between", 
    {
        "column": "[Total Units Ratio]",
        "min_value": 0.8,
        "max_value": 1.5
    }
))

context.add_or_update_expectation_suite(expectation_suite=suite_dax)

引用完整性冲突 (DMV)

添加具有一个期望的“零售 DMV 套件”:

  • RIVIOLATION_COUNT 应为 0
suite_dmv = context.add_expectation_suite("Retail DMV Suite")
# There should be no RI violations
suite_dmv.add_expectation(ExpectationConfiguration(
    "expect_column_values_to_be_in_set", 
    {
        "column": "RIVIOLATION_COUNT",
        "value_set": [0]
    }
))
context.add_or_update_expectation_suite(expectation_suite=suite_dmv)

验证

若要实际针对数据运行指定的预期,请先创建一个检查点并将其添加到上下文中。 有关检查点配置的详细信息,请参阅数据验证工作流

checkpoint_config = {
    "name": f"Retail Analysis Checkpoint",
    "validations": [
        {
            "expectation_suite_name": "Retail Store Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Store Asset",
            },
        },
        {
            "expectation_suite_name": "Retail Measure Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Total Units Asset",
            },
        },
        {
            "expectation_suite_name": "Retail DAX Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Total Units YoY Asset",
            },
        },
        {
            "expectation_suite_name": "Retail DMV Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Referential Integrity Violation",
            },
        },
    ],
}
checkpoint = context.add_checkpoint(
    **checkpoint_config
)

现在,运行检查点并将结果提取为 pandas 数据帧,以便进行简单的格式设置。

result = checkpoint.run()

处理并打印结果。

import pandas as pd

data = []

for run_result in result.run_results:
    for validation_result in result.run_results[run_result]["validation_result"]["results"]:
        row = {
            "Batch ID": run_result.batch_identifier,
            "type": validation_result.expectation_config.expectation_type,
            "success": validation_result.success
        }

        row.update(dict(validation_result.result))
        
        data.append(row)

result_df = pd.DataFrame.from_records(data)    

result_df[["Batch ID", "type", "success", "element_count", "unexpected_count", "partial_unexpected_list"]]

表显示验证结果。

从这些结果中可以看到,除了通过自定义 DAX 查询定义的“总单位年初至今资产”之外,所有期望都通过了验证。

诊断

使用语义链接,可以提取源数据,以了解哪些确切年份超出了范围。 语义链接提供了用于执行 DAX 查询的内联 magic。 使用语义链接执行传入 GX 数据资产的同一查询,并可视化生成的值。

%%dax "Retail Analysis Sample PBIX"

EVALUATE SUMMARIZECOLUMNS(
    'Time'[FiscalYear],
    'Time'[FiscalMonth],
    "Total Units Ratio", DIVIDE([Total Units This Year], [Total Units Last Year])
)

表显示 DAX 查询摘要的结果。

将这些结果保存在 DataFrame 中。

df = _

绘制结果。

import matplotlib.pyplot as plt

df["Total Units % Change YoY"] = (df["[Total Units Ratio]"] - 1)

df.set_index(["Time[FiscalYear]", "Time[FiscalMonth]"]).plot.bar(y="Total Units % Change YoY")

plt.axhline(0)

plt.axhline(-0.2, color="red", linestyle="dotted")
plt.axhline( 0.5, color="red", linestyle="dotted")

None

绘图显示 DAX 查询摘要的结果。

从绘图中可以看到,4 月和 7 月略高于范围,然后可以采取进一步的步骤进行调查。

存储 GX 配置

随着数据集中的数据随时间变化,可能需要重新运行刚刚执行的 GX 验证。 目前,数据上下文(包含连接的数据资产、期望套件和检查点)暂时存在,但可以转换为文件上下文以供将来使用。 或者,可以实例化文件上下文(请参阅实例化数据上下文)。

context = context.convert_to_file_context()

保存上下文后,请将 gx 目录复制到湖屋。

重要

此单元格假定你向笔记本添加了湖屋 如果没有附加湖屋,则你不会看到错误,但以后也无法获取上下文。 如果现在添加湖屋,内核将重启,因此必须重新运行整个笔记本才能返回到这一点。

# copy GX directory to attached lakehouse
!cp -r gx/ /lakehouse/default/Files/gx

现在,可以使用 context = gx.get_context(project_root_dir="<your path here>") 创建将来的上下文,以使用本教程中的所有配置。

例如,在新笔记本中,附加相同的湖屋并使用 context = gx.get_context(project_root_dir="/lakehouse/default/Files/gx") 来检索上下文。

查看有关语义链接/SemPy 的其他教程: