Επεξεργασία

Κοινή χρήση μέσω


detect_anomalous_new_entity_fl()

Applies to: ✅ Microsoft FabricAzure Data ExplorerAzure MonitorMicrosoft Sentinel

Detect the appearance of anomalous new entities in timestamped data.

The function detect_anomalous_new_entity_fl() is a UDF (user-defined function) that detects the appearance of anomalous new entities - such as IP addresses or users - in timestamped data, such as traffic logs. In cybersecurity context, such events might be suspicious and indicate a potential attack or compromise.

The anomaly model is based on a Poisson distribution representing the number of new entities appearing per time bin (such as day) for each scope. Poisson distribution parameter is estimated based on the rate of appearance of new entities in training period, with added decay factor reflecting the fact that recent appearances are more important than old ones. Thus we calculate the probability to encounter a new entity in defined detection period per some scope - such as a subscription or an account. The model output is controlled by several optional parameters, such as minimal threshold for anomaly, decay rate parameter, and others.

The model's direct output is an anomaly score based on the inverse of estimated probability to encounter a new entity. The score is monotonous in the range of [0, 1], with 1 representing something anomalous. In addition to the anomaly score, there's a binary flag for detected anomaly (controlled by a minimal threshold parameter), and other explanatory fields.

Syntax

detect_anomalous_new_entity_fl(entityColumnName, scopeColumnName, timeColumnName, startTraining, startDetection, endDetection, [maxEntitiesThresh], [minTrainingDaysThresh], [decayParam], [anomalyScoreThresh])

Learn more about syntax conventions.

Parameters

Name Type Required Description
entityColumnName string ✔️ The name of the input table column containing the names or IDs of the entities for which anomaly model is calculated.
scopeColumnName string ✔️ The name of the input table column containing the partition or scope, so that a different anomaly model is built for each scope.
timeColumnName string ✔️ The name of the input table column containing the timestamps, that are used to define the training and detection periods.
startTraining datetime ✔️ The beginning of the training period for the anomaly model. Its end is defined by the beginning of detection period.
startDetection datetime ✔️ The beginning of the detection period for anomaly detection.
endDetection datetime ✔️ The end of the detection period for anomaly detection.
maxEntitiesThresh int The maximum number of existing entities in scope to calculate anomalies. If the number of entities is above the threshold, the scope is considered too noisy and anomalies aren't calculated. The default value is 60.
minTrainingDaysThresh int The minimum number of days in training period that a scope exists to calculate anomalies. If it is below threshold, the scope is considered too new and unknown, so anomalies aren't calculated. The default value is 14.
decayParam real The decay rate parameter for anomaly model, a number in range (0,1]. Lower values mean faster decay, so more importance is given to later appearances in training period. A value of 1 means no decay, so a simple average is used for Poisson distribution parameter estimation. The default value is 0.95.
anomalyScoreThresh real The minimum value of anomaly score for which an anomaly is detected, a number in range [0, 1]. Higher values mean that only more significant cases are considered anomalous, so fewer anomalies are detected (higher precision, lower recall). The default value is 0.9.

Function definition

You can define the function by either embedding its code as a query-defined function, or creating it as a stored function in your database, as follows:

Define the function using the following let statement. No permissions are required.

Important

A let statement can't run on its own. It must be followed by a tabular expression statement. To run a working example of detect_anomalous_new_entity_fl(), see Example.

