Freigeben über


graph_path_discovery_fl()

Gilt für: ✅Microsoft FabricAzure Data ExplorerAzure MonitorMicrosoft Sentinel

Ermitteln Sie gültige Pfade zwischen relevanten Endpunkten (Quellen und Zielen) über Diagrammdaten (Rand und Knoten).

Die Funktion graph_path_discovery_fl() ist eine UDF (benutzerdefinierte Funktion), mit der gültige Pfade zwischen relevanten Endpunkten über Diagrammdaten ermittelt werden können. Graph-Daten bestehen aus Knoten (z. B. Ressourcen, Anwendungen oder Benutzer) und Kanten (z. B. vorhandene Zugriffsberechtigungen). Im Kontext der Cybersicherheit können solche Pfade mögliche laterale Bewegungspfade darstellen, die ein potenzieller Angreifer nutzen kann. Wir interessieren uns für Pfade, die Endpunkte verbinden, die durch einige Kriterien definiert sind , z. B. verfügbar gemachte Quellen, die mit kritischen Zielen verbunden sind. Basierend auf der Konfiguration der Funktion können andere Pfadtypen ermittelt werden, die für andere Sicherheitsszenarien geeignet sind.

Die Daten, die als Eingabe für diese Funktion verwendet werden können, sind eine Tabelle mit Kanten im Format "SourceId, EdgeId, TargetId" und eine Liste von Knoten mit optionalen Knoteneigenschaften, die zum Definieren gültiger Pfade verwendet werden können. Alternativ können Graph-Eingaben aus anderen Datentypen extrahiert werden. Beispielsweise können Datenverkehrsprotokolle mit Einträgen vom Typ "Benutzer A, die bei Ressource B angemeldet sind" als Ränder vom Typ "(Benutzer A)-[angemeldet bei]->(Ressource B)" modelliert werden, während die Liste der unterschiedlichen Benutzer und Ressourcen als Knoten modelliert werden kann.

Wir machen mehrere Annahmen:

  • Alle Kanten sind für die Pfadermittlung gültig. Edges, die irrelevant sind, sollten vor dem Ausführen der Pfadermittlung herausgefiltert werden.
  • Ränder sind ungewichtet, unabhängig und bedingungslos, was bedeutet, dass alle Kanten die gleiche Wahrscheinlichkeit haben und der Wechsel von B zu C nicht von der vorherigen Verschiebung von A nach B abhängt.
  • Pfade, die wir ermitteln möchten, sind einfache direktionale Pfade ohne Zyklen vom Typ A->B->C. Komplexere Definitionen können durch Ändern der internen Syntax des Graph-Match-Operators in der Funktion vorgenommen werden.

Diese Annahmen können bei Bedarf angepasst werden, indem die interne Logik der Funktion geändert wird.

Die Funktion ermittelt alle möglichen Pfade zwischen gültigen Quellen zu gültigen Zielen unter optionalen Einschränkungen wie Pfadlängenbeschränkungen, maximale Ausgabegröße usw. Die Ausgabe ist eine Liste der ermittelten Pfade mit Quell- und Ziel-IDs sowie eine Liste der Verbindungsränder und Knoten. Die Funktion verwendet nur die erforderlichen Felder, z. B. Knoten-IDs und Edge-IDs. Falls andere relevante Felder , z. B. Typen, Eigenschaftenlisten, sicherheitsbezogene Bewertungen oder externe Signale, in Eingabedaten verfügbar sind, können sie logik und ausgabe hinzugefügt werden, indem sie die Funktionsdefinition ändern.

Syntax

graph_path_discovery_fl( edgesTableName, , nodesTableName, scopeColumnName, isValidPathStartColumnName, isValidPathEndColumnName, nodeIdColumnName, edgeIdColumnName, sourceIdColumnName, targetIdColumnName, [minPathLength], [maxPathLength], [resultCountLimit])

Erfahren Sie mehr über Syntaxkonventionen.

Parameter

