你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

将 Splunk 检测规则迁移到 Microsoft Sentinel

Splunk 检测规则是安全信息和事件管理 (SIEM) 组件,可与 Microsoft Sentinel 中的分析规则相比。 本文介绍了与识别它们、比较它们和将它们迁移到 Microsoft Sentinel 相关的概念。 最好的方法是从 SIEM 迁移体验开始,该体验可以识别要自动转换成的现成 (OOTB) 分析规则。

如果要迁移 Splunk 可观测性部署,请详细了解如何从 Splunk 迁移到 Azure Monitor 日志

审核规则

Microsoft Sentinel 使用机器学习分析来创建高保真的且可处理的事件。 某些现有 Splunk 检测在 Microsoft Sentinel 中可能是多余的,因此不要盲目迁移它们。 在识别现有检测规则时请查看这些注意事项。

  • 确保选择能够证明规则迁移合理的用例,同时考虑业务优先级和效率。
  • 确认你是否了解 Microsoft Sentinel 规则类型
  • 确认你是否理解规则术语
  • 查看过去 6-12 个月内没有警报的过时规则,并确定这些规则是否仍然相关。
  • 消除你在日常中会忽略的低级别威胁或警报。
  • 确认连接的数据源并查看数据连接方法。 Microsoft Sentinel Analytics 要求该数据类型在启用规则之前便存在于 Log Analytics 工作区中。 重新访问数据收集对话,确保打算检测的用例的数据深度和广度足够。 然后使用 SIEM 迁移体验来确保数据源得到适当映射。

迁移规则

确定要迁移的 Splunk 检测后,请查看迁移过程的以下注意事项:

  • 将 Microsoft Sentinel 的 OOTB 分析规则的现有功能与当前的用例进行比较。 使用 SIEM 迁移体验查看哪些 Splunk 检测会自动转换为 OOTB 模板。
  • 转换与 OOTB 分析规则不一致的检测。 自动转换 Splunk 检测的最佳方式是使用 SIEM 迁移体验
  • 通过探索社区资源(例如 SOC Prime Threat Detection Marketplace),发现更多适合用例的算法。
  • 如果内置规则不可用或未自动转换,则手动转换检测。 创建新的 KQL 查询并查看规则映射

有关详细信息,请参阅迁移检测规则的最佳做法

规则迁移步骤

  1. 验证是否为要迁移的每个规则准备好了一个测试系统。

    1. 为迁移的规则准备验证过程,包括完整测试方案和脚本。

    2. 确保团队具有有用的资源可用于测试迁移的规则。

    3. 确认已连接所需的数据源,并检查数据连接方法

  2. 验证你的检测是否是作为 Microsoft Sentinel 中的 OOTB 模板提供的:

    • 使用 SIEM 迁移体验自动转换和安装 OOTB 模板。

      有关详细信息,请参阅使用 SIEM 迁移体验

    • 如果你的用例未反映在检测中,请使用 OOTB 规则模板为你自己的工作区创建规则。

      在 Microsoft Sentinel 中转到“内容中心”

      筛选“分析规则”模板的“内容类型”

      找到并安装/更新每个相应的内容中心解决方案或独立分析规则模板。

      有关详细信息,请参阅直接检测威胁

    • 如果你的检测是 Microsoft Sentinel 的 OOTB 规则未涵盖的内容,请先尝试使用 SIEM 迁移体验进行自动转换。

    • 如果 OOTB 规则和 SIEM 迁移都无法完全转换该检测,请手动创建规则。 在这种情况下,请使用以下步骤创建规则:

      1. 确定要在规则中使用的数据源。 通过在数据源和数据表之间创建映射表来识别要查询的 Microsoft Sentinel 表。

      2. 确定数据中的要在规则中使用的任何属性、字段或实体。

      3. 确定规则条件和逻辑。 在此阶段,请考虑查找规则模板作为示例来了解如何构造 KQL 查询。

        考虑筛选器、关联规则、活动列表、参考集、监视列表、检测异常情况、聚合等。 可以使用旧式 SIEM 提供的参考来了解如何以最佳方式映射查询语法

      4. 确定触发条件和规则操作,然后构建并检查 KQL 查询。 检查查询时,请考虑 KQL 优化指导资源。

  3. 使用每个相关用例来测试规则。 如果它未提供预期的结果,请检查和编辑 KQL 并再次测试它。

  4. 如果对结果满意,请考虑迁移该规则。 根据需要为规则操作创建一个 playbook。 有关详细信息,请参阅使用 Microsoft Sentinel 中的 playbook 自动响应威胁