let detect_anomalous_new_entity_fl = (T:(*), entityColumnName:string, scopeColumnName:string
                                        , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
                                        , maxEntitiesThresh:int = 60, minTrainingDaysThresh:int = 14, decayParam:real = 0.95, anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let timePeriodBinSize = 'day';      // we assume a reasonable bin for time is day, so the probability model is built per that bin size
let processedData = (
    T
    | extend scope      = column_ifexists(scopeColumnName, '')
    | extend entity     = column_ifexists(entityColumnName, '')
    | extend sliceTime  = todatetime(column_ifexists(timeColumnName, ''))
    | where isnotempty(scope) and isnotempty(entity) and isnotempty(sliceTime)
    | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
                           , sliceTime >= startDetection and sliceTime <= endDetection,  'detectSet'
                                                                                       , 'other')
    | where dataSet in ('trainSet', 'detectSet')
);
// summarize the data by scope and entity. this will be used to create a distribution of entity appearances based on first seen data
let entityData = (
    processedData
    | summarize countRowsEntity = count(), firstSeenEntity = min(sliceTime), lastSeenEntity = max(sliceTime), firstSeenSet = arg_min(sliceTime, dataSet) 
        by scope, entity
    | extend firstSeenSet = dataSet
    | project-away dataSet
);
// aggregate entity data per scope and get the number of entities appearing over time
let aggregatedCandidateScopeData = (
    entityData
    | summarize countRowsScope = sum(countRowsEntity), countEntitiesScope = dcount(entity), countEntitiesScopeInTrain = dcountif(entity, firstSeenSet == 'trainSet')
        , firstSeenScope = min(firstSeenEntity), lastSeenScope = max(lastSeenEntity), hasNewEntities = iff(dcountif(entity,firstSeenSet == 'detectSet') > 0, 1, 0) 
            by scope
    | extend slicesInTrainingScope = datetime_diff(timePeriodBinSize, startDetection, firstSeenScope)
    | where countEntitiesScopeInTrain <= maxEntitiesThresh and slicesInTrainingScope >= minTrainingDaysThresh and lastSeenScope >= startDetection and hasNewEntities == 1
);
let modelData = (
    entityData
    | join kind = inner (aggregatedCandidateScopeData) on scope 
    | where firstSeenSet == 'trainSet'
    | summarize countAddedEntities = dcount(entity), firstSeenScope = min(firstSeenScope), slicesInTrainingScope = max(slicesInTrainingScope), countEntitiesScope = max(countEntitiesScope)
        by scope, firstSeenSet, firstSeenEntity
    | extend diffInDays = datetime_diff(timePeriodBinSize, startDetection, firstSeenEntity)
// adding exponentially decaying weights to counts
    | extend decayingWeight = pow(base = decayParam, exponent = diffInDays)
    | extend decayingValue = countAddedEntities * decayingWeight
    | summarize   newEntityProbability = round(1 - exp(-1.0 * sum(decayingValue)/max(diffInDays)), 4)
                , countKnownEntities = sum(countAddedEntities), lastNewEntityTimestamp = max(firstSeenEntity), slicesOnScope = max(slicesInTrainingScope)///for explainability
        by scope, firstSeenSet
// anomaly score is based on probability to get no new entities, calculated using Poisson distribution (P(X=0) = exp(-avg)) with added decay on average
    | extend newEntityAnomalyScore = round(1 - newEntityProbability, 4)
    | extend isAnomalousNewEntity = iff(newEntityAnomalyScore >= anomalyScoreThresh, 1, 0)
);
let resultsData = (
    processedData
    | where dataSet == 'detectSet'
    | join kind = inner (modelData) on scope
	| project-away scope1
    | where isAnomalousNewEntity == 1
    | summarize arg_min(sliceTime, *) by scope, entity
    | extend anomalyType = strcat('newEntity_', entityColumnName), anomalyExplainability = strcat('The ', entityColumnName, ' ', entity, ' wasn\'t seen on ', scopeColumnName, ' ', scope, ' during the last ',  slicesOnScope, ' ', timePeriodBinSize, 's. Previously, ', countKnownEntities
        , ' entities were seen, the last one of them appearing at ', format_datetime(lastNewEntityTimestamp, 'yyyy-MM-dd HH:mm'), '.')
    | join kind = leftouter (entityData | where firstSeenSet == 'trainSet' | extend entityFirstSeens = strcat(entity, ' : ', format_datetime(firstSeenEntity, 'yyyy-MM-dd HH:mm')) | sort by scope, firstSeenEntity asc | summarize anomalyState = make_list(entityFirstSeens) by scope) on scope
    | project-away scope1
);
resultsData
};
// Write your query to use the function here.

Example

The following example uses the invoke operator to run the function.

To use a query-defined function, invoke it after the embedded function definition.

