你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

detect_anomalous_new_entity_fl()

适用于:✅Microsoft Fabric✅Azure 数据资源管理器Azure MonitorMicrosoft✅ Sentinel

检测时间戳数据中异常新实体的外观。

该函数 detect_anomalous_new_entity_fl() 是一个 UDF(用户定义的函数), 用于检测时间戳数据(如流量日志)中异常的新实体(如 IP 地址或用户)的外观。 在网络安全上下文中,此类事件可能可疑,并指示潜在的攻击或泄露。

异常模型基于一个 Poisson 分布,表示每个范围每个时间箱(如日)显示的新实体数。 Poisson 分布参数是根据训练期间新实体的外观率估计的,增加了衰变因子,反映了最近出现比旧实体更重要的事实。 因此,我们计算每个范围(例如订阅或帐户)在定义的检测周期中遇到新实体的概率。 模型输出由多个可选参数控制,例如异常的最小阈值、衰减率参数和其他参数。

模型的直接输出是基于遇到新实体的估计概率的反函数的异常分数。 分数在 [0, 1] 范围内单调,1 表示异常。 除了异常分数之外,还有一个二进制标志用于检测到的异常(由最小阈值参数控制)和其他解释字段。

语法

detect_anomalous_new_entity_fl(entityColumnNamescopeColumnName, timeColumnNamestartTrainingstartDetection, endDetection, [maxEntitiesThresh], [minTrainingDaysThresh], [decayParam], [anomalyScoreThresh])

详细了解语法约定

参数

客户 类型​​ 必需 说明
entityColumnName string ✔️ 输入表列的名称,其中包含计算异常模型的实体的名称或 ID。
scopeColumnName string ✔️ 包含分区或范围的输入表列的名称,以便为每个范围生成不同的异常模型。
timeColumnName string ✔️ 包含时间戳的输入表列的名称,用于定义训练和检测周期。
startTraining datetime ✔️ 异常模型的训练期的开始。 其结束由检测周期的开始定义。
startDetection datetime ✔️ 异常情况检测的检测周期的开始。
endDetection datetime ✔️ 异常检测的检测期结束。
maxEntitiesThresh int 用于计算异常的范围中现有实体的最大数目。 如果实体数超过阈值,则范围被视为过于干扰,并且不会计算异常。 默认值是 60秒。
minTrainingDaysThresh int 用于计算异常的范围所存在的训练期间的最低天数。 如果它低于阈值,则范围被视为太新且未知,因此不会计算异常。 默认值为 14。
decayParam real 异常模型的衰减速率参数,范围为 0,1]。 较低的值意味着更快的衰减,因此在训练期间以后出现时更为重要。 值为 1 表示无衰减,因此将简单的平均值用于 Poisson 分布参数估计。 默认值为 0.95。
anomalyScoreThresh real 检测到异常的异常分数的最小值,范围为 [0, 1]。 较高的值表示仅将更重要的事例视为异常情况,因此检测到的异常更少(精度较高、召回率较低)。 默认值为 0.9。

函数定义

可以通过将函数的代码嵌入为查询定义的函数,或将其创建为数据库中的存储函数来定义函数,如下所示:

使用以下 let 语句定义函数。 不需要任何权限。

重要

let 语句不能独立运行。 它必须后跟一个表格表达式语句。 若要运行 detect_anomalous_new_entity_fl() 的工作示例,请参阅示例

