多变量异常情况检测
有关实时智能中的多变量异常情况检测的一般信息,请参阅 Microsoft Fabric 中的多变量异常情况检测 - 概述。 在本教程中,你将使用示例数据在 Python 笔记本中使用 Spark 引擎训练多变量异常情况检测模型。 然后,你将通过使用 Eventhouse 引擎将训练的模型应用到新数据来预测异常情况。 前几个步骤设置环境,以下步骤训练模型并预测异常情况。
先决条件
- 具有已启用 Microsoft Fabric 的容量的工作区
- 工作区中的“管理员”、“参与者”或“成员”角色。 创建环境等项目需要此权限级别。
- 具有数据库的工作区中的 eventhouse。
- 从 GitHub 存储库下载示例数据
- 从 GitHub 存储库下载笔记本
第 1 部分 - 启用 OneLake 可用性
必须先启用 OneLake 可用性,然后才能在 Eventhouse 中获取数据。 此步骤非常重要,因为它使引入的数据能够在 OneLake 中可用。 在后面的步骤中,你可以从 Spark Notebook 访问此相同数据来训练模型。
在实时智能中浏览到工作区主页。
选择在先决条件部分中创建的 Eventhouse。 选择要在其中存储数据的数据库。
在“数据库详细信息”磁贴,选择“OneLake 可用性”旁边的“铅笔”图标
在右窗格中,将按钮切换为“活动”。
选择“完成” 。
第 2 部分 - 启用 KQL Python 插件
在此步骤中,你将在 Eventhouse 中启用 python 插件。 此步骤需要在 KQL 查询集中运行预测异常 Python 代码。 请务必选择包含时序异常情况检测器包的正确包。
在 Eventhouse 屏幕中,选择数据库,然后从功能区中选择“管理”>“插件”。
在“插件”窗格中,将 Python 语言扩展切换到“打开”。
选择 Python 3.11.7 DL(预览版)。
选择“完成” 。
第 3 部分 - 创建 Spark 环境
在此步骤中,你将创建一个 Spark 环境来运行 Python 笔记本,该笔记本使用 Spark 引擎训练多变量异常情况检测模型。 有关创建环境的详细信息,请参阅创建和管理环境。
在体验切换器中,选择“数据工程”。 如果已处于数据工程师体验中,请浏览到“主页”。
从“要创建的建议项目”中选择“环境”,然后输入环境的名称“MVAD_ENV”。
在“库”下,选择“公共库”。
选择“从 PyPI 添加”。
在搜索框中,输入“时序异常情况检测器”。 版本会自动填充最新版本。 本教程是使用版本 0.2.7 创建的,这是 Kusto Python 3.11.7 DL 中包含的版本。
选择“保存”。
选择环境中的“主页”选项卡。
从功能区中,选择“发布”图标。
选择“全部发布”。 此步骤可能需要几分钟才能完成。
第 4 部分- 将数据获取到 Eventhouse
将鼠标悬停在要在其中存储数据的 KQL 数据库上。 选择“更多菜单 [...]”>“获取数据”>“本地文件”。
选择“+ 新建表”,然后输入 demo_stocks_change 作为表名。
选择下一步。
在“检查数据”部分中,将“第一行为列标头”切换为“打开”。
选择“完成”。
上传数据后,选择“关闭”。
第 5 部分 - 将 OneLake 路径复制到表
请确保选择 demo_stocks_change 表。 在“表详细信息”磁贴中,选择“复制路径”,以将 OneLake 路径复制到剪贴板。 将此复制的文本保存在文本编辑器的某个位置,以供后续步骤使用。
第 6 部分 - 准备笔记本
在体验切换器中,选择“开发”,然后选择你的工作区。
依次选择“导入”、“笔记本”“从此计算机”。
上传笔记本后,可以从工作区找到并打开笔记本。
在顶部功能区中,选择“工作区默认”下拉列表,然后选择在上一步中创建的环境。
第 7 部分 - 运行笔记本
导入标准包。
import numpy as np import pandas as pd
Spark 需要 ABFSS URI 才能安全地连接到 OneLake 存储,因此下一步将定义此函数以将 OneLake URI 转换为 ABFSS URI。
def convert_onelake_to_abfss(onelake_uri): if not onelake_uri.startswith('https://'): raise ValueError("Invalid OneLake URI. It should start with 'https://'.") uri_without_scheme = onelake_uri[8:] parts = uri_without_scheme.split('/') if len(parts) < 3: raise ValueError("Invalid OneLake URI format.") account_name = parts[0].split('.')[0] container_name = parts[1] path = '/'.join(parts[2:]) abfss_uri = f"abfss://{container_name}@{parts[0]}/{path}" return abfss_uri
输入从第 5 部分 - 将 OneLake 路径复制到表中复制的 OneLake URI,以便将 demo_stocks_change 表加载到 pandas 数据帧中。
onelake_uri = "OneLakeTableURI" # Replace with your OneLake table URI abfss_uri = convert_onelake_to_abfss(onelake_uri) print(abfss_uri)
df = spark.read.format('delta').load(abfss_uri) df = df.toPandas().set_index('Date') print(df.shape) df[:3]
运行以下单元格来准备训练和预测数据帧。
注意
实际预测将由 Eventhouse 在第 9 部分 - Predict-anomalies-in-the-kql-queryset 中的数据上运行。 在生产应用场景中,如果要将数据流式传输到 Eventhouse,则会对新的流数据进行预测。 在本教程中,数据集已按日期拆分为两个部分以进行训练和预测。 这样做是为了模拟历史数据和新的流数据。
features_cols = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY'] cutoff_date = pd.to_datetime('2023-01-01')
train_df = df[df.Date < cutoff_date] print(train_df.shape) train_df[:3]
train_len = len(train_df) predict_len = len(df) - train_len print(f'Total samples: {len(df)}. Split to {train_len} for training, {predict_len} for testing')
运行单元格以训练模型并将其保存在 Fabric MLflow 模型注册表中。
import mlflow from anomaly_detector import MultivariateAnomalyDetector model = MultivariateAnomalyDetector()
sliding_window = 200 param s = {"sliding_window": sliding_window}
model.fit(train_df, params=params)
with mlflow.start_run(): mlflow.log_params(params) mlflow.set_tag("Training Info", "MVAD on 5 Stocks Dataset") model_info = mlflow.pyfunc.log_model( python_model=model, artifact_path="mvad_artifacts", registered_model_name="mvad_5_stocks_model", )
# Extract the registered model path to be used for prediction using Kusto Python sandbox mi = mlflow.search_registered_models(filter_string="name='mvad_5_stocks_model'")[0] model_abfss = mi.latest_versions[0].source print(model_abfss)
从最后一个单元格输出复制模型 URI。 在后面的下一步骤中将要使用此 URI。
第 8 部分 - 设置 KQL 查询集
有关一般信息,请参阅创建 KQL 查询集。
- 在体验切换器中,选择“实时智能”。
- 选择工作区。
- 选择“+新建项目”>“KQL 查询集”。 输入名称 MultivariateAnomalyDetectionTutorial。
- 选择创建。
- 在“OneLake 数据中心”窗口中,选择存储数据的 KQL 数据库。
- 选择“连接” 。
第 9 部分 - 预测 KQL 查询集中的异常情况
复制/粘贴并运行以下“.create-or-alter 函数”查询以定义
predict_fabric_mvad_fl()
存储的函数:.create-or-alter function with (folder = "Packages\\ML", docstring = "Predict MVAD model in Microsoft Fabric") predict_fabric_mvad_fl(samples:(*), features_cols:dynamic, artifacts_uri:string, trim_result:bool=false) { let s = artifacts_uri; let artifacts = bag_pack('MLmodel', strcat(s, '/MLmodel;impersonate'), 'conda.yaml', strcat(s, '/conda.yaml;impersonate'), 'requirements.txt', strcat(s, '/requirements.txt;impersonate'), 'python_env.yaml', strcat(s, '/python_env.yaml;impersonate'), 'python_model.pkl', strcat(s, '/python_model.pkl;impersonate')); let kwargs = bag_pack('features_cols', features_cols, 'trim_result', trim_result); let code = ```if 1: import os import shutil import mlflow model_dir = 'C:/Temp/mvad_model' model_data_dir = model_dir + '/data' os.mkdir(model_dir) shutil.move('C:/Temp/MLmodel', model_dir) shutil.move('C:/Temp/conda.yaml', model_dir) shutil.move('C:/Temp/requirements.txt', model_dir) shutil.move('C:/Temp/python_env.yaml', model_dir) shutil.move('C:/Temp/python_model.pkl', model_dir) features_cols = kargs["features_cols"] trim_result = kargs["trim_result"] test_data = df[features_cols] model = mlflow.pyfunc.load_model(model_dir) predictions = model.predict(test_data) predict_result = pd.DataFrame(predictions) samples_offset = len(df) - len(predict_result) # this model doesn't output predictions for the first sliding_window-1 samples if trim_result: # trim the prefix samples result = df[samples_offset:] result.iloc[:,-4:] = predict_result.iloc[:, 1:] # no need to copy 1st column which is the timestamp index else: result = df # output all samples result.iloc[samples_offset:,-4:] = predict_result.iloc[:, 1:] ```; samples | evaluate python(typeof(*), code, kwargs, external_artifacts=artifacts) }
复制/粘贴以下预测查询。
- 替换步骤 7 末尾复制的输出模型 URI。
- 运行查询。 它会根据训练的模型检测五个股票的多变量异常情况,并将结果呈现为
anomalychart
。 异常点呈现在第一个股票上 (AAPL),尽管它们表示多变量异常情况(换句话说,五只股票在特定日期的联合变化的异常)。
let cutoff_date=datetime(2023-01-01); let num_predictions=toscalar(demo_stocks_change | where Date >= cutoff_date | count); // number of latest points to predict let sliding_window=200; // should match the window that was set for model training let prefix_score_len = sliding_window/2+min_of(sliding_window/2, 200)-1; let num_samples = prefix_score_len + num_predictions; demo_stocks_change | top num_samples by Date desc | order by Date asc | extend is_anomaly=bool(false), score=real(null), severity=real(null), interpretation=dynamic(null) | invoke predict_fabric_mvad_fl(pack_array('AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY'), // NOTE: Update artifacts_uri to model path artifacts_uri='enter your model URI here', trim_result=true) | summarize Date=make_list(Date), AAPL=make_list(AAPL), AMZN=make_list(AMZN), GOOG=make_list(GOOG), MSFT=make_list(MSFT), SPY=make_list(SPY), anomaly=make_list(toint(is_anomaly)) | render anomalychart with(anomalycolumns=anomaly, title='Stock Price Changest in % with Anomalies')
产生的异常情况图应如下图所示:
清理资源
完成本教程后,你可以删除你创建的资源,从而避免产生其他成本。 若要删除资源,请执行以下步骤:
- 浏览到工作区主页。
- 删除在本教程中创建的环境。
- 删除在本教程中创建的笔记本。
- 删除在本教程中使用的 Eventhouse 或数据库。
- 删除在本教程中创建的 KQL 查询集。