let detect_anomalous_new_entity_fl = (T:(*), entityColumnName:string, scopeColumnName:string
                                        , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
                                        , maxEntitiesThresh:int = 60, minTrainingDaysThresh:int = 14, decayParam:real = 0.95, anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let timePeriodBinSize = 'day';      // we assume a reasonable bin for time is day, so the probability model is built per that bin size
let processedData = (
    T
    | extend scope      = column_ifexists(scopeColumnName, '')
    | extend entity     = column_ifexists(entityColumnName, '')
    | extend sliceTime  = todatetime(column_ifexists(timeColumnName, ''))
    | where isnotempty(scope) and isnotempty(entity) and isnotempty(sliceTime)
    | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
                           , sliceTime >= startDetection and sliceTime <= endDetection,  'detectSet'
                                                                                       , 'other')
    | where dataSet in ('trainSet', 'detectSet')
);
// summarize the data by scope and entity. this will be used to create a distribution of entity appearances based on first seen data
let entityData = (
    processedData
    | summarize countRowsEntity = count(), firstSeenEntity = min(sliceTime), lastSeenEntity = max(sliceTime), firstSeenSet = arg_min(sliceTime, dataSet) 
        by scope, entity
    | extend firstSeenSet = dataSet
    | project-away dataSet
);
// aggregate entity data per scope and get the number of entities appearing over time
let aggregatedCandidateScopeData = (
    entityData
    | summarize countRowsScope = sum(countRowsEntity), countEntitiesScope = dcount(entity), countEntitiesScopeInTrain = dcountif(entity, firstSeenSet == 'trainSet')
        , firstSeenScope = min(firstSeenEntity), lastSeenScope = max(lastSeenEntity), hasNewEntities = iff(dcountif(entity,firstSeenSet == 'detectSet') > 0, 1, 0) 
            by scope
    | extend slicesInTrainingScope = datetime_diff(timePeriodBinSize, startDetection, firstSeenScope)
    | where countEntitiesScopeInTrain <= maxEntitiesThresh and slicesInTrainingScope >= minTrainingDaysThresh and lastSeenScope >= startDetection and hasNewEntities == 1
);
let modelData = (
    entityData
    | join kind = inner (aggregatedCandidateScopeData) on scope 
    | where firstSeenSet == 'trainSet'
    | summarize countAddedEntities = dcount(entity), firstSeenScope = min(firstSeenScope), slicesInTrainingScope = max(slicesInTrainingScope), countEntitiesScope = max(countEntitiesScope)
        by scope, firstSeenSet, firstSeenEntity
    | extend diffInDays = datetime_diff(timePeriodBinSize, startDetection, firstSeenEntity)
// adding exponentially decaying weights to counts
    | extend decayingWeight = pow(base = decayParam, exponent = diffInDays)
    | extend decayingValue = countAddedEntities * decayingWeight
    | summarize   newEntityProbability =  round(1 - exp(-1.0 * sum(decayingValue)/max(diffInDays)), 4)
                , countKnownEntities = sum(countAddedEntities), lastNewEntityTimestamp = max(firstSeenEntity), slicesOnScope = max(slicesInTrainingScope)///for explainability
        by scope, firstSeenSet
// anomaly score is based on probability to get no new entities, calculated using Poisson distribution (P(X=0) = exp(-avg)) with added decay on average
    | extend newEntityAnomalyScore = round(1 - newEntityProbability, 4)
    | extend isAnomalousNewEntity = iff(newEntityAnomalyScore >= anomalyScoreThresh, 1, 0)
);
let resultsData = (
    processedData
    | where dataSet == 'detectSet'
    | join kind = inner (modelData) on scope
    | project-away scope1
    | where isAnomalousNewEntity == 1
    | summarize arg_min(sliceTime, *) by scope, entity
    | extend anomalyType = strcat('newEntity_', entityColumnName), anomalyExplainability = strcat('The ', entityColumnName, ' ', entity, ' wasn\'t seen on ', scopeColumnName, ' ', scope, ' during the last ',  slicesOnScope, ' ', timePeriodBinSize, 's. Previously, ', countKnownEntities
        , ' entities were seen, the last one of them appearing at ', format_datetime(lastNewEntityTimestamp, 'yyyy-MM-dd HH:mm'), '.')
    | join kind = leftouter (entityData | where firstSeenSet == 'trainSet' | extend entityFirstSeens = strcat(entity, ' : ', format_datetime(firstSeenEntity, 'yyyy-MM-dd HH:mm')) | sort by scope, firstSeenEntity asc | summarize anomalyState = make_list(entityFirstSeens) by scope) on scope
    | project-away scope1
);
resultsData
};
// synthetic data generation
let detectPeriodStart   = datetime(2022-04-30 05:00:00.0000000);
let trainPeriodStart    = datetime(2022-03-01 05:00);
let names               = pack_array("Admin", "Dev1", "Dev2", "IT-support");
let countNames          = array_length(names);
let testData            = range t from 1 to 24*60 step 1
    | extend timeSlice      = trainPeriodStart + 1h * t
    | extend countEvents    = round(2*rand() + iff((t/24)%7>=5, 10.0, 15.0) - (((t%24)/10)*((t%24)/10)), 2) * 100 // generate a series with weekly seasonality
    | extend userName       = tostring(names[toint(rand(countNames))])
    | extend deviceId       = hash_md5(rand())
    | extend accountName    = iff(((rand() < 0.2) and (timeSlice < detectPeriodStart)), 'testEnvironment', 'prodEnvironment')
    | extend userName       = iff(timeSlice == detectPeriodStart, 'H4ck3r', userName)
    | extend deviceId       = iff(timeSlice == detectPeriodStart, 'abcdefghijklmnoprtuvwxyz012345678', deviceId)
    | sort by timeSlice desc