let detect_anomalous_new_entity_fl = (T:(*), entityColumnName:string, scopeColumnName:string
                                        , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
                                        , maxEntitiesThresh:int = 60, minTrainingDaysThresh:int = 14, decayParam:real = 0.95, anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let timePeriodBinSize = 'day';      // we assume a reasonable bin for time is day, so the probability model is built per that bin size
let processedData = (
    T
    | extend scope      = column_ifexists(scopeColumnName, '')
    | extend entity     = column_ifexists(entityColumnName, '')
    | extend sliceTime  = todatetime(column_ifexists(timeColumnName, ''))
    | where isnotempty(scope) and isnotempty(entity) and isnotempty(sliceTime)
    | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
                           , sliceTime >= startDetection and sliceTime <= endDetection,  'detectSet'
                                                                                       , 'other')
    | where dataSet in ('trainSet', 'detectSet')
);
// summarize the data by scope and entity. this will be used to create a distribution of entity appearances based on first seen data
let entityData = (
    processedData
    | summarize countRowsEntity = count(), firstSeenEntity = min(sliceTime), lastSeenEntity = max(sliceTime), firstSeenSet = arg_min(sliceTime, dataSet) 
        by scope, entity
    | extend firstSeenSet = dataSet
    | project-away dataSet
);
// aggregate entity data per scope and get the number of entities appearing over time
let aggregatedCandidateScopeData = (
    entityData
    | summarize countRowsScope = sum(countRowsEntity), countEntitiesScope = dcount(entity), countEntitiesScopeInTrain = dcountif(entity, firstSeenSet == 'trainSet')
        , firstSeenScope = min(firstSeenEntity), lastSeenScope = max(lastSeenEntity), hasNewEntities = iff(dcountif(entity,firstSeenSet == 'detectSet') > 0, 1, 0) 
            by scope
    | extend slicesInTrainingScope = datetime_diff(timePeriodBinSize, startDetection, firstSeenScope)
    | where countEntitiesScopeInTrain <= maxEntitiesThresh and slicesInTrainingScope >= minTrainingDaysThresh and lastSeenScope >= startDetection and hasNewEntities == 1
);
let modelData = (
    entityData
    | join kind = inner (aggregatedCandidateScopeData) on scope 
    | where firstSeenSet == 'trainSet'
    | summarize countAddedEntities = dcount(entity), firstSeenScope = min(firstSeenScope), slicesInTrainingScope = max(slicesInTrainingScope), countEntitiesScope = max(countEntitiesScope)
        by scope, firstSeenSet, firstSeenEntity
    | extend diffInDays = datetime_diff(timePeriodBinSize, startDetection, firstSeenEntity)
// adding exponentially decaying weights to counts
    | extend decayingWeight = pow(base = decayParam, exponent = diffInDays)
    | extend decayingValue = countAddedEntities * decayingWeight
    | summarize   newEntityProbability = round(1 - exp(-1.0 * sum(decayingValue)/max(diffInDays)), 4)
                , countKnownEntities = sum(countAddedEntities), lastNewEntityTimestamp = max(firstSeenEntity), slicesOnScope = max(slicesInTrainingScope)///for explainability
        by scope, firstSeenSet
// anomaly score is based on probability to get no new entities, calculated using Poisson distribution (P(X=0) = exp(-avg)) with added decay on average
    | extend newEntityAnomalyScore = round(1 - newEntityProbability, 4)
    | extend isAnomalousNewEntity = iff(newEntityAnomalyScore >= anomalyScoreThresh, 1, 0)
);
let resultsData = (
    processedData
    | where dataSet == 'detectSet'
    | join kind = inner (modelData) on scope
	| project-away scope1
    | where isAnomalousNewEntity == 1
    | summarize arg_min(sliceTime, *) by scope, entity
    | extend anomalyType = strcat('newEntity_', entityColumnName), anomalyExplainability = strcat('The ', entityColumnName, ' ', entity, ' wasn\'t seen on ', scopeColumnName, ' ', scope, ' during the last ',  slicesOnScope, ' ', timePeriodBinSize, 's. Previously, ', countKnownEntities
        , ' entities were seen, the last one of them appearing at ', format_datetime(lastNewEntityTimestamp, 'yyyy-MM-dd HH:mm'), '.')
    | join kind = leftouter (entityData | where firstSeenSet == 'trainSet' | extend entityFirstSeens = strcat(entity, ' : ', format_datetime(firstSeenEntity, 'yyyy-MM-dd HH:mm')) | sort by scope, firstSeenEntity asc | summarize anomalyState = make_list(entityFirstSeens) by scope) on scope
    | project-away scope1
);
resultsData
};
// Write your query to use the function here.

示例

以下示例使用 invoke 运算符运行函数。

若要使用查询定义的函数,请在嵌入的函数定义后调用它。