详细了解分析规则:

比较规则术语

此表有助于阐明基于 Microsoft Sentinel 中的 Kusto 查询语言 (KQL) 的规则的概念(相比于基于搜索处理语言 (SPL) 的 Splunk 检测)。

Splunk Microsoft Sentinel
规则类型 • 计划
• 实时
• 计划的查询
• 融合
• Microsoft 安全
• 机器学习 (ML) 行为分析
条件 在 SPL 中定义 在 KQL 中定义
触发器条件 • 结果数
• 主机数
• 源数
• 自定义
阈值:查询结果数
操作 • 添加到触发的警报
• 记录事件
• 输出结果以供查找
• 等等
• 创建警报或事件
• 与逻辑应用集成

映射和比较规则示例

在各种方案中使用这些示例来比较 Splunk 和 Microsoft Sentinel 中的规则并将规则从 Splunk 映射到 Microsoft Sentinel。

常见搜索命令

SPL 命令 说明 KQL 运算符 KQL 示例
chart/ timechart 返回时序图表的表格输出结果。 render 运算符 … | render timechart
dedup 删除与指定条件匹配的后续结果。 distinct
summarize
… | summarize by Computer, EventID
eval 计算表达式。 了解常用的 eval 命令 extend T | extend duration = endTime - startTime
fields 从搜索结果中删除字段。 project
project-away
T | project cost=price*quantity, price
head/tail 返回前 N 个或最后 N 个结果。 返回页首 T | top 5 by Name desc nulls last
lookup 从外部源添加字段值。 externaldata
lookup
KQL 示例
rename 重命名字段。 使用通配符指定多个字段。 project-rename T | project-rename new_column_name = column_name
rex 使用正则表达式指定组名来提取字段。 matches regex … | where field matches regex "^addr.*"
search 将结果筛选为与搜索表达式匹配的结果。 search search "X"
sort 按指定字段对搜索结果进行排序。 sort T | sort by strlen(country) asc, price desc
stats 提供按字段(可选)分组的统计信息。 详细了解常见 stats 命令 summarize KQL 示例
mstats 与统计信息类似,用于指标而不是事件。 summarize KQL 示例
table 指定要保留在结果集中的字段,并采用表格格式保留数据。 project T | project columnA, columnB
top/rare 显示字段的最常见值或最不常见值。 返回页首 T | top 5 by Name desc nulls last
transaction 将搜索结果分组为事务。

SPL 示例
示例:row_window_session KQL 示例
eventstats 从事件的字段生成摘要统计信息,并将这些统计信息保存在新字段中。

SPL 示例
示例:
join
make_list
mv-expand
KQL 示例
streamstats 查找字段的累积总和。

SPL 示例:
... | streamstats sum(bytes) as bytes _ total \| timechart
row_cumsum ...\| serialize cs=row_cumsum(bytes)
anomalydetection 查找指定字段中的异常。

SPL 示例
series_decompose_anomalies() KQL 示例
where 使用 eval 表达式筛选搜索结果。 用于比较两个不同的字段。 where T | where fruit=="apple"

lookup 命令:KQL 示例

Users 
| where UserID in ((externaldata (UserID:string) [
@"https://storageaccount.blob.core.windows.net/storagecontainer/users.txt" 
h@"?...SAS..." // Secret token to access the blob 
])) | ... 

stats 命令:KQL 示例

Sales 
| summarize NumTransactions=count(), 
Total=sum(UnitPrice * NumUnits) by Fruit, 
StartOfMonth=startofmonth(SellDateTime) 

mstats 命令:KQL 示例

T | summarize count() by price_range=bin(price, 10.0) 

transaction 命令:SPL 示例

sourcetype=MyLogTable type=Event
| transaction ActivityId startswith="Start" endswith="Stop"
| Rename timestamp as StartTime
| Table City, ActivityId, StartTime, Duration

transaction 命令:KQL 示例

let Events = MyLogTable | where type=="Event";
Events
| where Name == "Start"
| project Name, City, ActivityId, StartTime=timestamp
| join (Events
| where Name == "Stop"
| project StopTime=timestamp, ActivityId)
on ActivityId
| project City, ActivityId, StartTime, 
Duration = StopTime – StartTime

