你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
将 Splunk 检测规则迁移到 Microsoft Sentinel
Splunk 检测规则是安全信息和事件管理 (SIEM) 组件,可与 Microsoft Sentinel 中的分析规则相比。 本文介绍了与识别它们、比较它们和将它们迁移到 Microsoft Sentinel 相关的概念。 最好的方法是从 SIEM 迁移体验开始,该体验可以识别要自动转换成的现成 (OOTB) 分析规则。
如果要迁移 Splunk 可观测性部署,请详细了解如何从 Splunk 迁移到 Azure Monitor 日志。
审核规则
Microsoft Sentinel 使用机器学习分析来创建高保真的且可处理的事件。 某些现有 Splunk 检测在 Microsoft Sentinel 中可能是多余的,因此不要盲目迁移它们。 在识别现有检测规则时请查看这些注意事项。
- 确保选择能够证明规则迁移合理的用例,同时考虑业务优先级和效率。
- 确认你是否了解 Microsoft Sentinel 规则类型。
- 确认你是否理解规则术语。
- 查看过去 6-12 个月内没有警报的过时规则,并确定这些规则是否仍然相关。
- 消除你在日常中会忽略的低级别威胁或警报。
- 确认连接的数据源并查看数据连接方法。 Microsoft Sentinel Analytics 要求该数据类型在启用规则之前便存在于 Log Analytics 工作区中。 重新访问数据收集对话,确保打算检测的用例的数据深度和广度足够。 然后使用 SIEM 迁移体验来确保数据源得到适当映射。
迁移规则
确定要迁移的 Splunk 检测后,请查看迁移过程的以下注意事项:
- 将 Microsoft Sentinel 的 OOTB 分析规则的现有功能与当前的用例进行比较。 使用 SIEM 迁移体验查看哪些 Splunk 检测会自动转换为 OOTB 模板。
- 转换与 OOTB 分析规则不一致的检测。 自动转换 Splunk 检测的最佳方式是使用 SIEM 迁移体验。
- 通过探索社区资源(例如 SOC Prime Threat Detection Marketplace),发现更多适合用例的算法。
- 如果内置规则不可用或未自动转换,则手动转换检测。 创建新的 KQL 查询并查看规则映射。
有关详细信息,请参阅迁移检测规则的最佳做法。
规则迁移步骤
验证是否为要迁移的每个规则准备好了一个测试系统。
为迁移的规则准备验证过程,包括完整测试方案和脚本。
确保团队具有有用的资源可用于测试迁移的规则。
确认已连接所需的数据源,并检查数据连接方法。
验证你的检测是否是作为 Microsoft Sentinel 中的 OOTB 模板提供的:
使用 SIEM 迁移体验自动转换和安装 OOTB 模板。
有关详细信息,请参阅使用 SIEM 迁移体验。
如果你的用例未反映在检测中,请使用 OOTB 规则模板为你自己的工作区创建规则。
在 Microsoft Sentinel 中转到“内容中心”。
筛选“分析规则”模板的“内容类型”。
找到并安装/更新每个相应的内容中心解决方案或独立分析规则模板。
有关详细信息,请参阅直接检测威胁。
如果你的检测是 Microsoft Sentinel 的 OOTB 规则未涵盖的内容,请先尝试使用 SIEM 迁移体验进行自动转换。
如果 OOTB 规则和 SIEM 迁移都无法完全转换该检测,请手动创建规则。 在这种情况下,请使用以下步骤创建规则:
确定要在规则中使用的数据源。 通过在数据源和数据表之间创建映射表来识别要查询的 Microsoft Sentinel 表。
确定数据中的要在规则中使用的任何属性、字段或实体。
确定规则条件和逻辑。 在此阶段,请考虑查找规则模板作为示例来了解如何构造 KQL 查询。
考虑筛选器、关联规则、活动列表、参考集、监视列表、检测异常情况、聚合等。 可以使用旧式 SIEM 提供的参考来了解如何以最佳方式映射查询语法。
确定触发条件和规则操作,然后构建并检查 KQL 查询。 检查查询时,请考虑 KQL 优化指导资源。
使用每个相关用例来测试规则。 如果它未提供预期的结果,请检查和编辑 KQL 并再次测试它。
如果对结果满意,请考虑迁移该规则。 根据需要为规则操作创建一个 playbook。 有关详细信息,请参阅使用 Microsoft Sentinel 中的 playbook 自动响应威胁。
详细了解分析规则:
- 创建自定义分析规则以检测威胁。 使用警报分组,通过将给定时间范围内发生的警报分组到一起来减轻警报疲劳。
- 将数据字段映射到 Microsoft Sentinel 中的实体,使 SOC 工程师能够将实体定义为在调查期间要跟踪的证据的一部分。 实体映射还可让 SOC 分析师利用直观的 [调查图] (investigate-cases.md#use-the-investigation-graph-to-deep-dive) 来帮助减少时间和工作量。
- 使用 UEBA 数据调查事件,例如,使用证据来显示与事件预览窗格中特定事件关联的事件、警报和任何书签。
- Kusto 查询语言 (KQL),可用于将只读请求发送到 Log Analytics 数据库,以处理数据并返回结果。 还可以在 Microsoft Defender for Endpoint 和 Application Insights 等其他 Microsoft 服务中使用 KQL。
比较规则术语
此表有助于阐明基于 Microsoft Sentinel 中的 Kusto 查询语言 (KQL) 的规则的概念(相比于基于搜索处理语言 (SPL) 的 Splunk 检测)。
Splunk | Microsoft Sentinel | |
---|---|---|
规则类型 | • 计划 • 实时 |
• 计划的查询 • 融合 • Microsoft 安全 • 机器学习 (ML) 行为分析 |
条件 | 在 SPL 中定义 | 在 KQL 中定义 |
触发器条件 | • 结果数 • 主机数 • 源数 • 自定义 |
阈值:查询结果数 |
操作 | • 添加到触发的警报 • 记录事件 • 输出结果以供查找 • 等等 |
• 创建警报或事件 • 与逻辑应用集成 |
映射和比较规则示例
在各种方案中使用这些示例来比较 Splunk 和 Microsoft Sentinel 中的规则并将规则从 Splunk 映射到 Microsoft Sentinel。
常见搜索命令
SPL 命令 | 说明 | KQL 运算符 | KQL 示例 |
---|---|---|---|
chart/ timechart |
返回时序图表的表格输出结果。 | render 运算符 | … | render timechart |
dedup |
删除与指定条件匹配的后续结果。 | • distinct • summarize |
… | summarize by Computer, EventID |
eval |
计算表达式。 了解常用的 eval 命令。 |
extend | T | extend duration = endTime - startTime |
fields |
从搜索结果中删除字段。 | • project • project-away |
T | project cost=price*quantity, price |
head/tail |
返回前 N 个或最后 N 个结果。 | 返回页首 | T | top 5 by Name desc nulls last |
lookup |
从外部源添加字段值。 | • externaldata • lookup |
KQL 示例 |
rename |
重命名字段。 使用通配符指定多个字段。 | project-rename | T | project-rename new_column_name = column_name |
rex |
使用正则表达式指定组名来提取字段。 | matches regex | … | where field matches regex "^addr.*" |
search |
将结果筛选为与搜索表达式匹配的结果。 | search | search "X" |
sort |
按指定字段对搜索结果进行排序。 | sort | T | sort by strlen(country) asc, price desc |
stats |
提供按字段(可选)分组的统计信息。 详细了解常见 stats 命令。 | summarize | KQL 示例 |
mstats |
与统计信息类似,用于指标而不是事件。 | summarize | KQL 示例 |
table |
指定要保留在结果集中的字段,并采用表格格式保留数据。 | project | T | project columnA, columnB |
top/rare |
显示字段的最常见值或最不常见值。 | 返回页首 | T | top 5 by Name desc nulls last |
transaction |
将搜索结果分组为事务。 SPL 示例 |
示例:row_window_session | KQL 示例 |
eventstats |
从事件的字段生成摘要统计信息,并将这些统计信息保存在新字段中。 SPL 示例 |
示例: • join • make_list • mv-expand |
KQL 示例 |
streamstats |
查找字段的累积总和。 SPL 示例: ... | streamstats sum(bytes) as bytes _ total \| timechart |
row_cumsum | ...\| serialize cs=row_cumsum(bytes) |
anomalydetection |
查找指定字段中的异常。 SPL 示例 |
series_decompose_anomalies() | KQL 示例 |
where |
使用 eval 表达式筛选搜索结果。 用于比较两个不同的字段。 |
where | T | where fruit=="apple" |
lookup
命令:KQL 示例
Users
| where UserID in ((externaldata (UserID:string) [
@"https://storageaccount.blob.core.windows.net/storagecontainer/users.txt"
h@"?...SAS..." // Secret token to access the blob
])) | ...
stats
命令:KQL 示例
Sales
| summarize NumTransactions=count(),
Total=sum(UnitPrice * NumUnits) by Fruit,
StartOfMonth=startofmonth(SellDateTime)
mstats
命令:KQL 示例
T | summarize count() by price_range=bin(price, 10.0)
transaction
命令:SPL 示例
sourcetype=MyLogTable type=Event
| transaction ActivityId startswith="Start" endswith="Stop"
| Rename timestamp as StartTime
| Table City, ActivityId, StartTime, Duration
transaction
命令:KQL 示例
let Events = MyLogTable | where type=="Event";
Events
| where Name == "Start"
| project Name, City, ActivityId, StartTime=timestamp
| join (Events
| where Name == "Stop"
| project StopTime=timestamp, ActivityId)
on ActivityId
| project City, ActivityId, StartTime,
Duration = StopTime – StartTime
使用 row_window_session()
计算序列化行集中的列的会话开始值。
...| extend SessionStarted = row_window_session(
Timestamp, 1h, 5m, ID != prev(ID))
eventstats
命令:SPL 示例
… | bin span=1m _time
|stats count AS count_i by _time, category
| eventstats sum(count_i) as count_total by _time
eventstats
命令:KQL 示例
下面是 join
语句的示例:
let binSize = 1h;
let detail = SecurityEvent
| summarize detail_count = count() by EventID,
tbin = bin(TimeGenerated, binSize);
let summary = SecurityEvent
| summarize sum_count = count() by
tbin = bin(TimeGenerated, binSize);
detail
| join kind=leftouter (summary) on tbin
| project-away tbin1
下面是 make_list
语句的示例:
let binSize = 1m;
SecurityEvent
| where TimeGenerated >= ago(24h)
| summarize TotalEvents = count() by EventID,
groupBin =bin(TimeGenerated, binSize)
|summarize make_list(EventID), make_list(TotalEvents),
sum(TotalEvents) by groupBin
| mvexpand list_EventID, list_TotalEvents
anomalydetection
命令:SPL 示例
sourcetype=nasdaq earliest=-10y
| anomalydetection Close _ Price
anomalydetection
命令:KQL 示例
let LookBackPeriod= 7d;
let disableAccountLogon=SignIn
| where ResultType == "50057"
| where ResultDescription has "account is disabled";
disableAccountLogon
| make-series Trend=count() default=0 on TimeGenerated
in range(startofday(ago(LookBackPeriod)), now(), 1d)
| extend (RSquare,Slope,Variance,RVariance,Interception,
LineFit)=series_fit_line(Trend)
| extend (anomalies,score) =
series_decompose_anomalies(Trend)
常用 eval
命令
SPL 命令 | 说明 | SPL 示例 | KQL 命令 | KQL 示例 |
---|---|---|---|---|
abs(X) |
返回 X 的绝对值。 | abs(number) |
abs() |
abs(X) |
case(X,"Y",…) |
采用 X 和 Y 参数对,其中 X 参数是布尔表达式。 计算结果为 TRUE 时,参数返回相应的 Y 参数。 |
SPL 示例 | case |
KQL 示例 |
ceil(X) |
数字 X 的上限。 | ceil(1.9) |
ceiling() |
ceiling(1.9) |
cidrmatch("X",Y) |
标识属于特定子网的 IP 地址。 | cidrmatch ("123.132.32.0/25",ip) |
• ipv4_is_match() • ipv6_is_match() |
ipv4_is_match('192.168.1.1', '192.168.1.255') == false |
coalesce(X,…) |
返回不为 null 的第一个值。 | coalesce(null(), "Returned val", null()) |
coalesce() |
coalesce(tolong("not a number"), tolong("42"), 33) == 42 |
cos(X) |
计算 X 的余弦。 | n=cos(0) |
cos() | cos(X) |
exact(X) |
使用双精度浮点算法计算表达式 X。 | exact(3.14*num) |
todecimal() |
todecimal(3.14*2) |
exp(X) |
返回 eX。 | exp(3) |
exp() | exp(3) |
if(X,Y,Z) |
如果 X 计算结果为 TRUE ,则结果为第二个参数 Y 。 如果 X 计算结果为 FALSE ,则结果为第三个参数 Z 。 |
if(error==200, "OK", "Error") |
iif() |
KQL 示例 |
isbool(X) |
如果 X 为布尔值,则返回 TRUE 。 |
isbool(field) |
• iif() • gettype |
iif(gettype(X) =="bool","TRUE","FALSE") |
isint(X) |
如果 X 为整数,则返回 TRUE 。 |
isint(field) |
• iif() • gettype |
KQL 示例 |
isnull(X) |
如果 X 为 null,则返回 TRUE 。 |
isnull(field) |
isnull() |
isnull(field) |
isstr(X) |
如果 X 为字符串,则返回 TRUE 。 |
isstr(field) |
• iif() • gettype |
KQL 示例 |
len(X) |
此函数返回字符串 X 的字符长度。 |
len(field) |
strlen() |
strlen(field) |
like(X,"y") |
当且仅当 X 类似于 Y 中的 SQLite 模式时,返回 TRUE 。 |
like(field, "addr%") |
• has • contains • startswith • matches regex |
KQL 示例 |
log(X,Y) |
使用第二个参数 Y 作为基准返回第一个参数 X 的日志。 Y 的默认值为 10 。 |
log(number,2) |
• log • log2 • log10 |
log(X) log2(X) log10(X) |
lower(X) |
返回 X 的小写值。 |
lower(username) |
tolower | tolower(username) |
ltrim(X,Y) |
返回 X ,其中参数 Y 中的字符左侧被截断。 Y 的默认输出为空格和制表符。 |
ltrim(" ZZZabcZZ ", " Z") |
trim_start() |
trim_start(“ ZZZabcZZ”,” ZZZ”) |
match(X,Y) |
如果 X 与正则表达式模式 Y 匹配,则返回。 | match(field, "^\d{1,3}.\d$") |
matches regex |
… | where field matches regex @"^\d{1,3}.\d$") |
max(X,…) |
返回列中的最大值。 | max(delay, mydelay) |
• max() • arg_max() |
… | summarize max(field) |
md5(X) |
返回字符串值 X 的 MD5 哈希。 |
md5(field) |
hash_md5 |
hash_md5("X") |
min(X,…) |
返回列中的最小值。 | min(delay, mydelay) |
• min_of() • min() • arg_min |
KQL 示例 |
mvcount(X) |
返回 X 值的总数。 |
mvcount(multifield) |
dcount |
…| summarize dcount(X) by Y |
mvfilter(X) |
基于布尔 X 表达式筛选多值字段。 |
mvfilter(match(email, "net$")) |
mv-apply |
KQL 示例 |
mvindex(X,Y,Z) |
返回多值 X 参数从起点(以零为基础)Y 到 Z (可选)的子集。 |
mvindex( multifield, 2) |
array_slice |
array_slice(arr, 1, 2) |
mvjoin(X,Y) |
给定多值字段 X 和字符串分隔符 Y ,并使用 Y 联接 X 的各个值。 |
mvjoin(address, ";") |
strcat_array |
KQL 示例 |
now() |
返回用 Unix 时间表示的当前时间。 | now() |
now() |
now() now(-2d) |
null() |
不接受参数并返回 NULL 。 |
null() |
null | null |
nullif(X,Y) |
包括两个参数 X 和 Y ,如果参数不同则返回 X 。 否则返回 NULL 。 |
nullif(fieldA, fieldB) |
iif |
iif(fieldA==fieldB, null, fieldA) |
random() |
返回介于 0 和 2147483647 之间的伪随机数。 |
random() |
rand() |
rand() |
relative_ time(X,Y) |
给定纪元时间 X 和相对时间说明符 Y ,返回应用于 X 的 Y 的纪元时间值。 |
relative_time(now(),"-1d@d") |
unix time | KQL 示例 |
replace(X,Y,Z) |
返回为字符串 X 中正则表达式字符串 Y 的每个匹配项替换字符串 Z 而得到的字符串。 |
返回月份和天互换的日期。 例如,输入为 4/30/2015 ,则输出为 30/4/2009 :replace(date, "^(\d{1,2})/ (\d{1,2})/", "\2/\1/") |
replace() |
KQL 示例 |
round(X,Y) |
返回舍入到由 Y 指定的小数位数的 X 。 默认为舍入为整数。 |
round(3.5) |
round |
round(3.5) |
rtrim(X,Y) |
返回 X ,其中 Y 的字符右侧被截断。 如果未指定 Y ,则截断空格和制表符。 |
rtrim(" ZZZZabcZZ ", " Z") |
trim_end() |
trim_end(@"[ Z]+",A) |
searchmatch(X) |
如果事件与搜索字符串 X 匹配,则返回 TRUE 。 |
searchmatch("foo AND bar") |
iif() | iif(field has "X","Yes","No") |
split(X,"Y") |
以多值字段的形式返回 X ,由分隔符 Y 分隔。 |
split(address, ";") |
split() |
split(address, ";") |
sqrt(X) |
返回 X 的平方根。 |
sqrt(9) |
sqrt() |
sqrt(9) |
strftime(X,Y) |
返回使用 Y 所指定的格式来呈现的纪元时间值 X 。 |
strftime(_time, "%H:%M") |
format_datetime() |
format_datetime(time,'HH:mm') |
strptime(X,Y) |
给定由字符串 X 表示的时间,返回从格式 Y 分析的值。 |
strptime(timeStr, "%H:%M") |
format_datetime() | KQL 示例 |
substr(X,Y,Z) |
返回从起点(基于一)Y 算起共 Z (可选)个字符的子字符串字段 X 。 |
substr("string", 1, 3) |
substring() |
substring("string", 0, 3) |
time() |
返回精确到微秒的时钟时间。 | time() |
format_datetime() |
KQL 示例 |
tonumber(X,Y) |
将输入字符串 X 转换为数字,其中 Y (可选,默认值为 10 )定义要转换为的数字的基准。 |
tonumber("0A4",16) |
toint() |
toint("123") |
tostring(X,Y) |
描述 | SPL 示例 | tostring() |
tostring(123) |
typeof(X) |
返回字段类型的字符串表示形式。 | typeof(12) |
gettype() |
gettype(12) |
urldecode(X) |
返回解码的 URL X 。 |
SPL 示例 | url_decode |
KQL 示例 |
case(X,"Y",…)
SPL 示例
case(error == 404, "Not found",
error == 500,"Internal Server Error",
error == 200, "OK")
case(X,"Y",…)
KQL 示例
T
| extend Message = case(error == 404, "Not found",
error == 500,"Internal Server Error", "OK")
if(X,Y,Z)
KQL 示例
iif(floor(Timestamp, 1d)==floor(now(), 1d),
"today", "anotherday")
isint(X)
KQL 示例
iif(gettype(X) =="long","TRUE","FALSE")
isstr(X)
KQL 示例
iif(gettype(X) =="string","TRUE","FALSE")
like(X,"y")
实例
… | where field has "addr"
… | where field contains "addr"
… | where field startswith "addr"
… | where field matches regex "^addr.*"
min(X,…)
KQL 示例
min_of (expr_1, expr_2 ...)
…|summarize min(expr)
…| summarize arg_min(Price,*) by Product
mvfilter(X)
KQL 示例
T | mv-apply Metric to typeof(real) on
(
top 2 by Metric desc
)
mvjoin(X,Y)
KQL 示例
strcat_array(dynamic([1, 2, 3]), "->")
relative time(X,Y)
KQL 示例
let toUnixTime = (dt:datetime)
{
(dt - datetime(1970-01-01))/1s
};
replace(X,Y,Z)
KQL 示例
replace( @'^(\d{1,2})/(\d{1,2})/', @'\2/\1/',date)
strptime(X,Y)
KQL 示例
format_datetime(datetime('2017-08-16 11:25:10'),
'HH:mm')
time()
KQL 示例
format_datetime(datetime(2015-12-14 02:03:04),
'h:m:s')
tostring(X,Y)
以字符串形式返回 X
的字段值。
- 如果
X
的值为数字,则将X
的格式重设为字符串值。 - 如果
X
为布尔值,则将X
的格式重设为TRUE
或FALSE
。 - 如果
X
为数字,则第二个参数Y
是可选的,可以是hex
(将X
转换为十六进制)、commas
(采用逗号设置X
的格式,有两个小数位)或者duration
(将X
从以秒为单位的时间格式转换为可读时间格式:HH:MM:SS
)。
tostring(X,Y)
SPL 示例
此示例返回:
foo=615 and foo2=00:10:15:
… | eval foo=615 | eval foo2 = tostring(
foo, "duration")
urldecode(X)
SPL 示例
urldecode("http%3A%2F%2Fwww.splunk.com%2Fdownload%3Fr%3Dheader")
常用 stats
命令 KQL 示例
SPL 命令 | 说明 | KQL 命令 | KQL 示例 |
---|---|---|---|
avg(X) |
返回字段 X 的平均值。 |
avg() | avg(X) |
count(X) |
返回字段 X 出现的次数。 若要指示要匹配的特定字段值,将 X 的格式设为 eval(field="value") 。 |
count() | summarize count() |
dc(X) |
返回字段 X 的非重复值的计数。 |
dcount() | …\| summarize countries=dcount(country) by continent |
earliest(X) |
返回 X 按时间顺序最早出现的值。 |
arg_min() | … \| summarize arg_min(TimeGenerated, *) by X |
latest(X) |
返回 X 按时间顺序最晚出现的值。 |
arg_max() | … \| summarize arg_max(TimeGenerated, *) by X |
max(X) |
返回字段 X 的最大值。 如果 X 的值非数值,则按字母顺序排序找到最大值。 |
max() | …\| summarize max(X) |
median(X) |
返回字段 X 的最中间的值。 |
percentile() | …\| summarize percentile(X, 50) |
min(X) |
返回字段 X 的最小值。 如果 X 的值非数值,则按字母顺序排序找到最小值。 |
min() | …\| summarize min(X) |
mode(X) |
返回字段 X 最常出现的值。 |
top-hitters() | …\| top-hitters 1 of Y by X |
perc(Y) |
返回字段 Y 的 X 百分位值。 例如,perc5(total) 返回字段 total 的第五百分位值。 |
percentile() | …\| summarize percentile(Y, 5) |
range(X) |
返回字段 X 的最大值和最小值之间的差。 |
range() | range(1, 3) |
stdev(X) |
返回字段 X 的样本标准偏差。 |
stdev | stdev() |
stdevp(X) |
返回字段 X 的总体标准偏差。 |
stdevp() | stdevp() |
sum(X) |
返回字段 X 的值的总和。 |
sum() | sum(X) |
sumsq(X) |
返回字段 X 的值的平方和。 |
||
values(X) |
以多值条目的形式返回字段 X 的所有非重复值的列表。 值的顺序按字母顺序排列。 |
make_set() | …\| summarize r = make_set(X) |
var(X) |
返回字段 X 的样本方差。 |
variance | variance(X) |
后续步骤
在本文中,你已了解如何将迁移规则从 Splunk 映射到 Microsoft Sentinel。