Name Art Erforderlich Beschreibung
EdgesTableName- string ✔️ Der Name der Eingabetabelle, die die Ränder des Diagramms enthält.
nodeTableName- string ✔️ Der Name der Eingabetabelle, die die Knoten des Diagramms enthält.
scopeColumnName- string ✔️ Der Name der Spalte in Knoten und Kantentabellen, die die Partition oder den Bereich enthalten (z. B. Abonnement oder Konto), sodass für jeden Bereich ein anderes Anomaliemodell erstellt wird.
isValidPathStartColumnName string ✔️ Der Name der Spalte in der Knotentabelle, die ein boolesches Flag für einen Knoten enthält, True bedeutet, dass der Knoten ein gültiger Startpunkt für einen Pfad ist und False - nicht gültig.
isValidPathEndColumnName string ✔️ Der Name der Spalte in der Knotentabelle, die ein boolesches Flag für einen Knoten enthält, True bedeutet, dass der Knoten ein gültiger Endpunkt für einen Pfad ist und False - keine gültige.
nodeIdColumnName- string ✔️ Der Name der Spalte in der Knotentabelle, die die Knoten-ID enthält.
edgeIdColumnName- string ✔️ Der Name der Spalte in der Rändertabelle, die die Edge-ID enthält.
sourceIdColumnName- string ✔️ Der Name der Spalte in der Rändertabelle, die die Quellknoten-ID des Edges enthält.
targetIdColumnName- string ✔️ Der Name der Spalte in der Rändertabelle, die die Zielknoten-ID des Edges enthält.
minPathLength- long Die Mindestanzahl der Schritte (Kanten) im Pfad. Der Standardwert ist 1.
maxPathLength- long Die maximale Anzahl von Schritten (Kanten) im Pfad. Der Standardwert ist 8.
resultCountLimit- long Die maximale Anzahl von Pfaden, die für die Ausgabe zurückgegeben werden. Der Standardwert ist 100000.

Funktionsdefinition

Sie können die Funktion definieren, indem Sie den Code entweder als abfragedefinierte Funktion einbetten oder wie folgt als gespeicherte Funktion in Ihrer Datenbank erstellen:

Definieren Sie die Funktion mithilfe der folgenden let-Anweisung. Es sind keine Berechtigungen erforderlich.

Wichtig

Eine anweisung zulassen, nicht eigenständig ausgeführt werden kann. Darauf muss eine tabellarische Ausdrucksanweisungfolgen. Informationen zum Ausführen eines funktionierenden Beispiels für graph_path_discovery_fl()finden Sie unter Beispiel.

let graph_path_discovery_fl = (   edgesTableName:string, nodesTableName:string, scopeColumnName:string
								, isValidPathStartColumnName:string, isValidPathEndColumnName:string
								, nodeIdColumnName:string, edgeIdColumnName:string, sourceIdColumnName:string, targetIdColumnName:string
								, minPathLength:long = 1, maxPathLength:long = 8, resultCountLimit:long = 100000) 
{
let edges = (
    table(edgesTableName)
    | extend sourceId           = column_ifexists(sourceIdColumnName, '')
    | extend targetId           = column_ifexists(targetIdColumnName, '')
    | extend edgeId             = column_ifexists(edgeIdColumnName, '')
    | extend scope              = column_ifexists(scopeColumnName, '')
    );
let nodes = (
    table(nodesTableName)
    | extend nodeId             = column_ifexists(nodeIdColumnName, '')
    | extend isValidPathStart   = column_ifexists(isValidPathStartColumnName, '')
    | extend isValidPathEnd     = column_ifexists(isValidPathEndColumnName, '')
    | extend scope              = column_ifexists(scopeColumnName, '')
);
let paths = (
    edges
    // Build graph object partitioned by scope, so that no connections are allowed between scopes.
    // In case no scopes are relevant, partitioning should be removed for better performance.
    | make-graph sourceId --> targetId with nodes on nodeId partitioned-by scope (
    // Look for existing paths between source nodes and target nodes with less than predefined number of hops
    // Current configurations looks for directed paths without any cycles; this can be changed if needed
      graph-match cycles = none (s)-[e*minPathLength..maxPathLength]->(t)
        // Filter only by paths with that connect valid endpoints
        where ((s.isValidPathStart) and (t.isValidPathEnd))
        project   sourceId                  = s.nodeId
                , isSourceValidPathStart    = s.isValidPathStart
                , targetId                  = t.nodeId
                , isTargetValidPathEnd      = t.isValidPathEnd
                , scope                     = s.scope
                , edgeIds                   = e.edgeId
                , edgeAllTargetIds          = e.targetId
    | limit resultCountLimit
    )
    | extend  pathLength                    = array_length(edgeIds)
            , pathId                        = hash_md5(strcat(sourceId, strcat(edgeIds), targetId))
            , pathAllNodeIds                = array_concat(pack_array(sourceId), edgeAllTargetIds)
    | project-away edgeAllTargetIds
    | mv-apply with_itemindex = SortIndex nodesInPath = pathAllNodeIds to typeof(string), edgesInPath = edgeIds to typeof(string) on (
        extend step = strcat(
              iff(isnotempty(nodesInPath), strcat('(', nodesInPath, ')'), '')
            , iff(isnotempty(edgesInPath), strcat('-[',  edgesInPath, ']->'), ''))
       | summarize fullPath = array_strcat(make_list(step), '')
    )
);
paths
};
// Write your query to use the function here.