使用 row_window_session() 计算序列化行集中的列的会话开始值。

...| extend SessionStarted = row_window_session(
Timestamp, 1h, 5m, ID != prev(ID))

eventstats 命令:SPL 示例

… | bin span=1m _time
|stats count AS count_i by _time, category
| eventstats sum(count_i) as count_total by _time

eventstats 命令:KQL 示例

下面是 join 语句的示例:

let binSize = 1h;
let detail = SecurityEvent 
| summarize detail_count = count() by EventID,
tbin = bin(TimeGenerated, binSize);
let summary = SecurityEvent
| summarize sum_count = count() by 
tbin = bin(TimeGenerated, binSize);
detail 
| join kind=leftouter (summary) on tbin 
| project-away tbin1

下面是 make_list 语句的示例:

let binSize = 1m;
SecurityEvent
| where TimeGenerated >= ago(24h)
| summarize TotalEvents = count() by EventID, 
groupBin =bin(TimeGenerated, binSize)
|summarize make_list(EventID), make_list(TotalEvents), 
sum(TotalEvents) by groupBin
| mvexpand list_EventID, list_TotalEvents

anomalydetection 命令:SPL 示例

sourcetype=nasdaq earliest=-10y
| anomalydetection Close _ Price

anomalydetection 命令:KQL 示例

let LookBackPeriod= 7d;
let disableAccountLogon=SignIn
| where ResultType == "50057"
| where ResultDescription has "account is disabled";
disableAccountLogon
| make-series Trend=count() default=0 on TimeGenerated 
in range(startofday(ago(LookBackPeriod)), now(), 1d)
| extend (RSquare,Slope,Variance,RVariance,Interception,
LineFit)=series_fit_line(Trend)
| extend (anomalies,score) = 
series_decompose_anomalies(Trend)

常用 eval 命令

SPL 命令 说明 SPL 示例 KQL 命令 KQL 示例
abs(X) 返回 X 的绝对值。 abs(number) abs() abs(X)
case(X,"Y",…) 采用 XY 参数对,其中 X 参数是布尔表达式。 计算结果为 TRUE 时,参数返回相应的 Y 参数。 SPL 示例 case KQL 示例
ceil(X) 数字 X 的上限。 ceil(1.9) ceiling() ceiling(1.9)
cidrmatch("X",Y) 标识属于特定子网的 IP 地址。 cidrmatch
("123.132.32.0/25",ip)
ipv4_is_match()
ipv6_is_match()
ipv4_is_match('192.168.1.1', '192.168.1.255')
== false
coalesce(X,…) 返回不为 null 的第一个值。 coalesce(null(), "Returned val", null()) coalesce() coalesce(tolong("not a number"),
tolong("42"), 33) == 42
cos(X) 计算 X 的余弦。 n=cos(0) cos() cos(X)
exact(X) 使用双精度浮点算法计算表达式 X。 exact(3.14*num) todecimal() todecimal(3.14*2)
exp(X) 返回 eX。 exp(3) exp() exp(3)
if(X,Y,Z) 如果 X 计算结果为 TRUE,则结果为第二个参数 Y。 如果 X 计算结果为 FALSE,则结果为第三个参数 Z if(error==200,
"OK", "Error")
iif() KQL 示例
isbool(X) 如果 X 为布尔值,则返回 TRUE isbool(field) iif()
gettype
iif(gettype(X) =="bool","TRUE","FALSE")
isint(X) 如果 X 为整数,则返回 TRUE isint(field) iif()
gettype
KQL 示例
isnull(X) 如果 X 为 null,则返回 TRUE isnull(field) isnull() isnull(field)
isstr(X) 如果 X 为字符串,则返回 TRUE isstr(field) iif()
gettype
KQL 示例
len(X) 此函数返回字符串 X 的字符长度。 len(field) strlen() strlen(field)
like(X,"y") 当且仅当 X 类似于 Y 中的 SQLite 模式时,返回 TRUE like(field, "addr%") has
contains
startswith
matches regex
KQL 示例
log(X,Y) 使用第二个参数 Y 作为基准返回第一个参数 X 的日志。 Y 的默认值为 10 log(number,2) log
log2
log10
log(X)

log2(X)

