你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
graph_path_discovery_fl()
适用于:✅Microsoft Fabric✅Azure 数据资源管理器✅Azure Monitor✅Microsoft Sentinel
通过图形数据(边缘和节点)发现相关终结点(源和目标)之间的有效路径。
函数 graph_path_discovery_fl()
是 UDF(用户定义的函数),可用于通过图形数据发现相关终结点之间的有效路径。 图形数据由节点(例如资源、应用程序或用户)和边缘(例如-现有访问权限)组成。 在网络安全上下文中,此类路径可能表示潜在攻击者可以利用的横向移动路径。 我们对连接终结点的路径感兴趣,这些路径定义为与某些条件相关- 例如,公开了连接到关键目标的源。 根据函数的配置,可以发现适用于其他安全方案的其他类型的路径。
可用作此函数输入的数据是一个边缘表,格式为“SourceId、EdgeId、TargetId”,以及具有可选节点属性的节点列表,可用于定义有效路径。 或者,可以从其他类型的数据中提取图形输入。 例如,具有“用户 A 登录到资源 B”类型的条目的流量日志可以建模为类型“(用户 A)-[已登录]->(资源 B)”的边缘,而不同的用户和资源列表可以建模为节点。
我们假设了以下几个假设:
- 所有边缘都对路径发现有效。 在运行路径发现之前,应筛选掉无关的边缘。
- 边缘不加权、独立且无条件,这意味着所有边缘的概率相同,从 B 移动到 C 并不依赖于以前的从 A 移动到 B。
- 我们想要发现的路径是简单的方向路径,没有周期,类型为 A->B->C。 可以通过更改函数中图形匹配运算符的内部语法来实现更复杂的定义。
可以通过更改函数的内部逻辑来根据需要调整这些假设。
该函数根据可选约束(如路径长度限制、最大输出大小等)发现有效源到有效目标之间的所有可能路径。输出是包含源和目标 ID 的已发现路径的列表,以及连接边缘和节点的列表。 该函数仅使用必填字段,例如节点 ID 和边缘 ID。 如果其他相关字段(例如类型、属性列表、安全相关分数或外部信号)可用于输入数据,可以通过更改函数定义将其添加到逻辑和输出中。
语法
graph_path_discovery_fl(
edgesTableName、nodesTableName、scopeColumnName、isValidPathStartColumnName、isValidPathEndColumnName、nodeIdColumnName,edgeIdColumnName,sourceIdColumnName,targetIdColumnName,[minPathLength], [maxPathLength], [resultCountLimit])
详细了解
参数
名字 | 类型 | 必填 | 描述 |
---|---|---|---|
edgesTableName | string |
✔️ | 包含图形边缘的输入表的名称。 |
nodesTableName | string |
✔️ | 包含图形节点的输入表的名称。 |
scopeColumnName | string |
✔️ | 节点和边缘表中包含分区或作用域(例如订阅或帐户)中的列的名称,以便为每个范围生成不同的异常模型。 |
isValidPathStartColumnName | string |
✔️ | 节点表中包含节点布尔标志的列的名称,True 表示节点是路径的有效起点,False - 无效。 |
isValidPathEndColumnName | string |
✔️ | 节点表中包含节点布尔标志的列的名称,True 表示节点是路径的有效终结点,False - 无效。 |
nodeIdColumnName | string |
✔️ | 包含节点 ID 的节点表中的列的名称。 |
edgeIdColumnName | string |
✔️ | 边缘表中包含边缘 ID 的列的名称。 |
sourceIdColumnName | string |
✔️ | 边缘表中包含边缘的源节点 ID 的列的名称。 |
targetIdColumnName | string |
✔️ | 边缘表中包含边缘的目标节点 ID 的列的名称。 |
minPathLength | long |
路径中的最小步骤数(边缘)。 默认值为 1。 | |
maxPathLength | long |
路径中的最大步骤数(边缘)。 默认值为 8。 | |
resultCountLimit | long |
为输出返回的最大路径数。 默认值为 100000。 |
函数定义
可以通过将函数代码嵌入为查询定义的函数,或将其创建为数据库中的存储函数来定义函数,如下所示:
- 查询定义的
- 存储
使用以下 let 语句定义函数。 不需要任何权限。
let graph_path_discovery_fl = ( edgesTableName:string, nodesTableName:string, scopeColumnName:string
, isValidPathStartColumnName:string, isValidPathEndColumnName:string
, nodeIdColumnName:string, edgeIdColumnName:string, sourceIdColumnName:string, targetIdColumnName:string
, minPathLength:long = 1, maxPathLength:long = 8, resultCountLimit:long = 100000)
{
let edges = (
table(edgesTableName)
| extend sourceId = column_ifexists(sourceIdColumnName, '')
| extend targetId = column_ifexists(targetIdColumnName, '')
| extend edgeId = column_ifexists(edgeIdColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let nodes = (
table(nodesTableName)
| extend nodeId = column_ifexists(nodeIdColumnName, '')
| extend isValidPathStart = column_ifexists(isValidPathStartColumnName, '')
| extend isValidPathEnd = column_ifexists(isValidPathEndColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let paths = (
edges
// Build graph object partitioned by scope, so that no connections are allowed between scopes.
// In case no scopes are relevant, partitioning should be removed for better performance.
| make-graph sourceId --> targetId with nodes on nodeId partitioned-by scope (
// Look for existing paths between source nodes and target nodes with less than predefined number of hops
// Current configurations looks for directed paths without any cycles; this can be changed if needed
graph-match cycles = none (s)-[e*minPathLength..maxPathLength]->(t)
// Filter only by paths with that connect valid endpoints
where ((s.isValidPathStart) and (t.isValidPathEnd))
project sourceId = s.nodeId
, isSourceValidPathStart = s.isValidPathStart
, targetId = t.nodeId
, isTargetValidPathEnd = t.isValidPathEnd
, scope = s.scope
, edgeIds = e.edgeId
, edgeAllTargetIds = e.targetId
| limit resultCountLimit
)
| extend pathLength = array_length(edgeIds)
, pathId = hash_md5(strcat(sourceId, strcat(edgeIds), targetId))
, pathAllNodeIds = array_concat(pack_array(sourceId), edgeAllTargetIds)
| project-away edgeAllTargetIds
| mv-apply with_itemindex = SortIndex nodesInPath = pathAllNodeIds to typeof(string), edgesInPath = edgeIds to typeof(string) on (
extend step = strcat(
iff(isnotempty(nodesInPath), strcat('(', nodesInPath, ')'), '')
, iff(isnotempty(edgesInPath), strcat('-[', edgesInPath, ']->'), ''))
| summarize fullPath = array_strcat(make_list(step), '')
)
);
paths
};
// Write your query to use the function here.
例
以下示例使用 调用运算符 来运行函数。
- 查询定义的
- 存储
若要使用查询定义的函数,请调用嵌入的函数定义之后。
let graph_path_discovery_fl = ( edgesTableName:string, nodesTableName:string, scopeColumnName:string
, isValidPathStartColumnName:string, isValidPathEndColumnName:string
, nodeIdColumnName:string, edgeIdColumnName:string, sourceIdColumnName:string, targetIdColumnName:string
, minPathLength:long = 1, maxPathLength:long = 8, resultCountLimit:long = 100000)
{
let edges = (
table(edgesTableName)
| extend sourceId = column_ifexists(sourceIdColumnName, '')
| extend targetId = column_ifexists(targetIdColumnName, '')
| extend edgeId = column_ifexists(edgeIdColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let nodes = (
table(nodesTableName)
| extend nodeId = column_ifexists(nodeIdColumnName, '')
| extend isValidPathStart = column_ifexists(isValidPathStartColumnName, '')
| extend isValidPathEnd = column_ifexists(isValidPathEndColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let paths = (
edges
// Build graph object partitioned by scope, so that no connections are allowed between scopes.
// In case no scopes are relevant, partitioning should be removed for better performance.
| make-graph sourceId --> targetId with nodes on nodeId partitioned-by scope (
// Look for existing paths between source nodes and target nodes with less than predefined number of hops
// Current configurations looks for directed paths without any cycles; this can be changed if needed
graph-match cycles = none (s)-[e*minPathLength..maxPathLength]->(t)
// Filter only by paths with that connect valid endpoints
where ((s.isValidPathStart) and (t.isValidPathEnd))
project sourceId = s.nodeId
, isSourceValidPathStart = s.isValidPathStart
, targetId = t.nodeId
, isTargetValidPathEnd = t.isValidPathEnd
, scope = s.scope
, edgeIds = e.edgeId
, edgeAllTargetIds = e.targetId
| limit resultCountLimit
)
| extend pathLength = array_length(edgeIds)
, pathId = hash_md5(strcat(sourceId, strcat(edgeIds), targetId))
, pathAllNodeIds = array_concat(pack_array(sourceId), edgeAllTargetIds)
| project-away edgeAllTargetIds
| mv-apply with_itemindex = SortIndex nodesInPath = pathAllNodeIds to typeof(string), edgesInPath = edgeIds to typeof(string) on (
extend step = strcat(
iff(isnotempty(nodesInPath), strcat('(', nodesInPath, ')'), '')
, iff(isnotempty(edgesInPath), strcat('-[', edgesInPath, ']->'), ''))
| summarize fullPath = array_strcat(make_list(step), '')
)
);
paths
};
let edges = datatable (SourceNodeName:string, EdgeName:string, EdgeType:string, TargetNodeName:string, Region:string)[
'vm-work-1', 'e1', 'can use', 'webapp-prd', 'US',
'vm-custom', 'e2', 'can use', 'webapp-prd', 'US',
'webapp-prd', 'e3', 'can access', 'vm-custom', 'US',
'webapp-prd', 'e4', 'can access', 'test-machine', 'US',
'vm-custom', 'e5', 'can access', 'server-0126', 'US',
'vm-custom', 'e6', 'can access', 'hub_router', 'US',
'webapp-prd', 'e7', 'can access', 'hub_router', 'US',
'test-machine', 'e8', 'can access', 'vm-custom', 'US',
'test-machine', 'e9', 'can access', 'hub_router', 'US',
'hub_router', 'e10', 'routes traffic to', 'remote_DT', 'US',
'vm-work-1', 'e11', 'can access', 'storage_main_backup', 'US',
'hub_router', 'e12', 'routes traffic to', 'vm-work-2', 'US',
'vm-work-2', 'e13', 'can access', 'backup_prc', 'US',
'remote_DT', 'e14', 'can access', 'backup_prc', 'US',
'backup_prc', 'e15', 'moves data to', 'storage_main_backup', 'US',
'backup_prc', 'e16', 'moves data to', 'storage_DevBox', 'US',
'device_A1', 'e17', 'is connected to', 'sevice_B2', 'EU',
'sevice_B2', 'e18', 'is connected to', 'device_A1', 'EU'
];
let nodes = datatable (NodeName:string, NodeType:string, NodeEnvironment:string, Region:string) [
'vm-work-1', 'Virtual Machine', 'Production', 'US',
'vm-custom', 'Virtual Machine', 'Production', 'US',
'webapp-prd', 'Application', 'None', 'US',
'test-machine', 'Virtual Machine', 'Test', 'US',
'hub_router', 'Traffic Router', 'None', 'US',
'vm-work-2', 'Virtual Machine', 'Production', 'US',
'remote_DT', 'Virtual Machine', 'Production', 'US',
'backup_prc', 'Service', 'Production', 'US',
'server-0126', 'Server', 'Production', 'US',
'storage_main_backup', 'Cloud Storage', 'Production', 'US',
'storage_DevBox', 'Cloud Storage', 'Test', 'US',
'device_A1', 'Device', 'Backend', 'EU',
'device_B2', 'Device', 'Backend', 'EU'
];
let nodesEnriched = (
nodes
| extend IsValidStart = (NodeType == 'Virtual Machine'), IsValidEnd = (NodeType == 'Cloud Storage') // option 1
//| extend IsValidStart = (NodeName in('vm-work-1', 'vm-work-2')), IsValidEnd = (NodeName in('storage_main_backup')) // option 2
//| extend IsValidStart = (NodeEnvironment == 'Test'), IsValidEnd = (NodeEnvironment == 'Production') // option 3
);
graph_path_discovery_fl(edgesTableName = 'edges'
, nodesTableName = 'nodesEnriched'
, scopeColumnName = 'Region'
, nodeIdColumnName = 'NodeName'
, edgeIdColumnName = 'EdgeName'
, sourceIdColumnName = 'SourceNodeName'
, targetIdColumnName = 'TargetNodeName'
, isValidPathStartColumnName = 'IsValidStart'
, isValidPathEndColumnName = 'IsValidEnd'
)
输出
sourceId | isSourceValidPathStart | targetId | isTargetValidPathEnd | 范围 | edgeIds | pathLength | pathId | pathAllNodeIds | fullPath |
---|---|---|---|---|---|---|---|---|---|
test-machine | 真 | storage_DevBox | 真 | 我们 | [“e9”,“e10”,“e14”,“e16”] | 4 | 00605d35b6e1d28024fd846f217b43ac | [“test-machine”,“hub_router”,“remote_DT”,“backup_prc”,“storage_DevBox”] | (test-machine)-[e9]->(hub_router)-[e10]->(remote_DT)-[e14]->(backup_prc)-[e16]->(storage_DevBox) |
运行函数使用输入边缘查找所有路径,这些输入边缘将标记为有效起点(isSourceValidPathStart == True)连接到标记为有效终结点的所有目标(isTargetValidPathEnd == True)。 输出是一个表,其中每一行描述单个路径(仅限 resultCountLimit 参数的最大行数)。 每行包含以下字段:
-
sourceId
:源的 nodeId - 路径中的第一个节点。 -
isSourceValidPathStart
:源节点的布尔标志作为有效路径开始;应等于 True。 -
targetId
:目标节点的 nodeId - 路径中的最后一个节点。 -
isTargetValidPathEnd
:目标节点的布尔标志是有效的路径结尾;应始终等于 True。 -
scope
:包含路径的范围。 -
edgeIds
:路径中边缘的有序列表。 -
pathLength
:路径中的边缘数(跃点)。 -
pathId
:路径终结点和步骤的哈希可用作路径的唯一标识符。 -
pathAllNodeIds
:路径中节点的有序列表。 -
fullPath
:表示完整路径的字符串(源节点)-[edge 1]->(node2)-.....->(目标节点)。
在上面的示例中,我们预处理节点表,并添加多个可能的终结点定义选项。 通过注释/取消注释不同的选项,可以发现多个方案:
- 选项 1:查找虚拟机到云存储资源之间的路径。 在探索节点类型之间的连接模式时非常有用。
- 选项 2:查找任何特定节点(vm-work-1,vm-work-2)到特定节点(storage_main_backup)之间的路径。 可用于调查已知案件,例如从已知已泄露资产到已知关键资产的路径。
- 选项 3:查找节点组之间的路径,例如不同环境中的节点。 可用于监视不安全的路径,例如测试和生产环境之间的路径。
在上面的示例中,我们使用第一个选项来查找 VM 到云存储资源之间的所有路径,这些路径可能供想要访问存储数据的潜在攻击者使用。 通过将更多筛选器添加到有效终结点(例如,将具有已知漏洞的 VM 连接到包含敏感数据的存储帐户)可以增强此方案。
函数 graph_path_discovery_fl()
可用于网络安全域中,通过建模为图形的数据发现有趣的路径,例如横向移动路径。