Beispiel

Im folgenden Beispiel wird der Aufrufoperator verwendet, um die Funktion auszuführen.

Um eine abfragedefinierte Funktion zu verwenden, rufen Sie sie nach der definition der eingebetteten Funktion auf.

let graph_path_discovery_fl = (   edgesTableName:string, nodesTableName:string, scopeColumnName:string
								, isValidPathStartColumnName:string, isValidPathEndColumnName:string
								, nodeIdColumnName:string, edgeIdColumnName:string, sourceIdColumnName:string, targetIdColumnName:string
								, minPathLength:long = 1, maxPathLength:long = 8, resultCountLimit:long = 100000) 
{
let edges = (
    table(edgesTableName)
    | extend sourceId           = column_ifexists(sourceIdColumnName, '')
    | extend targetId           = column_ifexists(targetIdColumnName, '')
    | extend edgeId             = column_ifexists(edgeIdColumnName, '')
    | extend scope              = column_ifexists(scopeColumnName, '')
    );
let nodes = (
    table(nodesTableName)
    | extend nodeId             = column_ifexists(nodeIdColumnName, '')
    | extend isValidPathStart   = column_ifexists(isValidPathStartColumnName, '')
    | extend isValidPathEnd     = column_ifexists(isValidPathEndColumnName, '')
    | extend scope              = column_ifexists(scopeColumnName, '')
);
let paths = (
    edges
    // Build graph object partitioned by scope, so that no connections are allowed between scopes.
    // In case no scopes are relevant, partitioning should be removed for better performance.
    | make-graph sourceId --> targetId with nodes on nodeId partitioned-by scope (
    // Look for existing paths between source nodes and target nodes with less than predefined number of hops
    // Current configurations looks for directed paths without any cycles; this can be changed if needed
      graph-match cycles = none (s)-[e*minPathLength..maxPathLength]->(t)
        // Filter only by paths with that connect valid endpoints
        where ((s.isValidPathStart) and (t.isValidPathEnd))
        project   sourceId                  = s.nodeId
                , isSourceValidPathStart    = s.isValidPathStart
                , targetId                  = t.nodeId
                , isTargetValidPathEnd      = t.isValidPathEnd
                , scope                     = s.scope
                , edgeIds                   = e.edgeId
                , edgeAllTargetIds          = e.targetId
    | limit resultCountLimit
    )
    | extend  pathLength                    = array_length(edgeIds)
            , pathId                        = hash_md5(strcat(sourceId, strcat(edgeIds), targetId))
            , pathAllNodeIds                = array_concat(pack_array(sourceId), edgeAllTargetIds)
    | project-away edgeAllTargetIds
    | mv-apply with_itemindex = SortIndex nodesInPath = pathAllNodeIds to typeof(string), edgesInPath = edgeIds to typeof(string) on (
        extend step = strcat(
              iff(isnotempty(nodesInPath), strcat('(', nodesInPath, ')'), '')
            , iff(isnotempty(edgesInPath), strcat('-[',  edgesInPath, ']->'), ''))
       | summarize fullPath = array_strcat(make_list(step), '')
    )
);
paths
};
let edges = datatable (SourceNodeName:string, EdgeName:string, EdgeType:string, TargetNodeName:string, Region:string)[						
    'vm-work-1',            'e1',           'can use',	            'webapp-prd', 	          'US',
    'vm-custom',        	'e2',           'can use',	            'webapp-prd', 	          'US',
    'webapp-prd',           'e3',           'can access',	        'vm-custom', 	          'US',
    'webapp-prd',       	'e4',           'can access',	        'test-machine', 	      'US',
    'vm-custom',        	'e5',           'can access',	        'server-0126', 	          'US',
    'vm-custom',        	'e6',	        'can access',	        'hub_router', 	          'US',
    'webapp-prd',       	'e7',	        'can access',	        'hub_router', 	          'US',
    'test-machine',       	'e8',	        'can access',	        'vm-custom',              'US',
    'test-machine',        	'e9',	        'can access',	        'hub_router', 	          'US',
    'hub_router',           'e10',	        'routes traffic to',	'remote_DT', 	          'US',
    'vm-work-1',            'e11',	        'can access',	        'storage_main_backup', 	  'US',
    'hub_router',           'e12',	        'routes traffic to',	'vm-work-2', 	          'US',
    'vm-work-2',        	'e13',          'can access',	        'backup_prc', 	          'US',
    'remote_DT',            'e14',	        'can access',	        'backup_prc', 	          'US',
    'backup_prc',           'e15',	        'moves data to',        'storage_main_backup', 	  'US',
    'backup_prc',           'e16',	        'moves data to',        'storage_DevBox', 	      'US',
    'device_A1',            'e17',	        'is connected to',      'sevice_B2', 	          'EU',
    'sevice_B2',            'e18',	        'is connected to',      'device_A1', 	          'EU'
];
let nodes = datatable (NodeName:string, NodeType:string, NodeEnvironment:string, Region:string) [
        'vm-work-1',                'Virtual Machine',      'Production',       'US',
        'vm-custom',                'Virtual Machine',      'Production',       'US',
        'webapp-prd',               'Application',          'None',             'US',
        'test-machine',             'Virtual Machine',      'Test',             'US',
        'hub_router',               'Traffic Router',       'None',             'US',
        'vm-work-2',                'Virtual Machine',      'Production',       'US',
        'remote_DT',                'Virtual Machine',      'Production',       'US',
        'backup_prc',               'Service',              'Production',       'US',
        'server-0126',              'Server',               'Production',       'US',
        'storage_main_backup',      'Cloud Storage',        'Production',       'US',
        'storage_DevBox',           'Cloud Storage',        'Test',             'US',
        'device_A1',                'Device',               'Backend',          'EU',
        'device_B2',                'Device',               'Backend',          'EU'
];
let nodesEnriched = (
    nodes
    | extend IsValidStart = (NodeType == 'Virtual Machine'),             IsValidEnd = (NodeType == 'Cloud Storage')              // option 1
    //| extend IsValidStart = (NodeName in('vm-work-1', 'vm-work-2')),     IsValidEnd = (NodeName in('storage_main_backup'))       // option 2
    //| extend IsValidStart = (NodeEnvironment == 'Test'),                 IsValidEnd = (NodeEnvironment == 'Production')          // option 3
);
graph_path_discovery_fl(edgesTableName                = 'edges'
                , nodesTableName                = 'nodesEnriched'
                , scopeColumnName               = 'Region'
                , nodeIdColumnName              = 'NodeName'
                , edgeIdColumnName              = 'EdgeName'
                , sourceIdColumnName            = 'SourceNodeName'
                , targetIdColumnName            = 'TargetNodeName'
                , isValidPathStartColumnName    = 'IsValidStart'
                , isValidPathEndColumnName      = 'IsValidEnd'
)