log10(X)
lower(X) 返回 X 的小写值。 lower(username) tolower tolower(username)
ltrim(X,Y) 返回 X,其中参数 Y 中的字符左侧被截断。 Y 的默认输出为空格和制表符。 ltrim(" ZZZabcZZ ", " Z") trim_start() trim_start(“ ZZZabcZZ”,” ZZZ”)
match(X,Y) 如果 X 与正则表达式模式 Y 匹配,则返回。 match(field, "^\d{1,3}.\d$") matches regex … | where field matches regex @"^\d{1,3}.\d$")
max(X,…) 返回列中的最大值。 max(delay, mydelay) max()
arg_max()
… | summarize max(field)
md5(X) 返回字符串值 X 的 MD5 哈希。 md5(field) hash_md5 hash_md5("X")
min(X,…) 返回列中的最小值。 min(delay, mydelay) min_of()
min()
arg_min
KQL 示例
mvcount(X) 返回 X 值的总数。 mvcount(multifield) dcount …| summarize dcount(X) by Y
mvfilter(X) 基于布尔 X 表达式筛选多值字段。 mvfilter(match(email, "net$")) mv-apply KQL 示例
mvindex(X,Y,Z) 返回多值 X 参数从起点(以零为基础)YZ(可选)的子集。 mvindex( multifield, 2) array_slice array_slice(arr, 1, 2)
mvjoin(X,Y) 给定多值字段 X 和字符串分隔符 Y,并使用 Y 联接 X 的各个值。 mvjoin(address, ";") strcat_array KQL 示例
now() 返回用 Unix 时间表示的当前时间。 now() now() now()

now(-2d)
null() 不接受参数并返回 NULL null() null null
nullif(X,Y) 包括两个参数 XY,如果参数不同则返回 X。 否则返回 NULL nullif(fieldA, fieldB) iif iif(fieldA==fieldB, null, fieldA)
random() 返回介于 02147483647 之间的伪随机数。 random() rand() rand()
relative_ time(X,Y) 给定纪元时间 X 和相对时间说明符 Y,返回应用于 XY 的纪元时间值。 relative_time(now(),"-1d@d") unix time KQL 示例
replace(X,Y,Z) 返回为字符串 X 中正则表达式字符串 Y 的每个匹配项替换字符串 Z 而得到的字符串。 返回月份和天互换的日期。
例如,输入为 4/30/2015,则输出为 30/4/2009

replace(date, "^(\d{1,2})/ (\d{1,2})/", "\2/\1/")
replace() KQL 示例
round(X,Y) 返回舍入到由 Y 指定的小数位数的 X。 默认为舍入为整数。 round(3.5) round round(3.5)
rtrim(X,Y) 返回 X,其中 Y 的字符右侧被截断。 如果未指定 Y,则截断空格和制表符。 rtrim(" ZZZZabcZZ ", " Z") trim_end() trim_end(@"[ Z]+",A)
searchmatch(X) 如果事件与搜索字符串 X 匹配,则返回 TRUE searchmatch("foo AND bar") iif() iif(field has "X","Yes","No")
split(X,"Y") 以多值字段的形式返回 X,由分隔符 Y 分隔。 split(address, ";") split() split(address, ";")
sqrt(X) 返回 X 的平方根。 sqrt(9) sqrt() sqrt(9)
strftime(X,Y) 返回使用 Y 所指定的格式来呈现的纪元时间值 X strftime(_time, "%H:%M") format_datetime() format_datetime(time,'HH:mm')
strptime(X,Y) 给定由字符串 X 表示的时间,返回从格式 Y 分析的值。 strptime(timeStr, "%H:%M") format_datetime() KQL 示例
substr(X,Y,Z) 返回从起点(基于一)Y 算起共 Z(可选)个字符的子字符串字段 X substr("string", 1, 3) substring() substring("string", 0, 3)
time() 返回精确到微秒的时钟时间。 time() format_datetime() KQL 示例
tonumber(X,Y) 将输入字符串 X 转换为数字,其中 Y(可选,默认值为 10)定义要转换为的数字的基准。 tonumber("0A4",16) toint() toint("123")
tostring(X,Y) 描述 SPL 示例 tostring() tostring(123)
typeof(X) 返回字段类型的字符串表示形式。 typeof(12) gettype() gettype(12)
urldecode(X) 返回解码的 URL X SPL 示例 url_decode KQL 示例

case(X,"Y",…) SPL 示例

case(error == 404, "Not found",
error == 500,"Internal Server Error",
error == 200, "OK")

case(X,"Y",…) KQL 示例

