使用 API 创建监视器

本页介绍如何使用 Databricks SDK 在 Databricks 中创建监视器,并介绍 API 调用中使用的所有参数。 也可以使用 REST API 创建和管理监视器。 有关参考信息,请参阅 Lakehouse 监视 SDK 参考REST API 参考

可以在 Unity Catalog 中注册的任何托管或外部 Delta 表上创建监视器。 只能在 Unity Catalog 元存储中为任何表创建一个监视器。

要求

Lakehouse 监视 API 内置于 databricks-sdk 0.28.0 及更高版本中。 若要使用最新版本的 API,请使用笔记本开头的以下命令安装 Python 客户端:

%pip install "databricks-sdk>=0.28.0"

若要进行身份验证以在环境中使用 Databricks SDK,请参阅身份验证

配置文件类型

创建监视器时,请选择以下配置文件类型之一:TimeSeries、InferenceLog 或 Snapshot。 本部分简要介绍每个选项。 有关详细信息,请参阅 API 参考文档REST API 参考文档

注意

  • 首次创建时序或推理配置文件时,监视器仅分析在创建它之前 30 天内的数据。 监视器在创建后,将处理所有新数据。
  • 具体化视图和流式处理表上定义的监视器不支持增量处理。

提示

对于 TimeSeriesInference 配置文件,最佳做法是启用表上的更改数据馈送 (CDF)。 启用 CDF 后,只会处理新追加的数据,而不是在每次刷新时重新处理整个表。 这使得执行更高效,并在跨多个表缩放监视时降低成本。

TimeSeries 配置文件

TimeSeries 配置文件比较跨时间窗口的数据分布。 对于 TimeSeries 配置文件,必须提供以下内容:

  • 时间戳列 (timestamp_col)。 时间戳列数据类型必须是 TIMESTAMP 或可以使用 to_timestamp PySpark 函数转换为时间戳的类型。
  • 用于计算指标的 granularities 集。 可用粒度为“5 分钟”、“30 分钟”、“1 小时”、“1 天”、“n 周”、“1 个月”、“1 年”。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorTimeSeries

w = WorkspaceClient()
w.quality_monitors.create(
  table_name=f"{catalog}.{schema}.{table_name}",
  assets_dir=f"/Workspace/Users/{user_email}/databricks_lakehouse_monitoring/{catalog}.{schema}.{table_name}",
  output_schema_name=f"{catalog}.{schema}",
  time_series=MonitorTimeSeries(timestamp_col=ts, granularities=["30 minutes"])
)

InferenceLog 配置文件

InferenceLog 配置文件类似于 TimeSeries 配置文件,但也包括模型质量指标。 对于 InferenceLog 配置文件,下列参数是必需的:

参数 说明
problem_type MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATIONMonitorInferenceLogProblemType.PROBLEM_TYPE_REGRESSION
prediction_col 包含模型预测值的列。
timestamp_col 包含推理请求时间戳的列。
model_id_col 包含用于预测的模型 ID 的列。
granularities 确定如何跨时间对窗口中的数据进行分区。 可能的值:“5 分钟”、“30 分钟”、“1 小时”、“1 天”、“n 周”、“1 个月”、“1 年”。

还有一个可选参数:

可选参数 说明
label_col 包含模型预测基本事实的列。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorInferenceLog, MonitorInferenceLogProblemType

w = WorkspaceClient()
w.quality_monitors.create(
  table_name=f"{catalog}.{schema}.{table_name}",
  assets_dir=f"/Workspace/Users/{user_email}/databricks_lakehouse_monitoring/{catalog}.{schema}.{table_name}",
  output_schema_name=f"{catalog}.{schema}",
  inference_log=MonitorInferenceLog(
        problem_type=MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION,
        prediction_col="preds",
        timestamp_col="ts",
        granularities=["30 minutes", "1 day"],
        model_id_col="model_ver",
        label_col="label", # optional
  )
)

对于 InferenceLog 配置文件,将根据 model_id_col 的不同值自动创建切片。

Snapshot 配置文件

相对于 TimeSeriesSnapshot 配置文件则监视表的完整内容随时间的变化。 指标针对表中的所有数据进行计算,并在每次刷新监视器时监视表状态。

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorSnapshot

w = WorkspaceClient()
w.quality_monitors.create(
  table_name=f"{catalog}.{schema}.{table_name}",
  assets_dir=f"/Workspace/Users/{user_email}/databricks_lakehouse_monitoring/{catalog}.{schema}.{table_name}",
  output_schema_name=f"{catalog}.{schema}",
  snapshot=MonitorSnapshot()
)

刷新和查看监视器结果