Ausgabe-

sourceId isSourceValidPathStart targetId isTargetValidPathEnd Umfang edgeIds pathLength pathId pathAllNodeIds fullPath
Testcomputer STIMMT storage_DevBox STIMMT UNS ["e9";"e10";"e14";"e16"] 4 00605d35b6e1d28024fd846f217b43ac ["test-machine";"hub_router";"remote_DT";"backup_prc";"storage_DevBox"] (Testmaschine)-[e9]->(hub_router)-[e10]->(remote_DT)-[e14]->(backup_prc)-[e16]->(storage_DevBox)

Wenn Sie die Funktion ausführen, werden alle Pfade mithilfe von Eingaberändern gesucht, die eine Verbindung zwischen Quellknoten herstellen, die als gültige Startpunkte gekennzeichnet sind (isSourceValidPathStart == True) mit allen Zielen, die als gültige Endpunkte gekennzeichnet sind (isTargetValidPathEnd == True). Die Ausgabe ist eine Tabelle, in der jede Zeile einen einzelnen Pfad beschreibt (auf die maximale Anzahl von Zeilen nach resultCountLimit-Parameter beschränkt). Jede Zeile enthält die folgenden Felder:

  • sourceId: nodeId der Quelle - erster Knoten im Pfad.
  • isSourceValidPathStart: Boolesche Kennzeichnung für Quellknoten, die ein gültiger Pfadanfang ist; sollte gleich "True" sein.
  • targetId: nodeId des Ziels - letzter Knoten im Pfad.
  • isTargetValidPathEnd: Boolesche Kennzeichnung für Zielknoten, die ein gültiges Pfadende ist; sollte immer gleich "True" sein.
  • scope: der Bereich, der den Pfad enthält.
  • edgeIds: eine sortierte Liste von Rändern im Pfad.
  • pathLength: die Anzahl der Ränder (Hops) im Pfad.
  • pathId: Ein Hash der Endpunkte und Schritte des Pfads kann als eindeutiger Bezeichner für den Pfad verwendet werden.
  • pathAllNodeIds: eine sortierte Liste der Knoten im Pfad.
  • fullPath: eine Zeichenfolge, die den vollständigen Pfad darstellt, im Format (Quellknoten)-[Edge 1]->(Node2)-.....->(Zielknoten).