T
| extend Message = case(error == 404, "Not found", 
error == 500,"Internal Server Error", "OK") 

if(X,Y,Z) KQL 示例

iif(floor(Timestamp, 1d)==floor(now(), 1d), 
"today", "anotherday")

isint(X) KQL 示例

iif(gettype(X) =="long","TRUE","FALSE")

isstr(X) KQL 示例

iif(gettype(X) =="string","TRUE","FALSE")

like(X,"y") 实例

… | where field has "addr"

… | where field contains "addr"

… | where field startswith "addr"

… | where field matches regex "^addr.*"

min(X,…) KQL 示例

min_of (expr_1, expr_2 ...)

…|summarize min(expr)

…| summarize arg_min(Price,*) by Product

mvfilter(X) KQL 示例

T | mv-apply Metric to typeof(real) on 
(
 top 2 by Metric desc
)

mvjoin(X,Y) KQL 示例

strcat_array(dynamic([1, 2, 3]), "->")

relative time(X,Y) KQL 示例

let toUnixTime = (dt:datetime)
{
(dt - datetime(1970-01-01))/1s 
};

replace(X,Y,Z) KQL 示例

replace( @'^(\d{1,2})/(\d{1,2})/', @'\2/\1/',date)

strptime(X,Y) KQL 示例

format_datetime(datetime('2017-08-16 11:25:10'),
'HH:mm')

time() KQL 示例

format_datetime(datetime(2015-12-14 02:03:04),
'h:m:s')

tostring(X,Y)

以字符串形式返回 X 的字段值。

  • 如果 X 的值为数字,则将 X 的格式重设为字符串值。
  • 如果 X 为布尔值,则将 X 的格式重设为 TRUEFALSE
  • 如果 X 为数字,则第二个参数 Y 是可选的,可以是 hex(将 X 转换为十六进制)、commas(采用逗号设置 X 的格式,有两个小数位)或者 duration(将 X 从以秒为单位的时间格式转换为可读时间格式:HH:MM:SS)。
tostring(X,Y) SPL 示例

此示例返回:

foo=615 and foo2=00:10:15:

… | eval foo=615 | eval foo2 = tostring(
foo, "duration")

urldecode(X) SPL 示例

urldecode("http%3A%2F%2Fwww.splunk.com%2Fdownload%3Fr%3Dheader")

常用 stats 命令 KQL 示例

SPL 命令 说明 KQL 命令 KQL 示例
avg(X) 返回字段 X 的平均值。 avg() avg(X)
count(X) 返回字段 X 出现的次数。 若要指示要匹配的特定字段值,将 X 的格式设为 eval(field="value") count() summarize count()
dc(X) 返回字段 X 的非重复值的计数。 dcount() …\| summarize countries=dcount(country) by continent
earliest(X) 返回 X 按时间顺序最早出现的值。 arg_min() … \| summarize arg_min(TimeGenerated, *) by X
latest(X) 返回 X 按时间顺序最晚出现的值。 arg_max() … \| summarize arg_max(TimeGenerated, *) by X
max(X) 返回字段 X 的最大值。 如果 X 的值非数值,则按字母顺序排序找到最大值。 max() …\| summarize max(X)
median(X) 返回字段 X 的最中间的值。 percentile() …\| summarize percentile(X, 50)
min(X) 返回字段 X 的最小值。 如果 X 的值非数值,则按字母顺序排序找到最小值。 min() …\| summarize min(X)
mode(X) 返回字段 X 最常出现的值。 top-hitters() …\| top-hitters 1 of Y by X
perc(Y) 返回字段 YX 百分位值。 例如,perc5(total) 返回字段 total 的第五百分位值。 percentile() …\| summarize percentile(Y, 5)
range(X) 返回字段 X 的最大值和最小值之间的差。 range() range(1, 3)
stdev(X) 返回字段 X 的样本标准偏差。 stdev stdev()
stdevp(X) 返回字段 X 的总体标准偏差。 stdevp() stdevp()
sum(X) 返回字段 X 的值的总和。 sum() sum(X)
sumsq(X) 返回字段 X 的值的平方和。
values(X) 以多值条目的形式返回字段 X 的所有非重复值的列表。 值的顺序按字母顺序排列。 make_set() …\| summarize r = make_set(X)
var(X) 返回字段 X 的样本方差。 variance variance(X)

后续步骤

在本文中,你已了解如何将迁移规则从 Splunk 映射到 Microsoft Sentinel。