Användarhandbok för GraphFrames – Scala
Den här artikeln visar exempel från användarhandboken för GraphFrames.
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._
Skapa GraphFrames
Du kan skapa GraphFrames från hörn och gränsdataramar.
- Hörndataram: Ett hörn DataFrame ska innehålla en särskild kolumn med namnet
id
som anger unika ID:er för varje hörn i diagrammet. - Edge DataFrame: En edge-dataram ska innehålla två särskilda kolumner:
src
(källhörn-ID för gränsen) ochdst
(målhörn-ID för gränsen).
Båda DataFrames kan ha godtyckliga andra kolumner. Dessa kolumner kan representera hörn- och kantattribut.
Skapa hörn och kanter
// Vertex DataFrame
val v = spark.createDataFrame(List(
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 30),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 36),
("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
("a", "b", "friend"),
("b", "c", "follow"),
("c", "b", "follow"),
("f", "c", "follow"),
("e", "f", "follow"),
("e", "d", "friend"),
("d", "a", "friend"),
("a", "e", "friend")
)).toDF("src", "dst", "relationship")
Nu ska vi skapa ett diagram från dessa hörn och dessa kanter:
val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends
Grundläggande graf- och DataFrame-frågor
GraphFrames tillhandahåller enkla graffrågor, till exempel nodgrad.
Eftersom GraphFrames representerar grafer som par av hörn- och kantdataramar är det dessutom enkelt att göra kraftfulla frågor direkt på dessa hörn- och kantdataramar. Dessa dataramar är tillgängliga som hörn och kantfält i GraphFrame.
display(g.vertices)
display(g.edges)
De inkommande graderna av noderna:
display(g.inDegrees)
Utgående grad av noderna:
display(g.outDegrees)
Hörnens grad:
display(g.degrees)
Du kan köra frågor direkt på vertices DataFrame. Vi kan till exempel hitta den yngsta personens ålder i diagrammet:
val youngest = g.vertices.groupBy().min("age")
display(youngest)
På samma sätt kan du köra frågor på kanterna DataFrame. Låt oss till exempel räkna antalet "följ"-relationer i diagrammet:
val numFollows = g.edges.filter("relationship = 'follow'").count()
Hitta motiv
Skapa mer komplexa relationer med kanter och hörn med hjälp av motiv. Följande cell identifierar par av noder med kanter i båda riktningarna mellan dem. Resultatet är en DataFrame där kolumnnamnen är motivnycklar.
Läs användarhandboken för GraphFrame för mer information om API:et.
// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)
Eftersom resultatet är en DataFrame kan du skapa mer komplexa frågor ovanpå motivet. Låt oss hitta alla ömsesidiga relationer där en person är äldre än 30:
val filtered = motifs.filter("b.age > 30")
display(filtered)
Tillståndskänsliga frågor
De flesta motivfrågor är tillståndslösa och enkla att uttrycka, som i exemplen ovan. De nästa exemplen visar mer komplexa frågor som bär med sig tillstånd längs en väg i motivet. Uttrycka dessa frågor genom att kombinera GraphFrame-motivsökning med filter på resultatet, där filtren använder sekvensåtgärder för att konstruera en serie DataFrame-kolumner.
Anta till exempel att du vill identifiera en kedja med 4 hörn med en egenskap som definieras av en sekvens med funktioner. Det vill säga, bland kedjor med 4 hörn a->b->c->d
, identifiera den delmängd av kedjor som matchar detta komplexa filter:
- Initiera tillståndet på vägen.
- Uppdatera tillstånd baserat på nod a.
- Uppdatera tillstånd baserat på nod b.
- O.s.v. för c och d.
- Om det slutliga tillståndet matchar något villkor accepterar filtret kedjan.
Följande kodfragment visar den här processen, där vi identifierar kedjor med 4 hörn så att minst 2 av de tre kanterna är "vän"-relationer. I det här exemplet är tillståndet det aktuella antalet "vän"-kanter. I allmänhet kan det vara vilken DataFrame-kolumn som helst.
// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
// Query on sequence, with state (cnt)
// (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
when(relationship === "friend", cnt + 1).otherwise(cnt)
}
// (b) Use sequence operation to apply method to sequence of elements in motif.
// In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
// (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)
Undergrafer
GraphFrames tillhandahåller API:er för att skapa undergrafer genom filtrering på kanter och hörn. Dessa filter kan kombineras tillsammans. Följande undergraf innehåller till exempel endast personer som är vänner och som är mer än 30 år gamla.
// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
.filterEdges("relationship = 'friend'")
.filterVertices("age > 30")
.dropIsolatedVertices()
Komplexa tripletfilter
I följande exempel visas hur du väljer en subgraf baserat på tripelfilter som tillämpas på en kant och dess "src"- och "dst"-hörn. Det är enkelt att utöka det här exemplet så att det går längre än trillingar med hjälp av mer komplexa motiv.
// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
.filter("e.relationship = 'follow'")
.filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
// val e2 = paths.select("e.*")
// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)
Standardgrafalgoritmer
I det här avsnittet beskrivs de standardgrafalgoritmer som är inbyggda i GraphFrames.
Bredden-först-sökning (BFS)
Sök från "Esther" efter användare i åldern < 32.
val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)
Sökningen kan också begränsa gränsfilter och maximala sökvägslängder.
val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
.edgeFilter("relationship != 'friend'")
.maxPathLength(3)
.run()
display(filteredPaths)
Anslutna komponenter
Beräkna det anslutna komponentmedlemskapet för varje hörn och returnera ett diagram med varje hörn tilldelat ett komponent-ID.
val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)
Starkt anslutna komponenter
Beräkna den starkt anslutna komponenten (SCC) för varje hörn och returnera ett diagram med varje hörn tilldelat till den SCC som innehåller det hörnet.
val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))
Etikettspridning
Kör algoritmen för statisk etikettspridning för att identifiera communities i nätverk.
Varje nod i nätverket tilldelas ursprungligen till sin egen community. Vid varje supersteg skickar noder sin community-anknytning till alla grannar och uppdaterar sitt tillstånd till den normgemenskapstillhörighet som är mest förekommande bland inkommande meddelanden.
LPA är en standardalgoritm för community-detektering för grafer. Det är billigt beräkningsmässigt, även om (1) konvergens inte garanteras och (2) man kan sluta med triviala lösningar (alla noder identifiera i en enda gemenskap).
val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))
PageRank
Identifiera viktiga hörn i ett diagram baserat på anslutningar.
// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)
Kortaste sökvägar
Beräknar kortaste vägar till den angivna uppsättningen med landmärkeshörn, där landmärken specificeras med hörn-ID.
val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)
Triangelräkning
Beräknar antalet trianglar som passerar genom varje hörn.
import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends // get example graph
val results = g.triangleCount.run()
results.select("id", "count").show()