AnomalyDetection_ChangePoint (Azure 流分析)
检测时序事件流中的永久性异常。 基础机器学习模型使用可交换性马丁格尔斯算法。
语法
AnomalyDetection_ChangePoint(
<scalar_expression>,
<confidence>,
<historySize>)
OVER ([PARTITION BY <partition key>]
LIMIT DURATION(<unit>, <length>)
[WHEN boolean_expression])
参数
scalar_expression
模型对其执行异常情况检测的事件列或计算字段。 此参数的允许值包括 FLOAT 或 BIGINT 数据类型,这些数据类型返回单个 (标量) 值。
不允许使用通配符表达式 *。 此外, scalar_expression 不能包含其他分析函数或外部函数。
信心
一个百分比数字,从 1.00 到 100 (非独占) ,用于设置机器学习模型的敏感度。 置信度越低,检测到的异常数就越高,反之亦然。 从 70 到 90 之间的任意数字开始,并根据在开发或测试中观察到的结果对此进行调整。
historySize
模型持续学习的滑动窗口中的事件数,并用于对下一个事件进行异常评分。 通常,这应表示正常行为的时间段,使模型能够标记后续异常。 从使用历史日志进行有根据的猜测开始,并根据在开发或测试中观察到的结果进行调整。
over ([ partition_by_clause ] limit_duration_clause [when_clause])
partition_by_clause
用于根据事件中的特定列对模型的训练进行分区。 模型在所有分区中应用相同的函数参数设置。
limit_duration_clause 持续时间 (单位,长度)
流分析中滑动窗口的时间大小。 此时间窗口的建议大小相当于生成 historySize 事件数处于稳定状态所需的时间。
when_clause
指定要提供给模型以执行异常情况检测的事件的布尔条件。 when_clause是可选的。
返回类型
函数返回由以下列组成的嵌套记录:
IsAnomaly
BIGINT (0 或 1) 指示事件是否异常。
评分
计算的马丁格尔分数 (浮点数) 指示事件的异常程度。 此分数随异常值呈指数级增长。
示例
在以下查询示例中,第一个查询假定每 5 分钟发生一个事件,第二个查询采用每秒一个事件。 两个模型的置信度级别都设置为 75。
AnomalyDetection_ChangePoint(reading, 75, 72)
OVER (LIMIT DURATION(hour, 6))
AnomalyDetection_ChangePoint(temperature, 75, 120)
OVER ([PARTITION BY sensorId] LIMIT DURATION(second, 120))
假设 20 分钟滑动窗口中的统一输入速率为每秒 1 个事件,历史记录大小为 1200 个事件。 最终的 SELECT 语句将提取事件,并输出评分和置信度级别为 80% 的异常状态。
WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200)
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)
SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep
使用 1 秒的翻转窗口实现统一的非统一输入流的示例:
WITH SmootheningStep AS
(
SELECT
System.Timestamp() as time,
AVG(CAST(temperature as float)) as temp
FROM input
GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
SELECT
time,
temp,
AnomalyDetection_ChangePoint(temp, 80, 1200)
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM SmootheningStep
)
SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep
使用分区查询的示例,用于为每个传感器训练单独的模型:
WITH AnomalyDetectionStep AS
(
SELECT
sensorid,
System.Timestamp() as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200)
OVER(PARTITION BY sensorid
LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)
SELECT
CAST (sensorid as nvarchar(max)) as sensoridstring,
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep