GraphFrames-Benutzerhandbuch – Scala
In diesem Artikel werden Beispiele aus dem GraphFrames-Benutzerleitfaden veranschaulicht.
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._
Erstellen von GraphFrames-Objekten
GraphFrames-Objekte können auf der Grundlage von Knoten- und Kanten-DataFrame-Objekten erstellt werden.
- Knoten-DataFrame: Ein Knoten-DataFrame-Objekt sollte eine spezielle Spalte mit dem Namen
id
enthalten, in der eine eindeutige ID für jeden Knoten im Graph angegeben wird. - Kanten-DataFrame: Ein Kanten-DataFrame-Objekt sollte die beiden speziellen Spalten
src
(Quellknoten-ID der Kante) unddst
(Zielknoten-ID der Kante) enthalten.
Beide DataFrames können beliebige weitere Spalten aufweisen. Diese Spalten können Knoten- und Kantenattribute darstellen.
Erstellen von Knoten und Kanten
// Vertex DataFrame
val v = spark.createDataFrame(List(
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 30),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 36),
("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
("a", "b", "friend"),
("b", "c", "follow"),
("c", "b", "follow"),
("f", "c", "follow"),
("e", "f", "follow"),
("e", "d", "friend"),
("d", "a", "friend"),
("a", "e", "friend")
)).toDF("src", "dst", "relationship")
Erstellen Sie basierend auf diesen Knoten und Kanten einen Graph:
val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends
Grundlegende Graph- und DataFrame-Abfragen
GraphFrames stellt einfache Graphabfragen wie den Knotengrad bereit.
Da in GraphFrames-Objekten Graphen zudem als Paare aus Knoten-Kanten-DataFrames dargestellt werden, ist es einfach, leistungsstarke Abfragen direkt für Knoten- und Kanten-DataFrames auszuführen. Diese DataFrame-Objekte sind als Knoten- und Kantenfelder in GraphFrames verfügbar.
display(g.vertices)
display(g.edges)
Der eingehende Grad der Knoten:
display(g.inDegrees)
Der ausgehende Grad der Knoten:
display(g.outDegrees)
Der Grad der Knoten:
display(g.degrees)
Sie können Abfragen direkt für das Knoten-DataFrame-Objekt ausführen. So ermitteln Sie etwa das Alter der jüngsten Person im Graph:
val youngest = g.vertices.groupBy().min("age")
display(youngest)
Gleiches gilt für das Ausführen von Abfragen für das Kanten-DataFrame-Objekt. So können Sie etwa die Anzahl der Beziehungen vom Typ „follow“ im Graph bestimmen:
val numFollows = g.edges.filter("relationship = 'follow'").count()
Motivsuche
Mithilfe von Motiven können Sie komplexere Beziehungen zwischen Kanten und Knoten erstellen. Mit der folgenden Zelle suchen Sie nach Knotenpaaren, zwischen denen sich in beiden Richtungen Kanten befinden. Das Ergebnis ist ein DataFrame, dessen Spaltennamen Motivschlüssel darstellen.
Weitere Informationen zur API finden Sie im GraphFrames-Benutzerleitfaden.
// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)
Da es sich bei dem Ergebnis um ein DataFrame-Objekt handelt, können Sie auf der Grundlage des Motivs auch komplexere Abfragen erstellen. So können Sie etwa nach allen reziproken Beziehungen suchen, in denen eine Person älter als 30 ist:
val filtered = motifs.filter("b.age > 30")
display(filtered)
Zustandsbehaftete Abfragen
Die meisten Motivabfragen sind zustandslos und einfach zu erstellen (wie die Beispiele oben). In den folgenden Beispielen sehen Sie komplexere Abfragen, die einen Zustand für einen Pfad im Motiv enthalten. Erstellen Sie diese Abfragen, indem Sie die Motivsuche in GraphFrames mit Filtern für das Ergebnis kombinieren, wobei die Filter mithilfe von Sequenzvorgängen eine Reihe von DataFrame-Spalten erstellen.
Angenommen, Sie möchten eine Kette von vier Knoten mit einer Eigenschaft identifizieren, die durch eine Sequenz von Funktionen definiert wird. Das heißt, es soll die Teilmenge von Ketten bestehend aus vier Knoten (a->b->c->d
) identifiziert werden, die die Bedingungen des folgenden komplexen Filters erfüllt:
- Der Zustand wird für den Pfad initialisiert.
- Der Zustand wird auf der Grundlage von Knoten a aktualisiert.
- Der Zustand wird auf der Grundlage von Knoten b aktualisiert.
- Gleiches gilt für Knoten c und d.
- Stimmt der Endzustand mit einer der Bedingungen überein, akzeptiert der Filter die Kette.
In den folgenden Codeausschnitten wird dieser Vorgang veranschaulicht. Es werden Ketten bestehend aus vier Knoten identifiziert, wobei mindestens zwei von drei Kanten Beziehungen vom Typ „Friend“ aufweisen. In diesem Beispiel stellt der Zustand die aktuelle Anzahl von „befreundeten“ Kanten dar. Generell kann es sich dabei um eine beliebige DataFrame-Spalte handeln.
// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
// Query on sequence, with state (cnt)
// (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
when(relationship === "friend", cnt + 1).otherwise(cnt)
}
// (b) Use sequence operation to apply method to sequence of elements in motif.
// In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
// (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)
Teilgraphen
GraphFrames stellt APIs zum Erstellen von Teilgraphen durch das Filtern nach Kanten und Knoten bereit. Diese Filter können auch kombiniert werden. Der folgende Teilgraph enthält beispielsweise nur Personen, die befreundet und älter als 30 Jahre sind.
// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
.filterEdges("relationship = 'friend'")
.filterVertices("age > 30")
.dropIsolatedVertices()
Komplexe Tripelfilter
Im folgenden Beispiel wird gezeigt, wie Sie einen Teilgraph über Tripelfilter auswählen, die für eine Kante und deren Knoten „src“ und „dst“ ausgeführt werden. Mithilfe einer komplexeren Motivsuche kann dieses Beispiel problemlos über Tripelfilter hinaus erweitert werden.
// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
.filter("e.relationship = 'follow'")
.filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
// val e2 = paths.select("e.*")
// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)
Standardgraphalgorithmen
In diesem Abschnitt werden die in GraphFrames integrierten Standardgraphalgorithmen beschrieben.
Breadth-First Search (BFS)
Suchen Sie nach Benutzern mit dem Namen „Esther“, die jünger als 32 sind.
val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)
Dabei können Sie Kantenfilter und die Pfadlänge begrenzen.
val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
.edgeFilter("relationship != 'friend'")
.maxPathLength(3)
.run()
display(filteredPaths)
Connected Components
Berechnen Sie die Zugehörigkeit jedes Knoten zu verbundenen Komponenten, und geben Sie einen Graph zurück, bei dem jedem Knoten eine Komponenten-ID zugewiesen ist.
val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)
Strongly Connected Components (SCC)
Berechnen Sie für jeden Knoten dessen stark verbundene Komponente, und geben Sie einen Graph zurück, bei dem jedem Knoten dessen SCC zugewiesen ist.
val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))
Label Propagation
Führen Sie den statischen Label Propagation-Algorithmus (LPA) aus, um Communitys in Netzwerken zu erkennen.
Jeder Knoten in einem Netzwerk wird zu Beginn einer Community zugewiesen. Bei jedem Schritt sendet ein Knoten seine Communityzugehörigkeit an alle Nachbarknoten und aktualisiert deren Status auf den Modus „Communityzugehörigkeit eingehender Nachrichten“.
LPA ist ein Standardalgorithmus zur Ermittlung von Communitys für Graphen. Die Berechnung ist kostengünstig. Beachten Sie jedoch, dass (1) eine Konvergenz nicht garantiert ist und (2) es zu trivialen Lösungen kommen kann (etwa dass alle Knoten als Teil einer einzigen Community erkannt werden).
val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))
PageRank
Mit PageRank können Sie wichtige Knoten in einem Graphen basierend auf Verbindungen identifizieren.
// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)
Shortest Paths
Berechnet die kürzesten Pfade zur angegebenen Gruppe von Landmarkknoten, wobei die Landmarks anhand der Knoten-ID angegeben werden.
val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)
Triangle Counting
Berechnet die Anzahl der Dreiecke, die die einzelnen Knoten durchlaufen.
import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends // get example graph
val results = g.triangleCount.run()
results.select("id", "count").show()