Im obigen Beispiel verarbeiten wir die Knotentabelle vor und fügen mehrere Optionen möglicher Endpunktdefinitionen hinzu. Durch Kommentieren/Aufheben der Kommentare verschiedener Optionen können mehrere Szenarien ermittelt werden:

  • Option 1: Suchen von Pfaden zwischen virtuellen Computern zu Cloudspeicherressourcen. Nützlich beim Untersuchen von Verbindungsmustern zwischen Knotentypen.
  • Option 2: Suchen von Pfaden zwischen einem der einzelnen Knoten (vm-work-1, vm-work-2) zu einem bestimmten Knoten (storage_main_backup). Nützlich bei der Untersuchung bekannter Fälle – z. B. Pfade von bekannten kompromittierten Ressourcen zu bekannten kritischen Objekten.
  • Option 3: Suchen von Pfaden zwischen Knotengruppen, z. B. Knoten in verschiedenen Umgebungen. Nützlich für die Überwachung unsicherer Pfade, z. B. Pfade zwischen Test- und Produktionsumgebungen.

Im obigen Beispiel verwenden wir die erste Option, um alle Pfade zwischen VMs und Cloudspeicherressourcen zu finden, die von potenziellen Angreifern verwendet werden können, die auf gespeicherte Daten zugreifen möchten. Dieses Szenario kann durch das Hinzufügen weiterer Filter zu gültigen Endpunkten gestärkt werden, z. B. das Verbinden von VMs mit bekannten Sicherheitsrisiken mit Speicherkonten, die vertrauliche Daten enthalten.

Die Funktion graph_path_discovery_fl() kann in der Cybersicherheitsdomäne verwendet werden, um interessante Pfade zu entdecken, z. B. laterale Bewegungspfade, über Daten, die als Diagramm modelliert wurden.