detect_anomalous_new_entity_fl()
Aplica-se a: ✅Microsoft Fabric✅Azure Data Explorer✅Azure Monitor✅Microsoft Sentinel
Detecte o aparecimento de novas entidades anômalas em dados com carimbo de data/hora.
A função detect_anomalous_new_entity_fl()
é uma UDF (função definida pelo usuário) que detecta o aparecimento de novas entidades anômalas - como endereços IP ou usuários - em dados com carimbo de data/hora, como logs de tráfego. No contexto de segurança cibernética, esses eventos podem ser suspeitos e indicar um possível ataque ou comprometimento.
O modelo de anomalia é baseado em uma distribuição de Poisson que representa o número de novas entidades que aparecem por compartimento de tempo (como dia) para cada escopo. O parâmetro de distribuição de Poisson é estimado com base na taxa de aparecimento de novas entidades no período de treinamento, com fator de decaimento adicionado refletindo o fato de que as aparições recentes são mais importantes do que as antigas. Assim, calculamos a probabilidade de encontrar uma nova entidade no período de detecção definido por algum escopo - como uma assinatura ou uma conta. A saída do modelo é controlada por vários parâmetros opcionais, como limite mínimo para anomalia, parâmetro de taxa de decaimento e outros.
A saída direta do modelo é uma pontuação de anomalia baseada no inverso da probabilidade estimada de encontrar uma nova entidade. A pontuação é monótona na faixa de [0, 1], com 1 representando algo anômalo. Além da pontuação de anomalia, há um sinalizador binário para anomalia detectada (controlada por um parâmetro de limite mínimo) e outros campos explicativos.
Sintaxe
detect_anomalous_new_entity_fl(
entityColumnName, scopeColumnName, timeColumnName, startTraining, startDetection, endDetection, [maxEntitiesThresh], [minTrainingDaysThresh], [decayParam], [anomalyScoreThresh])
Saiba mais sobre as convenções de sintaxe.
Parâmetros
Nome | Digitar | Obrigatória | Descrição |
---|---|---|---|
entityColumnName | string |
✔️ | O nome da coluna da tabela de entrada que contém os nomes ou IDs das entidades para as quais o modelo de anomalia é calculado. |
scopeColumnName | string |
✔️ | O nome da coluna da tabela de entrada que contém a partição ou o escopo, para que um modelo de anomalia diferente seja criado para cada escopo. |
timeColumnName | string |
✔️ | O nome da coluna da tabela de entrada que contém os carimbos de data/hora, que são usados para definir os períodos de treinamento e detecção. |
startTraining | datetime |
✔️ | O início do período de treinamento para o modelo de anomalia. Seu fim é definido pelo início do período de detecção. |
Detecção inicial | datetime |
✔️ | O início do período de detecção de anomalias. |
Detecção final | datetime |
✔️ | O fim do período de detecção de anomalias. |
maxEntitiesThresh | int |
O número máximo de entidades existentes no escopo para calcular anomalias. Se o número de entidades estiver acima do limite, o escopo será considerado muito barulhento e as anomalias não serão calculadas. O valor padrão é 60. | |
minTrainingDaysThresh | int |
O número mínimo de dias no período de treinamento em que existe um escopo para calcular anomalias. Se estiver abaixo do limite, o escopo será considerado muito novo e desconhecido, portanto, as anomalias não serão calculadas. O valor padrão é 14. | |
decayParam | real |
O parâmetro de taxa de decaimento para o modelo de anomalia, um número no intervalo (0,1). Valores mais baixos significam decaimento mais rápido, portanto, mais importância é dada às aparições posteriores no período de treinamento. Um valor de 1 significa que não há decaimento, portanto, uma média simples é usada para a estimativa do parâmetro de distribuição de Poisson. O valor padrão é 0,95. | |
anomalyScoreThresh | real |
O valor mínimo da pontuação de anomalia para a qual uma anomalia é detectada, um número no intervalo [0, 1]. Valores mais altos significam que apenas casos mais significativos são considerados anômalos, portanto, menos anomalias são detectadas (maior precisão, menor recall). O valor padrão é 0.9. |
Definição de função
Você pode definir a função inserindo seu código como uma função definida por consulta ou criando-a como uma função armazenada em seu banco de dados, da seguinte maneira:
Defina a função usando a instrução let a seguir. Nenhuma permissão é necessária.
Importante
Uma instrução let não pode ser executada sozinha. Ele deve ser seguido por uma instrução de expressão tabular. Para executar um exemplo funcional de detect_anomalous_new_entity_fl()
, consulte Exemplo.
let detect_anomalous_new_entity_fl = (T:(*), entityColumnName:string, scopeColumnName:string
, timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
, maxEntitiesThresh:int = 60, minTrainingDaysThresh:int = 14, decayParam:real = 0.95, anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let timePeriodBinSize = 'day'; // we assume a reasonable bin for time is day, so the probability model is built per that bin size
let processedData = (
T
| extend scope = column_ifexists(scopeColumnName, '')
| extend entity = column_ifexists(entityColumnName, '')
| extend sliceTime = todatetime(column_ifexists(timeColumnName, ''))
| where isnotempty(scope) and isnotempty(entity) and isnotempty(sliceTime)
| extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
, sliceTime >= startDetection and sliceTime <= endDetection, 'detectSet'
, 'other')
| where dataSet in ('trainSet', 'detectSet')
);
// summarize the data by scope and entity. this will be used to create a distribution of entity appearances based on first seen data
let entityData = (
processedData
| summarize countRowsEntity = count(), firstSeenEntity = min(sliceTime), lastSeenEntity = max(sliceTime), firstSeenSet = arg_min(sliceTime, dataSet)
by scope, entity
| extend firstSeenSet = dataSet
| project-away dataSet
);
// aggregate entity data per scope and get the number of entities appearing over time
let aggregatedCandidateScopeData = (
entityData
| summarize countRowsScope = sum(countRowsEntity), countEntitiesScope = dcount(entity), countEntitiesScopeInTrain = dcountif(entity, firstSeenSet == 'trainSet')
, firstSeenScope = min(firstSeenEntity), lastSeenScope = max(lastSeenEntity), hasNewEntities = iff(dcountif(entity,firstSeenSet == 'detectSet') > 0, 1, 0)
by scope
| extend slicesInTrainingScope = datetime_diff(timePeriodBinSize, startDetection, firstSeenScope)
| where countEntitiesScopeInTrain <= maxEntitiesThresh and slicesInTrainingScope >= minTrainingDaysThresh and lastSeenScope >= startDetection and hasNewEntities == 1
);
let modelData = (
entityData
| join kind = inner (aggregatedCandidateScopeData) on scope
| where firstSeenSet == 'trainSet'
| summarize countAddedEntities = dcount(entity), firstSeenScope = min(firstSeenScope), slicesInTrainingScope = max(slicesInTrainingScope), countEntitiesScope = max(countEntitiesScope)
by scope, firstSeenSet, firstSeenEntity
| extend diffInDays = datetime_diff(timePeriodBinSize, startDetection, firstSeenEntity)
// adding exponentially decaying weights to counts
| extend decayingWeight = pow(base = decayParam, exponent = diffInDays)
| extend decayingValue = countAddedEntities * decayingWeight
| summarize newEntityProbability = round(1 - exp(-1.0 * sum(decayingValue)/max(diffInDays)), 4)
, countKnownEntities = sum(countAddedEntities), lastNewEntityTimestamp = max(firstSeenEntity), slicesOnScope = max(slicesInTrainingScope)///for explainability
by scope, firstSeenSet
// anomaly score is based on probability to get no new entities, calculated using Poisson distribution (P(X=0) = exp(-avg)) with added decay on average
| extend newEntityAnomalyScore = round(1 - newEntityProbability, 4)
| extend isAnomalousNewEntity = iff(newEntityAnomalyScore >= anomalyScoreThresh, 1, 0)
);
let resultsData = (
processedData
| where dataSet == 'detectSet'
| join kind = inner (modelData) on scope
| project-away scope1
| where isAnomalousNewEntity == 1
| summarize arg_min(sliceTime, *) by scope, entity
| extend anomalyType = strcat('newEntity_', entityColumnName), anomalyExplainability = strcat('The ', entityColumnName, ' ', entity, ' wasn\'t seen on ', scopeColumnName, ' ', scope, ' during the last ', slicesOnScope, ' ', timePeriodBinSize, 's. Previously, ', countKnownEntities
, ' entities were seen, the last one of them appearing at ', format_datetime(lastNewEntityTimestamp, 'yyyy-MM-dd HH:mm'), '.')
| join kind = leftouter (entityData | where firstSeenSet == 'trainSet' | extend entityFirstSeens = strcat(entity, ' : ', format_datetime(firstSeenEntity, 'yyyy-MM-dd HH:mm')) | sort by scope, firstSeenEntity asc | summarize anomalyState = make_list(entityFirstSeens) by scope) on scope
| project-away scope1
);
resultsData
};
// Write your query to use the function here.
Exemplo
O exemplo a seguir usa o operador invoke para executar a função.
Para usar uma função definida por consulta, invoque-a após a definição da função inserida.
let detect_anomalous_new_entity_fl = (T:(*), entityColumnName:string, scopeColumnName:string
, timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
, maxEntitiesThresh:int = 60, minTrainingDaysThresh:int = 14, decayParam:real = 0.95, anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let timePeriodBinSize = 'day'; // we assume a reasonable bin for time is day, so the probability model is built per that bin size
let processedData = (
T
| extend scope = column_ifexists(scopeColumnName, '')
| extend entity = column_ifexists(entityColumnName, '')
| extend sliceTime = todatetime(column_ifexists(timeColumnName, ''))
| where isnotempty(scope) and isnotempty(entity) and isnotempty(sliceTime)
| extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
, sliceTime >= startDetection and sliceTime <= endDetection, 'detectSet'
, 'other')
| where dataSet in ('trainSet', 'detectSet')
);
// summarize the data by scope and entity. this will be used to create a distribution of entity appearances based on first seen data
let entityData = (
processedData
| summarize countRowsEntity = count(), firstSeenEntity = min(sliceTime), lastSeenEntity = max(sliceTime), firstSeenSet = arg_min(sliceTime, dataSet)
by scope, entity
| extend firstSeenSet = dataSet
| project-away dataSet
);
// aggregate entity data per scope and get the number of entities appearing over time
let aggregatedCandidateScopeData = (
entityData
| summarize countRowsScope = sum(countRowsEntity), countEntitiesScope = dcount(entity), countEntitiesScopeInTrain = dcountif(entity, firstSeenSet == 'trainSet')
, firstSeenScope = min(firstSeenEntity), lastSeenScope = max(lastSeenEntity), hasNewEntities = iff(dcountif(entity,firstSeenSet == 'detectSet') > 0, 1, 0)
by scope
| extend slicesInTrainingScope = datetime_diff(timePeriodBinSize, startDetection, firstSeenScope)
| where countEntitiesScopeInTrain <= maxEntitiesThresh and slicesInTrainingScope >= minTrainingDaysThresh and lastSeenScope >= startDetection and hasNewEntities == 1
);
let modelData = (
entityData
| join kind = inner (aggregatedCandidateScopeData) on scope
| where firstSeenSet == 'trainSet'
| summarize countAddedEntities = dcount(entity), firstSeenScope = min(firstSeenScope), slicesInTrainingScope = max(slicesInTrainingScope), countEntitiesScope = max(countEntitiesScope)
by scope, firstSeenSet, firstSeenEntity
| extend diffInDays = datetime_diff(timePeriodBinSize, startDetection, firstSeenEntity)
// adding exponentially decaying weights to counts
| extend decayingWeight = pow(base = decayParam, exponent = diffInDays)
| extend decayingValue = countAddedEntities * decayingWeight
| summarize newEntityProbability = round(1 - exp(-1.0 * sum(decayingValue)/max(diffInDays)), 4)
, countKnownEntities = sum(countAddedEntities), lastNewEntityTimestamp = max(firstSeenEntity), slicesOnScope = max(slicesInTrainingScope)///for explainability
by scope, firstSeenSet
// anomaly score is based on probability to get no new entities, calculated using Poisson distribution (P(X=0) = exp(-avg)) with added decay on average
| extend newEntityAnomalyScore = round(1 - newEntityProbability, 4)
| extend isAnomalousNewEntity = iff(newEntityAnomalyScore >= anomalyScoreThresh, 1, 0)
);
let resultsData = (
processedData
| where dataSet == 'detectSet'
| join kind = inner (modelData) on scope
| project-away scope1
| where isAnomalousNewEntity == 1
| summarize arg_min(sliceTime, *) by scope, entity
| extend anomalyType = strcat('newEntity_', entityColumnName), anomalyExplainability = strcat('The ', entityColumnName, ' ', entity, ' wasn\'t seen on ', scopeColumnName, ' ', scope, ' during the last ', slicesOnScope, ' ', timePeriodBinSize, 's. Previously, ', countKnownEntities
, ' entities were seen, the last one of them appearing at ', format_datetime(lastNewEntityTimestamp, 'yyyy-MM-dd HH:mm'), '.')
| join kind = leftouter (entityData | where firstSeenSet == 'trainSet' | extend entityFirstSeens = strcat(entity, ' : ', format_datetime(firstSeenEntity, 'yyyy-MM-dd HH:mm')) | sort by scope, firstSeenEntity asc | summarize anomalyState = make_list(entityFirstSeens) by scope) on scope
| project-away scope1
);
resultsData
};
// synthetic data generation
let detectPeriodStart = datetime(2022-04-30 05:00:00.0000000);
let trainPeriodStart = datetime(2022-03-01 05:00);
let names = pack_array("Admin", "Dev1", "Dev2", "IT-support");
let countNames = array_length(names);
let testData = range t from 1 to 24*60 step 1
| extend timeSlice = trainPeriodStart + 1h * t
| extend countEvents = round(2*rand() + iff((t/24)%7>=5, 10.0, 15.0) - (((t%24)/10)*((t%24)/10)), 2) * 100 // generate a series with weekly seasonality
| extend userName = tostring(names[toint(rand(countNames))])
| extend deviceId = hash_md5(rand())
| extend accountName = iff(((rand() < 0.2) and (timeSlice < detectPeriodStart)), 'testEnvironment', 'prodEnvironment')
| extend userName = iff(timeSlice == detectPeriodStart, 'H4ck3r', userName)
| extend deviceId = iff(timeSlice == detectPeriodStart, 'abcdefghijklmnoprtuvwxyz012345678', deviceId)
| sort by timeSlice desc
;
testData
| invoke detect_anomalous_new_entity_fl(entityColumnName = 'userName' //principalName for positive, deviceId for negative
, scopeColumnName = 'accountName'
, timeColumnName = 'timeSlice'
, startTraining = trainPeriodStart
, startDetection = detectPeriodStart
, endDetection = detectPeriodStart
)
Saída
scope | entity | fatia | t | Fatia de tempo | contagemEventos | userName | deviceId | accountName | dataSet | firstSeenSet | newEntityProbability | countKnownEntities | lastNewEntityTimestamp | fatiasOnScope | newEntityAnomalyScore | éAnomalousNewEntity | anomalyType | anomaliaExplainability | estado de anomalia |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
prodAmbiente | H4ck3r | 2022-04-30 05:00:00.0000000 | 1440 | 2022-04-30 05:00:00.0000000 | 1,687 | H4ck3r | abcdefghijklmnoprtuvwxyz012345678 | prodAmbiente | conjunto de detecção | trainSet | 0.0031 | 4 | 2022-03-01 09:00:00.0000000 | 60 | 0.9969 | 1 | newEntity_userName | O userName H4ck3r não foi visto em accountName prodEnvironment durante os últimos 60 dias. Anteriormente, quatro entidades foram vistas, a última delas aparecendo em 01/03/2022 às 09:00. | ["Suporte de TI : 01/03/2022 07:00", "Administrador : 01/03/2022 08:00", "Dev2 : 01/03/2022 09:00", "Dev1 : 01/03/2022 14:00"] |
A saída da execução da função é a primeira linha vista no conjunto de dados de teste para cada entidade por escopo, filtrada para novas entidades (o que significa que elas não apareceram durante o período de treinamento) que foram marcadas como anômalas (o que significa que a pontuação de anomalia da entidade estava acima de anomalyScoreThresh). Alguns outros campos são adicionados para maior clareza:
dataSet
: conjunto de dados atual (é sempredetectSet
).firstSeenSet
: conjunto de dados no qual o escopo foi visto pela primeira vez (deve ser 'trainSet').newEntityProbability
: probabilidade de ver qualquer nova entidade com base na estimativa do modelo de Poisson.countKnownEntities
: entidades existentes no escopo.lastNewEntityTimestamp
: última vez que uma nova entidade foi vista antes da anômala.slicesOnScope
: contagem de fatias por escopo.newEntityAnomalyScore
: a pontuação de anomalia foi a nova entidade no intervalo [0, 1], valores mais altos significando mais anomalia.isAnomalousNewEntity
: sinalizador binário para novas entidades anômalasanomalyType
: mostra o tipo de anomalia (útil ao executar várias lógicas de detecção de anomalias juntas).anomalyExplainability
: invólucro textual para anomalia gerada e sua explicação.anomalyState
: saco de entidades existentes no escopo com seus primeiros tempos vistos.
A execução dessa função no usuário por conta com parâmetros padrão obtém um usuário inédito e anômalo ('H4ck3r') com alta pontuação de anomalia de 0,9969, o que significa que isso é inesperado (devido ao pequeno número de usuários existentes no período de treinamento).
Quando executamos a função com parâmetros padrão em deviceId como entidade, não veremos uma anomalia, devido ao grande número de dispositivos existentes, o que a torna esperada. No entanto, se reduzirmos o parâmetro anomalyScoreThresh para 0,0001 e aumentarmos o parâmetro para maxEntitiesThresh para 10000, diminuiremos efetivamente a precisão em favor do recall e detectaremos uma anomalia (com uma pontuação de anomalia baixa) no dispositivo 'abcdefghijklmnoprtuvwxyz012345678'.
A saída mostra as entidades anômalas junto com os campos de explicação em formato padronizado. Esses campos são úteis para investigar a anomalia e para executar a detecção de entidade anômala em várias entidades ou executar outros algoritmos juntos.
O uso sugerido no contexto de segurança cibernética é executar a função em entidades significativas, como nomes de usuário ou endereços IP, por escopos significativos, como assinatura em contas. Uma nova entidade anômala detectada significa que sua aparência não é esperada no escopo e pode ser suspeita.