detect_anomalous_new_entity_fl()
Aplica-se a: ✅Microsoft Fabric✅Azure Data Explorer✅Azure Monitor✅Microsoft Sentinel
Detete o aparecimento de novas entidades anômalas em dados com carimbo de data/hora.
A função detect_anomalous_new_entity_fl()
é um UDF (função definida pelo usuário) que deteta o aparecimento de novas entidades anômalas - como endereços IP ou usuários - em dados com carimbo de data/hora, como logs de tráfego. No contexto da cibersegurança, esses eventos podem ser suspeitos e indicar um potencial ataque ou compromisso.
O modelo de anomalia é baseado em uma distribuição de Poisson que representa o número de novas entidades que aparecem por compartimento de tempo (como um dia) para cada escopo. O parâmetro de distribuição de Poisson é estimado com base na taxa de aparecimento de novas entidades no período de treinamento, com fator de decaimento adicional refletindo o fato de que as aparições recentes são mais importantes do que as antigas. Assim, calculamos a probabilidade de encontrar uma nova entidade em um período de deteção definido por algum escopo - como uma assinatura ou uma conta. A saída do modelo é controlada por vários parâmetros opcionais, como limiar mínimo para anomalia, parâmetro de taxa de decaimento e outros.
A saída direta do modelo é uma pontuação de anomalia, calculada com base no inverso da probabilidade estimada de encontrar uma nova entidade. O escore de anomalia varia de 0 a 1, onde 1 indica uma entidade altamente anômala. Além da pontuação de anomalias, há um sinalizador binário para anomalias detetadas (controlado por um parâmetro de limite mínimo) e outros campos explicativos.
Sintaxe
detect_anomalous_new_entity_fl(
entityColumnName, scopeColumnName, timeColumnName, startTraining, startDetection, endDetection, [maxEntitiesThresh], [minTrainingDaysThresh], [decayParam], [anomalyScoreThresh])
Saiba mais sobre convenções de sintaxe.
Parâmetros
Designação | Tipo | Necessário | Descrição |
---|---|---|---|
entityColumnName | string |
✔️ | O nome da coluna da tabela de entrada que contém os nomes ou IDs das entidades para as quais o modelo de anomalia é calculado. |
scopeColumnName | string |
✔️ | O nome da coluna da tabela de entrada que contém a partição ou o escopo, para que um modelo de anomalia diferente seja criado para cada escopo. |
timeColumnName | string |
✔️ | O nome da coluna da tabela de entrada que contém os carimbos de data/hora, que são usados para definir os períodos de treinamento e deteção. |
start de formação | datetime |
✔️ | O início do período de treinamento para o modelo de anomalia. Seu fim é definido pelo início do período de deteção. |
startDetection | datetime |
✔️ | O início do período de deteção para deteção de anomalias. |
endDetection | datetime |
✔️ | O fim do período de deteção para deteção de anomalias. |
maxEntitiesThresh | int |
O número máximo de entidades existentes no âmbito para calcular anomalias. Se o número de entidades estiver acima do limite, o âmbito é considerado demasiado ruidoso e as anomalias não são calculadas. O valor padrão é 60. | |
minFormaçãoDiasThresh | int |
O número mínimo de dias no período de treinamento que existe para calcular anomalias. Se estiver abaixo do limite, o escopo é considerado muito novo e desconhecido, então as anomalias não são calculadas. O valor padrão é 14. | |
decaimentoParam | real |
O parâmetro da taxa de decaimento para o modelo de anomalia, um número no intervalo (0,1). Valores mais baixos significam decaimento mais rápido, por isso é dada mais importância a aparições posteriores no período de treinamento. Um valor de 1 significa que não há decaimento, portanto, uma média simples é usada para a estimativa do parâmetro de distribuição de Poisson. O valor padrão é 0,95. | |
anomaliaScoreThresh | real |
O valor mínimo da pontuação de anomalia para a qual uma anomalia é detetada, um número no intervalo [0, 1]. Valores mais elevados significam que apenas os casos mais significativos são considerados anómalos, pelo que são detetadas menos anomalias (maior precisão, menor recordação). O valor padrão é 0,9. |
Definição de função
Você pode definir a função incorporando seu código como uma função definida por consulta ou criando-a como uma função armazenada em seu banco de dados, da seguinte maneira:
- definido por consulta
- Armazenado
Defina a função usando a seguinte instrução let. Não são necessárias permissões.
Importante
Uma declaração let não pode ser executada sozinha. Ela deve ser seguida por uma instrução de expressão tabular . Para executar um exemplo funcional de detect_anomalous_new_entity_fl()
, consulte Exemplo.
let detect_anomalous_new_entity_fl = (T:(*), entityColumnName:string, scopeColumnName:string
, timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
, maxEntitiesThresh:int = 60, minTrainingDaysThresh:int = 14, decayParam:real = 0.95, anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let timePeriodBinSize = 'day'; // we assume a reasonable bin for time is day, so the probability model is built per that bin size
let processedData = (
T
| extend scope = column_ifexists(scopeColumnName, '')
| extend entity = column_ifexists(entityColumnName, '')
| extend sliceTime = todatetime(column_ifexists(timeColumnName, ''))
| where isnotempty(scope) and isnotempty(entity) and isnotempty(sliceTime)
| extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
, sliceTime >= startDetection and sliceTime <= endDetection, 'detectSet'
, 'other')
| where dataSet in ('trainSet', 'detectSet')
);
// summarize the data by scope and entity. this will be used to create a distribution of entity appearances based on first seen data
let entityData = (
processedData
| summarize countRowsEntity = count(), firstSeenEntity = min(sliceTime), lastSeenEntity = max(sliceTime), firstSeenSet = arg_min(sliceTime, dataSet)
by scope, entity
| extend firstSeenSet = dataSet
| project-away dataSet
);
// aggregate entity data per scope and get the number of entities appearing over time
let aggregatedCandidateScopeData = (
entityData
| summarize countRowsScope = sum(countRowsEntity), countEntitiesScope = dcount(entity), countEntitiesScopeInTrain = dcountif(entity, firstSeenSet == 'trainSet')
, firstSeenScope = min(firstSeenEntity), lastSeenScope = max(lastSeenEntity), hasNewEntities = iff(dcountif(entity,firstSeenSet == 'detectSet') > 0, 1, 0)
by scope
| extend slicesInTrainingScope = datetime_diff(timePeriodBinSize, startDetection, firstSeenScope)
| where countEntitiesScopeInTrain <= maxEntitiesThresh and slicesInTrainingScope >= minTrainingDaysThresh and lastSeenScope >= startDetection and hasNewEntities == 1
);
let modelData = (
entityData
| join kind = inner (aggregatedCandidateScopeData) on scope
| where firstSeenSet == 'trainSet'
| summarize countAddedEntities = dcount(entity), firstSeenScope = min(firstSeenScope), slicesInTrainingScope = max(slicesInTrainingScope), countEntitiesScope = max(countEntitiesScope)
by scope, firstSeenSetOnScope = firstSeenSet, firstSeenEntityOnScope = firstSeenEntity
| extend diffInDays = datetime_diff(timePeriodBinSize, startDetection, firstSeenEntityOnScope)
// adding exponentially decaying weights to counts
| extend decayingWeight = pow(base = decayParam, exponent = diffInDays)
| extend decayingValue = countAddedEntities * decayingWeight
| summarize newEntityProbability = round(1 - exp(-1.0 * sum(decayingValue)/max(diffInDays)), 4)
, countKnownEntities = sum(countAddedEntities), lastNewEntityTimestamp = max(firstSeenEntityOnScope), slicesOnScope = max(slicesInTrainingScope)///for explainability
by scope, firstSeenSetOnScope
// anomaly score is based on probability to get no new entities, calculated using Poisson distribution (P(X=0) = exp(-avg)) with added decay on average
| extend newEntityAnomalyScore = round(1 - newEntityProbability, 4)
| extend isAnomalousNewEntity = iff(newEntityAnomalyScore >= anomalyScoreThresh, 1, 0)
);
let resultsData = (
processedData
| where dataSet == 'detectSet'
| join kind = inner (modelData) on scope
| join kind = inner (entityData | where firstSeenSet == 'detectSet') on scope, entity, $left.sliceTime == $right.firstSeenEntity
| project-away scope1, scope2, entity1
| where isAnomalousNewEntity == 1
| summarize arg_min(sliceTime, *) by scope, entity
| extend anomalyType = strcat('newEntity_', entityColumnName), anomalyExplainability = strcat('The ', entityColumnName, ' ', entity, ' wasn\'t seen on ', scopeColumnName, ' ', scope, ' during the last ', slicesOnScope, ' ', timePeriodBinSize, 's. Previously, ', countKnownEntities
, ' entities were seen, the last one of them appearing at ', format_datetime(lastNewEntityTimestamp, 'yyyy-MM-dd HH:mm'), '.')
| join kind = leftouter (entityData | where firstSeenSet == 'trainSet' | extend entityFirstSeens = strcat(entity, ' : ', format_datetime(firstSeenEntity, 'yyyy-MM-dd HH:mm')) | sort by scope, firstSeenEntity asc | summarize anomalyState = make_list(entityFirstSeens) by scope) on scope
| project-away scope1
);
resultsData
};
// Write your query to use the function here.
Exemplo
O exemplo a seguir usa o operador invoke para executar a função.
- definido por consulta
- Armazenado
Para usar uma função definida por consulta, invoque-a após a definição da função incorporada.
let detect_anomalous_new_entity_fl = (T:(*), entityColumnName:string, scopeColumnName:string
, timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
, maxEntitiesThresh:int = 60, minTrainingDaysThresh:int = 14, decayParam:real = 0.95, anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let timePeriodBinSize = 'day'; // we assume a reasonable bin for time is day, so the probability model is built per that bin size
let processedData = (
T
| extend scope = column_ifexists(scopeColumnName, '')
| extend entity = column_ifexists(entityColumnName, '')
| extend sliceTime = todatetime(column_ifexists(timeColumnName, ''))
| where isnotempty(scope) and isnotempty(entity) and isnotempty(sliceTime)
| extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
, sliceTime >= startDetection and sliceTime <= endDetection, 'detectSet'
, 'other')
| where dataSet in ('trainSet', 'detectSet')
);
// summarize the data by scope and entity. this will be used to create a distribution of entity appearances based on first seen data
let entityData = (
processedData
| summarize countRowsEntity = count(), firstSeenEntity = min(sliceTime), lastSeenEntity = max(sliceTime), firstSeenSet = arg_min(sliceTime, dataSet)
by scope, entity
| extend firstSeenSet = dataSet
| project-away dataSet
);
// aggregate entity data per scope and get the number of entities appearing over time
let aggregatedCandidateScopeData = (
entityData
| summarize countRowsScope = sum(countRowsEntity), countEntitiesScope = dcount(entity), countEntitiesScopeInTrain = dcountif(entity, firstSeenSet == 'trainSet')
, firstSeenScope = min(firstSeenEntity), lastSeenScope = max(lastSeenEntity), hasNewEntities = iff(dcountif(entity,firstSeenSet == 'detectSet') > 0, 1, 0)
by scope
| extend slicesInTrainingScope = datetime_diff(timePeriodBinSize, startDetection, firstSeenScope)
| where countEntitiesScopeInTrain <= maxEntitiesThresh and slicesInTrainingScope >= minTrainingDaysThresh and lastSeenScope >= startDetection and hasNewEntities == 1
);
let modelData = (
entityData
| join kind = inner (aggregatedCandidateScopeData) on scope
| where firstSeenSet == 'trainSet'
| summarize countAddedEntities = dcount(entity), firstSeenScope = min(firstSeenScope), slicesInTrainingScope = max(slicesInTrainingScope), countEntitiesScope = max(countEntitiesScope)
by scope, firstSeenSetOnScope = firstSeenSet, firstSeenEntityOnScope = firstSeenEntity
| extend diffInDays = datetime_diff(timePeriodBinSize, startDetection, firstSeenEntityOnScope)
// adding exponentially decaying weights to counts
| extend decayingWeight = pow(base = decayParam, exponent = diffInDays)
| extend decayingValue = countAddedEntities * decayingWeight
| summarize newEntityProbability = round(1 - exp(-1.0 * sum(decayingValue)/max(diffInDays)), 4)
, countKnownEntities = sum(countAddedEntities), lastNewEntityTimestamp = max(firstSeenEntityOnScope), slicesOnScope = max(slicesInTrainingScope)///for explainability
by scope, firstSeenSetOnScope
// anomaly score is based on probability to get no new entities, calculated using Poisson distribution (P(X=0) = exp(-avg)) with added decay on average
| extend newEntityAnomalyScore = round(1 - newEntityProbability, 4)
| extend isAnomalousNewEntity = iff(newEntityAnomalyScore >= anomalyScoreThresh, 1, 0)
);
let resultsData = (
processedData
| where dataSet == 'detectSet'
| join kind = inner (modelData) on scope
| join kind = inner (entityData | where firstSeenSet == 'detectSet') on scope, entity, $left.sliceTime == $right.firstSeenEntity
| project-away scope1, scope2, entity1
| where isAnomalousNewEntity == 1
| summarize arg_min(sliceTime, *) by scope, entity
| extend anomalyType = strcat('newEntity_', entityColumnName), anomalyExplainability = strcat('The ', entityColumnName, ' ', entity, ' wasn\'t seen on ', scopeColumnName, ' ', scope, ' during the last ', slicesOnScope, ' ', timePeriodBinSize, 's. Previously, ', countKnownEntities
, ' entities were seen, the last one of them appearing at ', format_datetime(lastNewEntityTimestamp, 'yyyy-MM-dd HH:mm'), '.')
| join kind = leftouter (entityData | where firstSeenSet == 'trainSet' | extend entityFirstSeens = strcat(entity, ' : ', format_datetime(firstSeenEntity, 'yyyy-MM-dd HH:mm')) | sort by scope, firstSeenEntity asc | summarize anomalyState = make_list(entityFirstSeens) by scope) on scope
| project-away scope1
);
resultsData
};
// synthetic data generation
let detectPeriodStart = datetime(2022-04-30 05:00:00.0000000);
let trainPeriodStart = datetime(2022-03-01 05:00);
let names = pack_array("Admin", "Dev1", "Dev2", "IT-support");
let countNames = array_length(names);
let testData = range t from 1 to 24*60 step 1
| extend timeSlice = trainPeriodStart + 1h * t
| extend countEvents = round(2*rand() + iff((t/24)%7>=5, 10.0, 15.0) - (((t%24)/10)*((t%24)/10)), 2) * 100 // generate a series with weekly seasonality
| extend userName = tostring(names[toint(rand(countNames))])
| extend deviceId = hash_md5(rand())
| extend accountName = iff(((rand() < 0.2) and (timeSlice < detectPeriodStart)), 'testEnvironment', 'prodEnvironment')
| extend userName = iff(timeSlice == detectPeriodStart, 'H4ck3r', userName)
| extend deviceId = iff(timeSlice == detectPeriodStart, 'abcdefghijklmnoprtuvwxyz012345678', deviceId)
| sort by timeSlice desc
;
testData
| invoke detect_anomalous_new_entity_fl(entityColumnName = 'userName' //principalName for positive, deviceId for negative
, scopeColumnName = 'accountName'
, timeColumnName = 'timeSlice'
, startTraining = trainPeriodStart
, startDetection = detectPeriodStart
, endDetection = detectPeriodStart
)
Output
Âmbito de aplicação | entidade | fatiaTempo | t | timeSlice | contarEventos | nome de utilizador | deviceId | nomedaconta; | conjunto de dados | firstSeenSetOnScope | newEntityProbability | countKnownEntities | lastNewEntityTimestamp | fatiasOnScope | newEntityAnomalyScore | isAnomalousNewEntity | anomalyType | anomaliaExplicabilidade | anomaliaEstado |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
prodAmbiente | H4ck3r | 2022-04-30 05:00:00.0000000 | 1440 | 2022-04-30 05:00:00.0000000 | 1687 | H4ck3r | abcdefghijklmnoprtuvwxyz012345678 | prodAmbiente | detectSet | Conjunto de comboios | 0.0031 | 4 | 2022-03-01 09:00:00.0000000 | 60 | 0.9969 | 1 | newEntity_userName | O userName H4ck3r não foi visto em accountName prodEnvironment durante os últimos 60 dias. Anteriormente, quatro entidades foram vistas, a última delas aparecendo em 2022-03-01 09:00. | ["Suporte de TI : 2022-03-01 07:00", "Admin : 2022-03-01 08:00", "Dev2 : 2022-03-01 09:00", "Dev1 : 2022-03-01 14:00"] |
A saída da execução da função é a primeira linha vista no conjunto de dados de teste para cada entidade por escopo, filtrada para novas entidades (o que significa que elas não apareceram durante o período de treinamento) que foram marcadas como anômalas (o que significa que a pontuação de anomalia da entidade estava acima de anomalyScoreThresh). Para maior clareza, são acrescentados alguns outros campos:
-
dataSet
: conjunto de dados atual (é sempredetectSet
). -
firstSeenSetOnScope
: conjunto de dados no qual o âmbito foi visto pela primeira vez (deve ser «trainSet»). -
newEntityProbability
: probabilidade de ver qualquer nova entidade com base na estimativa do modelo de Poisson. -
countKnownEntities
: entidades existentes no âmbito de aplicação. -
lastNewEntityTimestamp
: última vez que uma nova entidade foi vista antes da anômala. -
slicesOnScope
: contagem de fatias por escopo. -
newEntityAnomalyScore
: A pontuação de anomalia foi a nova entidade no intervalo [0, 1], valores mais altos significando mais anomalia. -
isAnomalousNewEntity
: sinalizador binário para novas entidades anômalas -
anomalyType
: mostra o tipo de anomalia (útil ao executar várias lógicas de deteção de anomalias juntas). -
anomalyExplainability
: invólucro textual para anomalia gerada e sua explicação. -
anomalyState
: saco de entidades existentes no âmbito com os seus primeiros tempos vistos.
Executar esta função em usuário por conta com parâmetros padrão deteta um usuário invisível e anômalo anteriormente ('H4ck3r') com alta pontuação de anomalia de 0,9969, o que significa que isso é inesperado (devido ao pequeno número de usuários existentes no período de treinamento).
Quando executamos a função com parâmetros padrão em deviceId como entidade, não veremos uma anomalia, devido ao grande número de dispositivos existentes - o que torna a aparência de um novo esperado. No entanto, se baixarmos o parâmetro anomalyScoreThresh para 0,0001 e aumentarmos o parâmetro maxEntitiesThresh para 10000, diminuiremos efetivamente a precisão em favor da recuperação e detetaremos uma anomalia (com uma pontuação de anomalia baixa) no dispositivo 'abcdefghijklmnoprtuvwxyz012345678'.
A saída mostra as entidades anômalas juntamente com campos de explicação em formato padronizado. Esses campos são úteis para investigar a anomalia e para executar a deteção de entidades anômalas em várias entidades ou executar outros algoritmos juntos.
O uso sugerido no contexto de segurança cibernética é executar a função em entidades significativas - como nomes de usuário ou endereços IP - por escopos significativos - como assinatura em contas. Uma nova entidade anômala detetada significa que sua aparência não é esperada no escopo e pode ser suspeita.