let detect_anomalous_new_entity_fl = (T:(*), entityColumnName:string, scopeColumnName:string
                                        , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
                                        , maxEntitiesThresh:int = 60, minTrainingDaysThresh:int = 14, decayParam:real = 0.95, anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let timePeriodBinSize = 'day';      // we assume a reasonable bin for time is day, so the probability model is built per that bin size
let processedData = (
    T
    | extend scope      = column_ifexists(scopeColumnName, '')
    | extend entity     = column_ifexists(entityColumnName, '')
    | extend sliceTime  = todatetime(column_ifexists(timeColumnName, ''))
    | where isnotempty(scope) and isnotempty(entity) and isnotempty(sliceTime)
    | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
                           , sliceTime >= startDetection and sliceTime <= endDetection,  'detectSet'
                                                                                       , 'other')
    | where dataSet in ('trainSet', 'detectSet')
);
// summarize the data by scope and entity. this will be used to create a distribution of entity appearances based on first seen data
let entityData = (
    processedData
    | summarize countRowsEntity = count(), firstSeenEntity = min(sliceTime), lastSeenEntity = max(sliceTime), firstSeenSet = arg_min(sliceTime, dataSet) 
        by scope, entity
    | extend firstSeenSet = dataSet
    | project-away dataSet
);
// aggregate entity data per scope and get the number of entities appearing over time
let aggregatedCandidateScopeData = (
    entityData
    | summarize countRowsScope = sum(countRowsEntity), countEntitiesScope = dcount(entity), countEntitiesScopeInTrain = dcountif(entity, firstSeenSet == 'trainSet')
        , firstSeenScope = min(firstSeenEntity), lastSeenScope = max(lastSeenEntity), hasNewEntities = iff(dcountif(entity,firstSeenSet == 'detectSet') > 0, 1, 0) 
            by scope
    | extend slicesInTrainingScope = datetime_diff(timePeriodBinSize, startDetection, firstSeenScope)
    | where countEntitiesScopeInTrain <= maxEntitiesThresh and slicesInTrainingScope >= minTrainingDaysThresh and lastSeenScope >= startDetection and hasNewEntities == 1
);
let modelData = (
    entityData
    | join kind = inner (aggregatedCandidateScopeData) on scope 
    | where firstSeenSet == 'trainSet'
    | summarize countAddedEntities = dcount(entity), firstSeenScope = min(firstSeenScope), slicesInTrainingScope = max(slicesInTrainingScope), countEntitiesScope = max(countEntitiesScope)
        by scope, firstSeenSet, firstSeenEntity
    | extend diffInDays = datetime_diff(timePeriodBinSize, startDetection, firstSeenEntity)
// adding exponentially decaying weights to counts
    | extend decayingWeight = pow(base = decayParam, exponent = diffInDays)
    | extend decayingValue = countAddedEntities * decayingWeight
    | summarize   newEntityProbability =  round(1 - exp(-1.0 * sum(decayingValue)/max(diffInDays)), 4)
                , countKnownEntities = sum(countAddedEntities), lastNewEntityTimestamp = max(firstSeenEntity), slicesOnScope = max(slicesInTrainingScope)///for explainability
        by scope, firstSeenSet
// anomaly score is based on probability to get no new entities, calculated using Poisson distribution (P(X=0) = exp(-avg)) with added decay on average
    | extend newEntityAnomalyScore = round(1 - newEntityProbability, 4)
    | extend isAnomalousNewEntity = iff(newEntityAnomalyScore >= anomalyScoreThresh, 1, 0)
);
let resultsData = (
    processedData
    | where dataSet == 'detectSet'
    | join kind = inner (modelData) on scope
    | project-away scope1
    | where isAnomalousNewEntity == 1
    | summarize arg_min(sliceTime, *) by scope, entity
    | extend anomalyType = strcat('newEntity_', entityColumnName), anomalyExplainability = strcat('The ', entityColumnName, ' ', entity, ' wasn\'t seen on ', scopeColumnName, ' ', scope, ' during the last ',  slicesOnScope, ' ', timePeriodBinSize, 's. Previously, ', countKnownEntities
        , ' entities were seen, the last one of them appearing at ', format_datetime(lastNewEntityTimestamp, 'yyyy-MM-dd HH:mm'), '.')
    | join kind = leftouter (entityData | where firstSeenSet == 'trainSet' | extend entityFirstSeens = strcat(entity, ' : ', format_datetime(firstSeenEntity, 'yyyy-MM-dd HH:mm')) | sort by scope, firstSeenEntity asc | summarize anomalyState = make_list(entityFirstSeens) by scope) on scope
    | project-away scope1
);
resultsData
};
// synthetic data generation
let detectPeriodStart   = datetime(2022-04-30 05:00:00.0000000);
let trainPeriodStart    = datetime(2022-03-01 05:00);
let names               = pack_array("Admin", "Dev1", "Dev2", "IT-support");
let countNames          = array_length(names);
let testData            = range t from 1 to 24*60 step 1
    | extend timeSlice      = trainPeriodStart + 1h * t
    | extend countEvents    = round(2*rand() + iff((t/24)%7>=5, 10.0, 15.0) - (((t%24)/10)*((t%24)/10)), 2) * 100 // generate a series with weekly seasonality
    | extend userName       = tostring(names[toint(rand(countNames))])
    | extend deviceId       = hash_md5(rand())
    | extend accountName    = iff(((rand() < 0.2) and (timeSlice < detectPeriodStart)), 'testEnvironment', 'prodEnvironment')
    | extend userName       = iff(timeSlice == detectPeriodStart, 'H4ck3r', userName)
    | extend deviceId       = iff(timeSlice == detectPeriodStart, 'abcdefghijklmnoprtuvwxyz012345678', deviceId)
    | sort by timeSlice desc