若要刷新指标表,请使用 run_refresh。 例如:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
w.quality_monitors.run_refresh(
    table_name=f"{catalog}.{schema}.{table_name}"
)

从笔记本调用 run_refresh 时,将创建或更新监视器指标表。 此计算在无服务器计算上运行,而不是在笔记本附加到的群集上运行。 更新监视器统计信息时,可以继续在笔记本中运行命令。

有关存储在指标表中的统计信息的信息,请参阅监控指标表。指标表是 Unity Catalog 表。 可以在笔记本或 SQL 查询资源管理器中查询它们,并在目录资源管理器中查看它们。

若要显示与监视器关联的所有刷新的历史记录,请使用 list_refreshes

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
w.quality_monitors.list_refreshes(
    table_name=f"{catalog}.{schema}.{table_name}"
)

若要获取已排队、正在运行或已完成的特定运行的状态,请使用 get_refresh

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
run_info = w.quality_monitors.run_refresh(table_name=f"{catalog}.{schema}.{table_name}")

w.quality_monitors.get_refresh(
    table_name=f"{catalog}.{schema}.{table_name}",
    refresh_id = run_info.refresh_id
)

若要取消已纳入排队或正在运行的刷新,请使用 cancel_refresh

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
run_info = w.quality_monitors.run_refresh(table_name=f"{catalog}.{schema}.{table_name}")

w.quality_monitors.cancel_refresh(
    table_name=f"{catalog}.{schema}.{table_name}",
    refresh_id=run_info.refresh_id
)

查看监视器设置

可以使用 API get_monitor 查看监视设置。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
w.quality_monitors.get(f"{catalog}.{schema}.{table_name}")

计划

若要将监视器设置为按计划运行,请使用 create_monitorschedule 参数:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorTimeSeries, MonitorCronSchedule

w = WorkspaceClient()
w.quality_monitors.create(
  table_name=f"{catalog}.{schema}.{table_name}",
  assets_dir=f"/Workspace/Users/{user_email}/databricks_lakehouse_monitoring/{catalog}.{schema}.{table_name}",
  output_schema_name=f"{catalog}.{schema}",
  time_series=MonitorTimeSeries(timestamp_col=ts, granularities=["30 minutes"]),
  schedule=MonitorCronSchedule(
        quartz_cron_expression="0 0 12 * * ?", # schedules a refresh every day at 12 noon
        timezone_id="PST",
    )
)

有关详细信息,请参阅 cron 表达式

通知

若要为监视器设置通知,请使用 create_monitornotifications 参数:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorTimeSeries, MonitorNotifications, MonitorDestination

w = WorkspaceClient()
w.quality_monitors.create(
  table_name=f"{catalog}.{schema}.{table_name}",
  assets_dir=f"/Workspace/Users/{user_email}/databricks_lakehouse_monitoring/{catalog}.{schema}.{table_name}",
  output_schema_name=f"{catalog}.{schema}",
  time_series=MonitorTimeSeries(timestamp_col=ts, granularities=["30 minutes"]),
  notifications=MonitorNotifications(
        # Notify the given email when a monitoring refresh fails or times out.
        on_failure=MonitorDestination(
            email_addresses=["your_email@domain.com"]
        )
    )
)

每种事件类型(例如“on_failure”)最多支持 5 个电子邮件地址。

控制对指标表的访问

监视器创建的指标表和仪表板归创建监视器的用户所有。 可以使用 Unity Catalog 特权来控制对指标表的访问。 若要在工作区中共享仪表板,请使用仪表板右上角的“共享”按钮。

删除监视器

若要删除监视器,请执行以下操作:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
w.quality_monitors.delete(table_name=f"{catalog}.{schema}.{table_name}")

此命令不会删除监视器创建的配置文件表和仪表板。 必须在单独的步骤中删除这些资产,或者可以将它们保存在其他位置。

示例笔记本

以下示例笔记本演示了如何创建监视器、刷新监视器以及检查它创建的指标表。

笔记本示例:时序配置文件

此笔记本说明如何创建 TimeSeries 类型监视器。

时序湖屋监视器示例笔记本

获取笔记本

笔记本示例:推理配置文件(回归)

此笔记本演示如何为回归问题创建 InferenceLog 类型监视器。

推理湖屋监视器回归示例笔记本

获取笔记本

笔记本示例:推理配置文件(分类)

此笔记本演示如何为分类问题创建 InferenceLog 类型监视器。

推理湖屋监视器分类示例笔记本

获取笔记本

笔记本示例:快照配置文件

此笔记本说明如何创建 Snapshot 类型监视器。

快照湖屋监视器示例笔记本

获取笔记本