GraphFrames-gebruikershandleiding - Scala
In dit artikel ziet u voorbeelden uit de GraphFrames-gebruikershandleiding.
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._
GraphFrames maken
U kunt GraphFrames maken op basis van hoekpunt en edge DataFrames.
- Hoekpunt DataFrame: Een hoekpunt DataFrame moet een speciale kolom bevatten met de naam
id
waarmee unieke id's voor elk hoekpunt in de grafiek worden opgegeven. - Edge DataFrame: Een edge DataFrame moet twee speciale kolommen bevatten:
src
(bronpunt-id van edge) endst
(doelpunt-id van edge).
Beide DataFrames kunnen willekeurige andere kolommen hebben. Deze kolommen kunnen hoekpunt- en randkenmerken vertegenwoordigen.
De hoekpunten en randen maken
// Vertex DataFrame
val v = spark.createDataFrame(List(
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 30),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 36),
("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
("a", "b", "friend"),
("b", "c", "follow"),
("c", "b", "follow"),
("f", "c", "follow"),
("e", "f", "follow"),
("e", "d", "friend"),
("d", "a", "friend"),
("a", "e", "friend")
)).toDF("src", "dst", "relationship")
Laten we een grafiek maken op basis van deze hoekpunten en deze randen:
val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends
Basisgrafiek- en DataFrame-query's
GraphFrames bieden eenvoudige grafenquery's, zoals knooppuntgraad.
Omdat GraphFrames grafieken vertegenwoordigt als paren van hoekpunt en edge DataFrames, is het ook eenvoudig om krachtige query's rechtstreeks op het hoekpunt en edge DataFrames te maken. Deze DataFrames zijn beschikbaar als hoekpunten en randen in het GraphFrame.
display(g.vertices)
display(g.edges)
De binnenkomende graad van de hoekpunten
display(g.inDegrees)
De uitgaande graad van de hoekpunten:
display(g.outDegrees)
De graad van de hoekpunten.
display(g.degrees)
U kunt query's rechtstreeks uitvoeren op het dataframe hoekpunten. We kunnen bijvoorbeeld de leeftijd van de jongste persoon in de grafiek vinden:
val youngest = g.vertices.groupBy().min("age")
display(youngest)
Op dezelfde manier kunt u query's uitvoeren op het randen-DataFrame. Laten we bijvoorbeeld het aantal 'follow'-relaties in de grafiek tellen:
val numFollows = g.edges.filter("relationship = 'follow'").count()
Motiefdetectie
Bouw complexere relaties met randen en hoekpunten met behulp van motieven. In de volgende cel worden de paren van hoekpunten met zijdes in beide richtingen tussen hen gevonden. Het resultaat is een DataFrame waarin de kolomnamen motiefsleutels zijn.
Raadpleeg de GraphFrame-gebruikershandleiding voor meer informatie over de API.
// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)
Omdat het resultaat een DataFrame is, kunt u complexere query's bouwen op het motief. Laten we alle wederzijdse relaties vinden waarin één persoon ouder is dan 30:
val filtered = motifs.filter("b.age > 30")
display(filtered)
Stateful queries
De meeste patroonzoekopdrachten zijn stateloos en eenvoudig uit te voeren, zoals in de bovenstaande voorbeelden. In de volgende voorbeelden ziet u complexere query's die een toestand langs een pad in het motief dragen. Express deze queries door het vinden van GraphFrame-motieven te combineren met filters op het resultaat, waarbij de filters gebruikmaken van reeksbewerkingen om een reeks DataFrame-kolommen te maken.
Stel dat u een keten van vier hoekpunten wilt identificeren met een bepaalde eigenschap die is gedefinieerd door een reeks functies. Tussen ketens van 4 hoekpunten a->b->c->d
identificeert u de subset van ketens die overeenkomen met dit complexe filter:
- Initialiseer de status op het pad.
- Werk de status bij op basis van hoekpunt a.
- Status bijwerken op basis van hoekpunt b.
- Enzovoort voor c en d.
- Als de uiteindelijke status overeenkomt met een bepaalde voorwaarde, accepteert het filter de keten.
De volgende codefragmenten laten dit proces zien, waarbij we ketens van vier hoekpunten identificeren, zodat ten minste 2 van de drie randen 'vriend'-relaties zijn. In dit voorbeeld is de status het huidige aantal 'vriend'-randen; over het algemeen kan het een DataFrame-kolom zijn.
// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
// Query on sequence, with state (cnt)
// (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
when(relationship === "friend", cnt + 1).otherwise(cnt)
}
// (b) Use sequence operation to apply method to sequence of elements in motif.
// In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
// (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)
Subgrafen
GraphFrames biedt API's voor het bouwen van subgrafen door te filteren op randen en hoekpunten. Deze filters kunnen samen worden samengesteld. De volgende subgraaf bevat bijvoorbeeld alleen personen die vrienden zijn en die ouder zijn dan 30 jaar.
// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
.filterEdges("relationship = 'friend'")
.filterVertices("age > 30")
.dropIsolatedVertices()
Complexe drievoudige filters
In het volgende voorbeeld ziet u hoe u een subgraaf selecteert op basis van drievoudige filters die op een rand en de hoekpunten 'src' en 'dst' werken. Het uitbreiden van dit voorbeeld om verder te gaan dan tripletten door complexere motieven te gebruiken, is eenvoudig.
// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
.filter("e.relationship = 'follow'")
.filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
// val e2 = paths.select("e.*")
// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)
Standaardgrafiekalgoritmen
In deze sectie worden de standaardgrafiekalgoritmen beschreven die zijn ingebouwd in GraphFrames.
Breedte eerst zoeken (BFS)
Zoek vanuit 'Esther' naar gebruikers van leeftijd < 32.
val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)
De zoekopdracht kan ook randfilters en maximale padlengten beperken.
val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
.edgeFilter("relationship != 'friend'")
.maxPathLength(3)
.run()
display(filteredPaths)
Verbonden onderdelen
Bereken het lidmaatschap van het verbonden onderdeel van elk hoekpunt en retourneer een grafiek met elk hoekpunt waaraan een onderdeel-id is toegewezen.
val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)
Sterk verbonden onderdelen
Bereken het sterk verbonden onderdeel (SCC) van elk hoekpunt en retourneer een grafiek met elk hoekpunt dat is toegewezen aan de SCC die dat hoekpunt bevat.
val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))
Labelverspreiding
Voer het algoritme voor het doorgeven van statische labels uit voor het detecteren van community's in netwerken.
Elk knooppunt in het netwerk wordt in eerste instantie toegewezen aan een eigen community. Bij elke superstap sturen knooppunten hun communityrelatie naar alle buren en werken ze hun status bij naar de modus-communityrelatie van binnenkomende berichten.
LPA is een standaard algoritme voor communitydetectie voor grafieken. Het is goedkoop rekenkundig, hoewel (1) convergentie niet gegarandeerd is en (2) één kan eindigen met triviale oplossingen (alle knooppunten identificeren zich in één community).
val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))
PageRank
Belangrijke hoekpunten in een grafiek identificeren op basis van verbindingen.
// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)
Kortste paden
Berekent kortste paden naar de opgegeven set hoekpunten, waarbij oriëntatiepunten worden opgegeven op basis van hoekpunt-id.
val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)
Driehoeken tellen
Berekent het aantal driehoeken dat langs elk hoekpunt loopt.
import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends // get example graph
val results = g.triangleCount.run()
results.select("id", "count").show()