;
testData
| invoke detect_anomalous_new_entity_fl(entityColumnName    = 'userName'  //principalName for positive, deviceId for negative
                                , scopeColumnName           = 'accountName'
                                , timeColumnName            = 'timeSlice'
                                , startTraining             = trainPeriodStart
                                , startDetection            = detectPeriodStart
                                , endDetection              = detectPeriodStart
                            )

Output

scope entity sliceTime t timeSlice countEvents userName deviceId accountName dataSet firstSeenSet newEntityProbability countKnownEntities lastNewEntityTimestamp slicesOnScope newEntityAnomalyScore isAnomalousNewEntity anomalyType anomalyExplainability anomalyState
prodEnvironment H4ck3r 2022-04-30 05:00:00.0000000 1440 2022-04-30 05:00:00.0000000 1687 H4ck3r abcdefghijklmnoprtuvwxyz012345678 prodEnvironment detectSet trainSet 0.0031 4 2022-03-01 09:00:00.0000000 60 0.9969 1 newEntity_userName The userName H4ck3r wasn't seen on accountName prodEnvironment during the last 60 days. Previously, four entities were seen, the last one of them appearing at 2022-03-01 09:00. ["IT-support : 2022-03-01 07:00", "Admin : 2022-03-01 08:00", "Dev2 : 2022-03-01 09:00", "Dev1 : 2022-03-01 14:00"]

The output of running the function is the first-seen row in test dataset for each entity per scope, filtered for new entities (meaning they didn't appear during the training period) that were tagged as anomalous (meaning that entity anomaly score was above anomalyScoreThresh). Some other fields are added for clarity:

  • dataSet: current dataset (is always detectSet).
  • firstSeenSet: dataset in which the scope was first seen (should be 'trainSet').
  • newEntityProbability: probability to see any new entity based on Poisson model estimation.
  • countKnownEntities: existing entities on scope.
  • lastNewEntityTimestamp: last time a new entity was seen before the anomalous one.
  • slicesOnScope: count of slices per scope.
  • newEntityAnomalyScore: anomaly score was the new entity in range [0, 1], higher values meaning more anomaly.
  • isAnomalousNewEntity: binary flag for anomalous new entities
  • anomalyType: shows the type of anomaly (helpful when running several anomaly detection logics together).
  • anomalyExplainability: textual wrapper for generated anomaly and its explanation.
  • anomalyState: bag of existing entities on scope with their first seen times.

Running this function on user per account with default parameters gets a previously unseen and anomalous user ('H4ck3r') with high anomaly score of 0.9969, meaning that this is unexpected (due to small numbers of existing users in training period).

When we run the function with default parameters on deviceId as entity, we won't see an anomaly, due to large number of existing devices which makes it expected. However, if we lower the parameter anomalyScoreThresh to 0.0001 and raise the parameter to maxEntitiesThresh to 10000, we'll effectively decrease precision in favor of recall, and detect an anomaly (with a low anomaly score) on device 'abcdefghijklmnoprtuvwxyz012345678'.

The output shows the anomalous entities together with explanation fields in standardized format. These fields are useful for investigating the anomaly and for running anomalous entity detection on several entities or running other algorithms together.

The suggested usage in cybersecurity context is running the function on meaningful entities - such as usernames or IP addresses - per meaningful scopes - such as subscription on accounts. A detected anomalous new entity means that its appearance isn't expected on the scope, and might be suspicious.