;
testData
| invoke detect_anomalous_new_entity_fl(entityColumnName    = 'userName'  //principalName for positive, deviceId for negative
                                , scopeColumnName           = 'accountName'
                                , timeColumnName            = 'timeSlice'
                                , startTraining             = trainPeriodStart
                                , startDetection            = detectPeriodStart
                                , endDetection              = detectPeriodStart
                            )

输出

scope 实体 sliceTime t timeSlice countEvents userName deviceId accountName 数据集 firstSeenSet newEntityProbability countKnownEntities lastNewEntityTimestamp slicesOnScope newEntityAnomalyScore isAnomalousNewEntity anomalyType anomalyExplainability anomalyState
prodEnvironment H4ck3r 2022-04-30 05:00:00.0000000 1440 2022-04-30 05:00:00.0000000 1687 H4ck3r abcdefghijklmnoprtuvwxyz012345678 prodEnvironment detectSet trainSet 0.0031 4 2022-03-01 09:00:00.0000000 60 0.9969 1 newEntity_userName 在过去 60 天内,accountName prodEnvironment 上未显示 userName H4ck3r。 以前,可以看到四个实体,最后一个实体出现在2022-03-01 09:00。 [“IT 支持: 2022-03-01 07:00”, “Admin: 2022-03-01 08:00”, “Dev2: 2022-03-01 09:00”, “Dev1 : 2022-03-01 14:00”]

运行该函数的输出是每个范围中每个实体的测试数据集中的第一个看到行,针对新实体进行筛选(这意味着它们未出现在训练期间),标记为异常(这意味着实体异常分数高于 anomalyScoreThresh)。 为了清楚起见,添加了一些其他字段:

  • dataSet:当前数据集(始终为 detectSet)。
  • firstSeenSet:首次看到范围(应为“trainSet”)的数据集。
  • newEntityProbability:根据 Poisson 模型估计查看任何新实体的概率。
  • countKnownEntities:范围上的现有实体。
  • lastNewEntityTimestamp:上次在异常实体之前看到新实体的时间。
  • slicesOnScope:每个范围的切片计数。
  • newEntityAnomalyScore:异常分数是 [0, 1] 范围内的新实体,较高的值意味着更多的异常。
  • isAnomalousNewEntity:异常新实体的二进制标志
  • anomalyType:显示异常类型(在一起运行多个异常情况检测逻辑时很有用)。
  • anomalyExplainability:生成的异常及其解释的文本包装器。
  • anomalyState:范围上现有实体的包,其首次出现时间。

在具有默认参数的每个用户帐户上运行此函数将获取以前看不见和异常的用户('H4ck3r'),其异常分数为 0.9969,这意味着这是意外的(由于训练期间现有用户数量较少)。

当我们以实体形式在 deviceId 上运行具有默认参数的函数时,由于存在大量现有设备,因此不会看到异常。 但是,如果将参数 anomalyScoreThresh 降低到 0.0001 并将参数提升到 maxEntitiesThresh 到 10000,我们将有效地降低精度,以支持召回,并在设备“abcdefghijklmnoprtuvwxyz012345678”上检测异常(异常分数较低)。

输出显示异常实体以及采用标准化格式的说明字段。 这些字段可用于调查异常情况以及对多个实体运行异常实体检测或同时运行其他算法。

网络安全上下文中的建议用法是在有意义的实体(如用户名或 IP 地址)(如帐户订阅)上运行函数。 检测到的异常新实体意味着其外观在范围上不预期,并且可能可疑。