Compartir vía


Guía del usuario de GraphFrames: Scala

En este artículo, se muestran ejemplos de la Guía del usuario de GraphFrames.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._

Creación de GraphFrames

Puede crear GraphFrames a partir de DataFrames de vértice y de borde.

  • DataFrame de vértice: un DataFrame de vértice debe contener una columna especial denominada id, que especifica identificadores únicos para cada vértice del grafo.
  • DataFrame de borde: un DataFrame de borde debe contener dos columnas especiales: src (id. del vértice de origen del borde) y dst (id. del vértice de destino del borde).

Ambos DataFrames pueden tener otras columnas arbitrarias. Esas columnas pueden representar atributos de vértice y de borde.

Cree los vértices y los bordes.

// Vertex DataFrame
val v = spark.createDataFrame(List(
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
)).toDF("src", "dst", "relationship")

Vamos a crear un grafo a partir de estos vértices y bordes:

val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends

Consultas básicas de grafo y DataFrame

Los GraphFrames proporcionan consultas de grafo sencillas, como el grado de nodo.

Además, dado que los GraphFrames representan los grafos como pares de DataFrames de vértice y de borde, es fácil hacer consultas eficaces directamente en los DataFrames de vértice y de borde. Esos DataFrames están disponibles como campos de vértices y bordes en el GraphFrame.

display(g.vertices)
display(g.edges)

Grado de entrada de los vértices:

display(g.inDegrees)

Grado de salida de los vértices:

display(g.outDegrees)

Grado de los vértices:

display(g.degrees)

Puede ejecutar consultas directamente en el DataFrame de vértices. Por ejemplo, podemos buscar la edad de la persona más joven del grafo:

val youngest = g.vertices.groupBy().min("age")
display(youngest)

De igual modo, puede ejecutar consultas en el DataFrame de bordes. Por ejemplo, contemos el número de relaciones “follow” del grafo:

val numFollows = g.edges.filter("relationship = 'follow'").count()

Búsqueda con motifs

Cree relaciones más complejas que impliquen bordes y vértices mediante el uso de motifs. La siguiente celda busca los pares de vértices con bordes en ambas direcciones entre ellos. El resultado es un DataFrame, en el que los nombres de columna son claves de motifs.

Consulte la Guía del usuario de GraphFrame para obtener más detalles sobre la API.

// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

Dado que el resultado es un DataFrame, puede crear consultas más complejas con el motif. Busquemos todas las relaciones recíprocas en las que una persona tiene más de 30 años:

val filtered = motifs.filter("b.age > 30")
display(filtered)

Consultas con estado

La mayoría de las consultas con motif son sin estado y sencillas de expresar, como en los ejemplos anteriores. En los ejemplos siguientes, se muestran consultas más complejas que llevan el estado a lo largo de una ruta de acceso en el motif. Exprese estas consultas combinando la búsqueda con motif de GraphFrame con filtros en el resultado, donde los filtros usen operaciones de secuencia para construir una serie de columnas de DataFrame.

Por ejemplo, supongamos que quiere identificar una cadena de 4 vértices con alguna propiedad definida por una secuencia de funciones. Es decir, entre las cadenas de 4 vértices a->b->c->d, identifique el subconjunto de cadenas que coinciden con este filtro complejo:

  • Inicializar el estado en la ruta de acceso.
  • Actualizar el estado en función del vértice a.
  • Actualizar el estado en función del vértice b.
  • Etc. para c y d.
  • Si el estado final coincide con alguna condición, el filtro acepta la cadena.

Los fragmentos de código siguientes muestran este proceso, donde identificamos cadenas de 4 vértices de forma que al menos 2 de los 3 bordes sean relaciones “friend”. En este ejemplo, el estado es el recuento actual de los bordes “friend”. En general, podría ser cualquier columna de DataFrame.

// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

// Query on sequence, with state (cnt)
//  (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
  when(relationship === "friend", cnt + 1).otherwise(cnt)
}
//  (b) Use sequence operation to apply method to sequence of elements in motif.
//      In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
  foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
//  (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)

Subgrafos

GraphFrames proporciona API para crear subgrafos mediante el filtrado de bordes y vértices. Estos filtros se pueden componer juntos. Por ejemplo, el subgrafo siguiente contiene solo personas que son amigos y que tienen más de 30 años.

// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
  .filterEdges("relationship = 'friend'")
  .filterVertices("age > 30")
  .dropIsolatedVertices()

Filtros de tripletes complejos

En el ejemplo siguiente, se muestra cómo seleccionar un subgrafo basado en filtros de tripletes que funcionan en un borde y sus vértices “src” y “dst”. Extender este ejemplo para ir más allá de los tripletes mediante el uso de procedimientos más complejos es sencillo.

// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
  .filter("e.relationship = 'follow'")
  .filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
//  val e2 = paths.select("e.*")

// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)

Algoritmos de grafo estándar

En esta sección, se describen los algoritmos de grafo estándar integrados en GraphFrames.

Búsqueda en amplitud (BFS)

Busque usuarios con menos de 32 años en “Esther”.

val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)

La búsqueda también puede limitar los filtros de borde y las longitudes máximas de las rutas de acceso.

val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
  .edgeFilter("relationship != 'friend'")
  .maxPathLength(3)
  .run()
display(filteredPaths)

Componentes conectados

Calcule la pertenencia a componentes conectados de cada vértice y devuelva un grafo con cada vértice asignado a un identificador de componente.

val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)

Componentes fuertemente conectados

Calcule el componente fuertemente conectado (SCC) de cada vértice y devuelva un grafo con cada vértice asignado al SCC que contiene ese vértice.

val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))

Propagación de etiquetas

Ejecute el algoritmo de propagación de etiquetas (LPA) estático para detectar comunidades en redes.

Cada nodo de la red se asigna inicialmente a su propia comunidad. En cada superpaso, los nodos envían su afiliación a la comunidad a todos los vecinos y actualizan su estado al modo de afiliación de la comunidad de mensajes entrantes.

LPA es un algoritmo de detección de comunidad estándar para grafos. No consume muchos recursos, aunque (1) la convergencia no está garantizada y (2) puede acabar con soluciones triviales (todos los nodos identificados en una sola comunidad).

val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))

Rango de página

Identifique vértices importantes en un grafo en función de las conexiones.

// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)

Rutas de acceso más cortas

Calcula las rutas de acceso más cortas al conjunto de vértices de punto de referencia especificado, donde los puntos de referencia se especifican por id. de vértice.

val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)

Recuento de triángulos

Calcula el número de triángulos que pasan por cada vértice.

import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends  // get example graph

val results = g.triangleCount.run()
results.select("id", "count").show()