将自定义指标与 Databricks Lakehouse 监视配合使用

本页介绍如何在 Databricks Lakehouse 监视中创建自定义指标。 除了自动计算的分析和漂移统计数据之外,还可以创建自定义指标。 例如,你可能想要跟踪捕获业务逻辑某些方面的加权平均值或使用自定义模型质量得分。 你还可以创建自定义偏差指标来跟踪主表中值的更改(与基线或上一个时间窗口相比)。

有关如何使用 MonitorMetric API 的更多详细信息,请参阅 API 参考

自定义指标的类型

Databricks Lakehouse 监视包括以下类型的自定义指标:

  • 聚合指标,根据主表中的列计算。 聚合指标存储在配置文件指标表中。
  • 派生指标,根据之前计算的聚合指标计算得出,不直接使用主表中的数据。 派生指标存储在配置文件指标表中。
  • 偏移指标,用于比较先前计算的聚合或来自两个不同时间窗口或主表和基线表之间的派生指标。 偏移指标存储在偏移指标表中。

尽可能使用派生指标和偏移指标,可以最大限度地减少对整个主表的重新计算。 仅聚合指标访问主表中的数据。 然后,可以直接根据聚合指标值计算派生指标和偏移指标。

自定义指标参数

若要定义自定义指标,需要为 SQL 列表达式创建 Jinja 模板。 本节中的表描述了定义指标的参数以及 Jinja 模板中使用的参数。

参数 说明
type MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATEMonitorMetricType.CUSTOM_METRIC_TYPE_DERIVEDMonitorMetricType.CUSTOM_METRIC_TYPE_DRIFT 之一。
name 指标表中自定义指标的列名。
input_columns 应为其计算指标的输入表中的列名称列表。 若要指示在计算中使用了多个列,请使用 :table。 请参阅本文中的示例。
definition SQL 表达式的 Jinja 模板,指定如何计算指标。 请参阅创建定义
output_data_type JSON 字符串格式的指标输出的 Spark 数据类型。

创建definition

definition 参数必须是 Jinja 模板形式的单个字符串表达式。 它不能包含联接或子查询。

下表列出了可用于创建 SQL Jinja 模板以指定如何计算指标的参数。

参数 说明
{{input_column}} 用于计算自定义指标的列。
{{prediction_col}} 包含 ML 模型预测的列。 与 InferenceLog 分析一起使用。
{{label_col}} 保存 ML 模型事实标签的列。 与 InferenceLog 分析一起使用。
{{current_df}} 用于与前一个时间窗口相比的偏移。 来自前一个时间窗口的数据。
{{base_df}} 用于与基线表相比的偏移。 基线数据。

聚合指标示例

以下示例计算列中值的平方平均值,并将其应用于 f1f2 列。 输出将作为新列保存在配置文件指标表中,并显示在对应于列 f1f2 的分析行中。 适用的列名称替换 Jinja 参数 {{input_column}}

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
    name="squared_avg",
    input_columns=["f1", "f2"],
    definition="avg(`{{input_column}}`*`{{input_column}}`)",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)

以下代码定义一个自定义指标,该指标计算列 f1f2 之间的差值的平均值。 此示例演示如何在 input_columns 参数中使用 [":table"] 来指示在计算中使用了表中的多个列。

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
    name="avg_diff_f1_f2",
    input_columns=[":table"],
    definition="avg(f1 - f2)",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)

此示例计算加权模型质量分数。 对于 critical 列为 True 的观察结果,当该行的预测值与真实值不匹配时,将分配较重的罚分。 由于它是在原始列(predictionlabel)上定义的,因此被定义为聚合指标。 :table 列指示此指标是从多个列计算得出的。 Jinja 参数 {{prediction_col}}{{label_col}} 将替换为监视器的预测列和事实标签列的名称。

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
    name="weighted_error",
    input_columns=[":table"],
    definition="""avg(CASE
      WHEN {{prediction_col}} = {{label_col}} THEN 0
      WHEN {{prediction_col}} != {{label_col}} AND critical=TRUE THEN 2
      ELSE 1 END)""",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)

派生指标示例

以下代码定义一个自定义指标,用于计算本节前面定义的 squared_avg 指标的平方根。 由于这是派生指标,因此它不引用主表数据,而是根据 squared_avg 聚合指标进行定义。 输出将保存为配置文件指标表中的新列。

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_DERIVED,
    name="root_mean_square",
    input_columns=["f1", "f2"],
    definition="sqrt(squared_avg)",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)

偏移指标示例

以下代码定义了一个偏移指标,用于跟踪本节前面定义的 weighted_error 指标中的更改。 {{current_df}}{{base_df}} 参数允许指标引用当前窗口和比较窗口中的 weighted_error 值。 比较窗口可以是基线数据,也可以是前一个时间窗口的数据。 偏移指标保存在偏移指标表中。

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_DRIFT,
    name="error_rate_delta",
    input_columns=[":table"],
    definition="{{current_df}}.weighted_error - {{base_df}}.weighted_error",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)