GraphFrames-Benutzerhandbuch – Scala
In diesem Artikel werden Beispiele aus dem GraphFrames-Benutzerhandbuchveranschaulicht.
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._
Erstellen von GraphFrames
Sie können GraphFrames aus Vertex- und Edgedatenframes erstellen.
- Vertex DataFrame: Ein Vertex DataFrame sollte eine spezielle Spalte mit dem Namen
id
enthalten, die eindeutige IDs für jeden Scheitelpunkt im Diagramm angibt. - Edge DataFrame: Ein Edge-DataFrame sollte zwei spezielle Spalten enthalten:
src
(Quellvertex-ID des Edges) unddst
(Zielvertex-ID des Edges).
Beide DataFrames können beliebige andere Spalten aufweisen. Diese Spalten können Vertex- und Randattribute darstellen.
Erstellen Sie die Scheitelpunkte und Kanten
// Vertex DataFrame
val v = spark.createDataFrame(List(
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 30),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 36),
("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
("a", "b", "friend"),
("b", "c", "follow"),
("c", "b", "follow"),
("f", "c", "follow"),
("e", "f", "follow"),
("e", "d", "friend"),
("d", "a", "friend"),
("a", "e", "friend")
)).toDF("src", "dst", "relationship")
Erstellen wir ein Diagramm aus diesen Scheitelpunkten und den folgenden Rändern:
val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends
Einfache Graph- und DataFrame-Abfragen
GraphFrames stellen einfache Diagrammabfragen bereit, z. B. Knotengrad.
Da GraphFrames Diagramme als Vertex- und Edge-DataFrames-Paare darstellen, ist es einfach, leistungsstarke Abfragen direkt auf den Vertex- und Edge-DataFrames zu erstellen. Diese DataFrames sind als Scheitelpunkte und Kantenfelder im GraphFrame verfügbar.
display(g.vertices)
display(g.edges)
Der eingehende Grad der Scheitelpunkte:
display(g.inDegrees)
Der ausgehende Grad der Knoten:
display(g.outDegrees)
Der Grad der Scheitelpunkte:
display(g.degrees)
Sie können Abfragen direkt auf dem Vertices DataFrame ausführen. Beispielsweise können wir das Alter der jüngsten Person im Diagramm finden:
val youngest = g.vertices.groupBy().min("age")
display(youngest)
Ebenso können Sie Abfragen an den Rändern von DataFrame ausführen. Lassen Sie uns beispielsweise die Anzahl der "Follow"-Beziehungen im Diagramm zählen:
val numFollows = g.edges.filter("relationship = 'follow'").count()
Motivsuche
Erstellen Sie komplexere Beziehungen mit Kanten und Scheitelpunkten mithilfe von Motiven. Mit der folgenden Zelle suchen Sie nach Scheitelpunktpaaren, zwischen denen sich in beiden Richtungen Kanten befinden. Das Ergebnis ist ein DataFrame, in dem die Spaltennamen Motivschlüssel sind.
Weitere Informationen zur API finden Sie im GraphFrame-Benutzerhandbuch.
// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)
Da das Ergebnis ein DataFrame ist, können Sie komplexere Abfragen auf dem Motiv erstellen. Lassen Sie uns alle gegenseitigen Beziehungen finden, in denen eine Person älter als 30 ist:
val filtered = motifs.filter("b.age > 30")
display(filtered)
Zustandsbehaftete Abfragen
Die meisten Motivabfragen sind zustandslos und einfach zu erstellen (wie die Beispiele oben). In den folgenden Beispielen sehen Sie komplexere Abfragen, die einen Zustand für einen Pfad im Motiv enthalten. Ausdrücken Sie diese Abfragen, indem Sie die GraphFrame-Motivsuche mit Filtern für das Ergebnis kombinieren, wobei die Filter Sequenzvorgänge verwenden, um eine Reihe von DataFrame-Spalten zu erstellen.
Angenommen, Sie möchten eine Kette von 4 Scheitelpunkten mit einer Eigenschaft identifizieren, die durch eine Sequenz von Funktionen definiert wird. Dies bedeutet, dass die Teilmenge von Ketten bestehend aus vier Scheitelpunkten (a->b->c->d
) identifiziert werden soll, die die Bedingungen des folgenden komplexen Filters erfüllt:
- Der Zustand wird für den Pfad initialisiert.
- Der Zustand wird auf der Grundlage von Knoten a aktualisiert.
- Der Zustand wird auf der Grundlage von Scheitelpunkt „b“ aktualisiert.
- Usw. für c und d.
- Wenn der endgültige Zustand mit einer Bedingung übereinstimmt, akzeptiert der Filter die Kette.
In den folgenden Codeschnipseln wird dieser Vorgang veranschaulicht. Es werden Ketten bestehend aus vier Scheitelpunkten identifiziert, wobei mindestens zwei von drei Kanten Beziehungen vom Typ „Friend“ aufweisen. In diesem Beispiel stellt der Zustand die aktuelle Anzahl von „befreundeten“ Kanten dar. Generell kann es sich dabei um eine beliebige DataFrame-Spalte handeln.
// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
// Query on sequence, with state (cnt)
// (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
when(relationship === "friend", cnt + 1).otherwise(cnt)
}
// (b) Use sequence operation to apply method to sequence of elements in motif.
// In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
// (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)
Teilgraphen
GraphFrames stellt APIs zum Erstellen von Untergraphen bereit, indem sie auf Kanten und Scheitelpunkte filtern. Diese Filter können kombiniert werden. Der folgende Untergraph enthält beispielsweise nur Personen, die Freunde sind und mehr als 30 Jahre alt sind.
// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
.filterEdges("relationship = 'friend'")
.filterVertices("age > 30")
.dropIsolatedVertices()
Komplexe Dreifachfilter
Das folgende Beispiel zeigt, wie Sie ein Unterdiagramm basierend auf Tripletfiltern auswählen, die an einem Rand und seinen "src" und "dst" Scheitelpunkten arbeiten. Das Ausweiten dieses Beispiels auf über die Dreiergruppen hinaus durch die Verwendung komplexerer Motive ist einfach.
// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
.filter("e.relationship = 'follow'")
.filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
// val e2 = paths.select("e.*")
// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)
Standardgraphalgorithmen
In diesem Abschnitt werden die in GraphFrames integrierten Standarddiagrammalgorithmen beschrieben.
Breadth-First Search (BFS)
Suchen Sie in „Esther“ nach Benutzern mit dem Alter < 32.
val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)
Die Suche kann auch Kantenfilter und maximale Pfadlängen einschränken.
val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
.edgeFilter("relationship != 'friend'")
.maxPathLength(3)
.run()
display(filteredPaths)
Verbundene Komponenten
Berechnen Sie die Zugehörigkeit zur verbundenen Komponente jedes Knotens und geben Sie einen Graphen zurück, wobei jedem Knoten eine Komponenten-ID zugewiesen ist.
val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)
Stark verbundene Komponenten
Berechnen Sie die stark verbundene Komponente (SCC) jedes Scheitelpunkts, und geben Sie ein Diagramm zurück, dessen Scheitelpunkt dem SCC zugeordnet ist, der diesen Scheitelpunkt enthält.
val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))
Bezeichnungsverteilung
Führen Sie den Algorithmus für die statische Bezeichnungsverteilung aus, um Communitys in Netzwerken zu erkennen.
Jedem Knoten im Netzwerk wird zunächst eine eigene Community zugewiesen. Bei jedem Schritt sendet ein Knoten seine Communityzugehörigkeit an alle Nachbarknoten und aktualisiert deren Status auf den Modus „Communityzugehörigkeit eingehender Nachrichten“.
LPA ist ein Standardmäßiger Community-Erkennungsalgorithmus für Diagramme. Es ist preiswert rechnerisch, obwohl (1) Konvergenz nicht garantiert ist und (2) man mit trivialen Lösungen enden kann (alle Knoten identifizieren sich in einer einzigen Gemeinschaft).
val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))
PageRank
Identifizieren Sie wichtige Scheitelpunkte in einem Diagramm basierend auf Verbindungen.
// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)
Kürzeste Pfade
Berechnet die kürzesten Pfade zur angegebenen Gruppe von Landmarkscheitelpunkten, wobei die Landmarks anhand der Scheitelpunkt-ID angegeben werden.
val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)
Triangle Counting
Berechnet die Anzahl der Dreiecke, die durch jeden Scheitelpunkt gehen.
import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends // get example graph
val results = g.triangleCount.run()
results.select("id", "count").show()