GraphFrames user guide - Scala

This article demonstrates examples from the GraphFrames User Guide.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._

Creating GraphFrames

You can create GraphFrames from vertex and edge DataFrames.

  • Vertex DataFrame: A vertex DataFrame should contain a special column named id which specifies unique IDs for each vertex in the graph.
  • Edge DataFrame: An edge DataFrame should contain two special columns: src (source vertex ID of edge) and dst (destination vertex ID of edge).

Both DataFrames can have arbitrary other columns. Those columns can represent vertex and edge attributes.
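
As a minimal sketch of the column requirements above, the following plain-Scala check reports which required columns are missing from a list of column names. The helper missingColumns is hypothetical and not part of GraphFrames; with real DataFrames you could pass df.columns.toSeq as the first argument.

```scala
// Hypothetical helper (not a GraphFrames API): returns the required
// column names that are absent from the actual column names.
def missingColumns(actual: Seq[String], required: Seq[String]): Seq[String] =
  required.filterNot(name => actual.contains(name))

// Column names matching the example DataFrames used in this guide.
val vertexColumns = Seq("id", "name", "age")
val edgeColumns = Seq("src", "dst", "relationship")

val missingInVertices = missingColumns(vertexColumns, Seq("id"))
val missingInEdges = missingColumns(edgeColumns, Seq("src", "dst"))
```

Both results are empty here because the example DataFrames carry the required columns alongside their attribute columns.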

Create the vertices and edges

// Vertex DataFrame
val v = spark.createDataFrame(List(
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
)).toDF("src", "dst", "relationship")

Let’s create a graph from these vertices and these edges:

val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends

Basic graph and DataFrame queries

GraphFrames provide simple graph queries, such as node degree.

Also, since GraphFrames represent graphs as pairs of vertex and edge DataFrames, it is easy to make powerful queries directly on the vertex and edge DataFrames. Those DataFrames are available as vertices and edges fields in the GraphFrame.

display(g.vertices)
display(g.edges)

The incoming degree of the vertices:

display(g.inDegrees)

The outgoing degree of the vertices:

display(g.outDegrees)

The degree of the vertices:

display(g.degrees)
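
To make the three degree queries concrete, the following plain-Scala sketch recomputes them by hand from the example edge list. It does not call GraphFrames; edgePairs, inDegree, outDegree, and degree are illustrative names.

```scala
// The (src, dst) pairs of the example edge DataFrame.
val edgePairs = Seq(
  ("a", "b"), ("b", "c"), ("c", "b"), ("f", "c"),
  ("e", "f"), ("e", "d"), ("d", "a"), ("a", "e")
)

// In-degree: number of edges arriving at each vertex.
val inDegree: Map[String, Int] =
  edgePairs.groupBy(_._2).map { case (id, es) => id -> es.size }

// Out-degree: number of edges leaving each vertex.
val outDegree: Map[String, Int] =
  edgePairs.groupBy(_._1).map { case (id, es) => id -> es.size }

// Degree: in-degree plus out-degree.
val degree: Map[String, Int] =
  (inDegree.keySet ++ outDegree.keySet).map { id =>
    id -> (inDegree.getOrElse(id, 0) + outDegree.getOrElse(id, 0))
  }.toMap
```

Vertex "g" has no edges, so it does not appear in any of these maps. The degree DataFrames in GraphFrames are likewise expected to omit vertices with a degree of zero.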

You can run queries directly on the vertices DataFrame. For example, we can find the age of the youngest person in the graph:

val youngest = g.vertices.groupBy().min("age")
display(youngest)

Likewise, you can run queries on the edges DataFrame. For example, let us count the number of ‘follow’ relationships in the graph:

val numFollows = g.edges.filter("relationship = 'follow'").count()
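
As a further sketch of querying the edges DataFrame, the snippet below counts edges for every relationship type at once using the standard Spark groupBy and count calls. It assumes the graph g built earlier in this guide; relationshipCounts is an illustrative name.

```scala
import org.apache.spark.sql.DataFrame

// Assumes `g` is the GraphFrame created from `v` and `e` above.
// Count how many edges carry each relationship label.
val relationshipCounts: DataFrame = g.edges.groupBy("relationship").count()
relationshipCounts.show()
```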

Motif finding

Build more complex relationships involving edges and vertices using motifs. The following cell finds the pairs of vertices with edges in both directions between them. The result is a DataFrame, in which the column names are motif keys.

See the GraphFrames User Guide for more details on the API.

// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

Since the result is a DataFrame, you can build more complex queries on top of the motif. Let us find all the reciprocal relationships in which one person is older than 30:

val filtered = motifs.filter("b.age > 30")
display(filtered)
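
Because this motif is symmetric, each reciprocal pair matches twice, once in each orientation. A minimal sketch for keeping one row per pair, assuming the motifs DataFrame from above, is to impose an ordering on the vertex IDs; uniquePairs is an illustrative name.

```scala
// Assumes `motifs` is the DataFrame returned by g.find(...) above.
// Keep only the orientation in which a.id sorts before b.id.
val uniquePairs = motifs.filter("a.id < b.id")
uniquePairs.show()
```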

Stateful queries

Most motif queries are stateless and simple to express, as in the examples above. The next examples demonstrate more complex queries which carry state along a path in the motif. Express these queries by combining GraphFrame motif finding with filters on the result, where the filters use sequence operations to construct a series of DataFrame columns.

For example, suppose you want to identify a chain of 4 vertices with some property defined by a sequence of functions. That is, among chains of 4 vertices a->b->c->d, identify the subset of chains matching this complex filter:

  • Initialize state on path.
  • Update state based on vertex a.
  • Update state based on vertex b.
  • Etc. for c and d.
  • If final state matches some condition, then the filter accepts the chain.

The following code snippets demonstrate this process, where we identify chains of 4 vertices such that at least 2 of the 3 edges are “friend” relationships. In this example, the state is the current count of “friend” edges; in general, it could be any DataFrame column.

// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

// Query on sequence, with state (cnt)
//  (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
  when(relationship === "friend", cnt + 1).otherwise(cnt)
}
//  (b) Use sequence operation to apply method to sequence of elements in motif.
//      In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
  foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
//  (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)
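
The fold in step (b) follows the same pattern as folding over an ordinary Scala collection. As a minimal sketch with plain values instead of Column expressions (the sample sequence is made up for illustration):

```scala
// Relationship labels along one hypothetical chain of 3 edges.
val relationships = Seq("friend", "follow", "friend")

// Same state update as sumFriends, but on plain Ints and Strings:
// the state is the running count of "friend" edges.
val friendCount = relationships.foldLeft(0) { (cnt, relationship) =>
  if (relationship == "friend") cnt + 1 else cnt
}

// The filter accepts the chain when the final state meets the condition.
val accepted = friendCount >= 2
```

In the GraphFrames version, the fold builds a single Column expression rather than a number, and Spark evaluates that expression once per row of chain4.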

Subgraphs

GraphFrames provides APIs for building subgraphs by filtering on edges and vertices. These filters can be composed together. For example, the following subgraph contains only people who are friends and who are more than 30 years old.

// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
  .filterEdges("relationship = 'friend'")
  .filterVertices("age > 30")
  .dropIsolatedVertices()
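
To show what the three chained steps keep, the following plain-Scala sketch replays them by hand on the example data. It mirrors the intended semantics under the assumption that filtering vertices also removes edges touching a dropped vertex; Person, Link, and the other names are illustrative and not GraphFrames types.

```scala
case class Person(id: String, name: String, age: Int)
case class Link(src: String, dst: String, relationship: String)

val people = Seq(
  Person("a", "Alice", 34), Person("b", "Bob", 36), Person("c", "Charlie", 30),
  Person("d", "David", 29), Person("e", "Esther", 32), Person("f", "Fanny", 36),
  Person("g", "Gabby", 60)
)
val links = Seq(
  Link("a", "b", "friend"), Link("b", "c", "follow"), Link("c", "b", "follow"),
  Link("f", "c", "follow"), Link("e", "f", "follow"), Link("e", "d", "friend"),
  Link("d", "a", "friend"), Link("a", "e", "friend")
)

// Step 1: keep only "friend" edges.
val friendLinks = links.filter(_.relationship == "friend")

// Step 2: keep people older than 30 and drop edges touching anyone removed.
val olderIds = people.filter(_.age > 30).map(_.id).toSet
val keptLinks = friendLinks.filter(l => olderIds(l.src) && olderIds(l.dst))

// Step 3: drop vertices that no remaining edge touches.
val connectedIds = keptLinks.flatMap(l => Seq(l.src, l.dst)).toSet
val keptPeople = people.filter(p => olderIds(p.id) && connectedIds(p.id))
```

Here keptPeople contains Alice, Bob, and Esther, and keptLinks contains the two friend edges leaving Alice.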

Complex triplet filters

The following example shows how to select a subgraph based upon triplet filters that operate on an edge and its “src” and “dst” vertices. The same approach extends beyond triplets by using more complex motifs.

// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
  .filter("e.relationship = 'follow'")
  .filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
//  val e2 = paths.select("e.*")

// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)

Standard graph algorithms

This section describes the standard graph algorithms built into GraphFrames.

Breadth-first search (BFS)

Search from “Esther” for users of age < 32.

val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)

You can also restrict the search with an edge filter and a maximum path length.

val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
  .edgeFilter("relationship != 'friend'")
  .maxPathLength(3)
  .run()
display(filteredPaths)
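
To make the idea of breadth-first search concrete, the following plain-Scala sketch traverses the example edge list level by level and records the hop distance from a start vertex. It does not use or reproduce the GraphFrames bfs API; edgePairs, adjacency, and hopDistances are illustrative names.

```scala
// The (src, dst) pairs of the example edge DataFrame.
val edgePairs = Seq(
  ("a", "b"), ("b", "c"), ("c", "b"), ("f", "c"),
  ("e", "f"), ("e", "d"), ("d", "a"), ("a", "e")
)

// Outgoing neighbors of each vertex, following edge direction.
val adjacency: Map[String, Seq[String]] =
  edgePairs.groupBy(_._1).map { case (src, es) => src -> es.map(_._2) }

// Returns the number of hops from `start` to every reachable vertex.
def hopDistances(start: String): Map[String, Int] = {
  @annotation.tailrec
  def loop(frontier: Seq[String], dist: Map[String, Int]): Map[String, Int] =
    if (frontier.isEmpty) dist
    else {
      // Vertices first reached from the current frontier.
      val next = frontier
        .flatMap(v => adjacency.getOrElse(v, Seq.empty))
        .distinct
        .filterNot(dist.contains)
      val depth = dist(frontier.head) + 1
      loop(next, dist ++ next.map(_ -> depth))
    }
  loop(Seq(start), Map(start -> 0))
}

val fromEsther = hopDistances("e")
```

From Esther ("e"), David ("d") is one hop away and Charlie ("c") is two hops away, so David is the nearest vertex with age < 32.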

Connected components

Compute the connected component membership of each vertex and return a graph with each vertex assigned a component ID.

val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)
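
Depending on the GraphFrames version and the environment, connectedComponents may require a Spark checkpoint directory to be set before it runs. The sketch below sets one explicitly; the path is a placeholder assumption and should point to storage that suits your cluster.

```scala
// Assumption: "/tmp/graphframes-checkpoints" is a placeholder path,
// not a location required by GraphFrames.
spark.sparkContext.setCheckpointDir("/tmp/graphframes-checkpoints")

// Assumes `g` is the GraphFrame created above.
val components = g.connectedComponents.run()
components.select("id", "component").orderBy("component").show()
```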

Strongly connected components

Compute the strongly connected component (SCC) of each vertex and return a graph with each vertex assigned to the SCC containing that vertex.

val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))

Label propagation

Run the static Label Propagation Algorithm (LPA) to detect communities in networks.

Each node in the network is initially assigned to its own community. At every superstep, nodes send their community affiliation to all neighbors and update their state to the mode community affiliation of incoming messages.

LPA is a standard community detection algorithm for graphs. It is computationally inexpensive, although (1) convergence is not guaranteed and (2) it can end up with trivial solutions (all nodes assigned to a single community).

val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))

PageRank

Identify important vertices in a graph based on connections.

// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)
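
PageRank returns a graph whose vertices carry a pagerank column and whose edges carry a weight column. A short sketch for ranking the vertices, assuming results from the first call above; ranked is an illustrative name.

```scala
import org.apache.spark.sql.functions.desc

// Assumes `results` is the GraphFrame returned by g.pageRank...run() above.
// List vertices from highest to lowest PageRank score.
val ranked = results.vertices
  .select("id", "name", "pagerank")
  .orderBy(desc("pagerank"))
ranked.show()
```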

Shortest paths

Computes shortest paths to the given set of landmark vertices, where landmarks are specified by vertex ID.

val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)
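
The result is a DataFrame of vertices with an added distances column, which maps each reachable landmark ID to its distance in hops. A minimal sketch for flattening that map into one row per vertex and landmark, assuming paths from the call above; landmarkDistances and the column names landmark and distance are illustrative.

```scala
import org.apache.spark.sql.functions.{col, explode}

// Assumes `paths` is the DataFrame returned by g.shortestPaths...run() above.
// explode turns each map entry into its own row with a key and a value column.
val landmarkDistances = paths.select(
  col("id"),
  explode(col("distances")).as(Seq("landmark", "distance"))
)
landmarkDistances.orderBy("id", "landmark").show()
```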

Triangle counting

Computes the number of triangles passing through each vertex.

import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends  // get example graph

val results = g.triangleCount.run()
results.select("id", "count").show()