Compartilhar via


Guia do usuário do GraphFrames – Scala

Este artigo demonstra exemplos do Guia do usuário do GraphFrames.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._

Como criar GraphFrames

Você pode criar GraphFrames com base em DataFrames de vértice e de borda.

  • DataFrame de vértice: um DataFrame de vértice deve conter uma coluna especial chamada id que especifica IDs exclusivas para cada vértice no grafo.
  • DataFrame de borda: um DataFrame de borda deve conter duas colunas especiais: src (ID de vértice de origem de borda) e dst (ID de vértice de destino da borda).

Os dois DataFrames podem ter outras colunas arbitrárias. Essas colunas podem representar os atributos de vértice e de borda.

Criar os vértices e as bordas

// Vertex DataFrame
val v = spark.createDataFrame(List(
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
)).toDF("src", "dst", "relationship")

Vamos criar um grafo com base nestes vértices e nestas bordas:

val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends

Consultas básicas de grafo e de DataFrame

Os GraphFrames fornecem consultas de grafo simples, como o grau de nó.

Além disso, como os GraphFrames representam grafos como pares de DataFrames de vértice e de borda, é fácil criar consultas eficientes diretamente nos DataFrames de vértice e de borda. Esses DataFrames estão disponíveis como campos de vértices e de bordas no GraphFrame.

display(g.vertices)
display(g.edges)

O grau de entrada dos vértices:

display(g.inDegrees)

O grau de saída dos vértices:

display(g.outDegrees)

O grau dos vértices:

display(g.degrees)

Você pode executar consultas diretamente no DataFrame de vértices. Por exemplo, podemos encontrar a idade da pessoa mais jovem no grafo:

val youngest = g.vertices.groupBy().min("age")
display(youngest)

Da mesma forma, você pode executar consultas no DataFrame de bordas. Por exemplo, vamos contar o número de relações de ‘seguidores’ no grafo:

val numFollows = g.edges.filter("relationship = 'follow'").count()

Localização de motivo

Crie relações mais complexas envolvendo bordas e vértices usando motivos. A célula a seguir localiza os pares de vértices com bordas em ambas as direções entre eles. O resultado é um DataFrame no qual os nomes das colunas são chaves de motivo.

Confira o Guia do usuário do GraphFrame para obter mais detalhes sobre a API.

// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

Como o resultado é um DataFrame, você pode criar consultas mais complexas com base no motivo. Vamos encontrar todas as relações recíprocas nas quais uma pessoa tem mais de 30 anos:

val filtered = motifs.filter("b.age > 30")
display(filtered)

Consultas com estado

A maioria das consultas de motivo é sem estado e simples de ser expressa, como nos exemplos acima. Os exemplos a seguir demonstram consultas mais complexas que carregam o estado ao longo de um caminho no motivo. Expresse essas consultas combinando a localização de motivo do GraphFrame com filtros no resultado, em que os filtros usam operações de sequência para construir uma série de colunas de DataFrame.

Por exemplo, suponha que você queira identificar uma cadeia de quatro vértices com uma propriedade definida por uma sequência de funções. Ou seja, entre cadeias de quatro vértices a->b->c->d, identifique o subconjunto de cadeias correspondentes a este filtro complexo:

  • Inicialize o estado no caminho.
  • Atualize o estado com base no vértice a.
  • Atualize o estado com base no vértice b.
  • Mesmo procedimento para c e d.
  • Se o estado final corresponder a alguma condição, o filtro aceitará a cadeia.

Os snippets de código a seguir demonstram esse processo, em que identificamos cadeias de quatro vértices de modo que, pelo menos, duas das três bordas sejam relações de “amigos”. Neste exemplo, o estado é a contagem atual de bordas de “amigos”. Em geral, pode ser qualquer coluna de DataFrame.

// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

// Query on sequence, with state (cnt)
//  (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
  when(relationship === "friend", cnt + 1).otherwise(cnt)
}
//  (b) Use sequence operation to apply method to sequence of elements in motif.
//      In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
  foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
//  (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)

Subgrafos

Os GraphFrames fornecem APIs para criar subgrafos filtrando bordas e vértices. Esses filtros podem ser compostos juntos. Por exemplo, o subgrafo a seguir contém apenas pessoas que são amigos e que têm mais de 30 anos.

// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
  .filterEdges("relationship = 'friend'")
  .filterVertices("age > 30")
  .dropIsolatedVertices()

Filtros triplos complexos

O exemplo a seguir mostra como selecionar um subgrafo com base nos filtros triplos que operam em uma borda e nos vértices “src” e “dst”. É simples estender este exemplo para além dos triplos usando motivos mais complexos.

// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
  .filter("e.relationship = 'follow'")
  .filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
//  val e2 = paths.select("e.*")

// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)

Algoritmos de grafo padrão

Esta seção descreve os algoritmos de grafo padrão incorporados ao GraphFrames.

BFS (pesquisa de balanceamento em largura)

Pesquise “Marina” para usuários com a idade < 32.

val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)

A pesquisa também pode limitar os filtros de borda e os comprimentos máximos de caminho.

val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
  .edgeFilter("relationship != 'friend'")
  .maxPathLength(3)
  .run()
display(filteredPaths)

Componentes conectados

Calcule a associação de componente conectada de cada vértice e retorne um grafo com cada vértice atribuído a uma ID de componente.

val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)

Componentes com conexão forte

Calcule o SCC (componente com conexão forte) de cada vértice e retorne um grafo com cada vértice atribuído ao SCC que contém esse vértice.

val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))

Propagação de rótulo

Execute o Algoritmo de Propagação de Rótulo estático para detectar comunidades em redes.

Cada nó na rede é inicialmente atribuído à respectiva comunidade. Em cada superetapa, os nós enviam a afiliação à comunidade para todos os vizinhos e atualizam o estado para a afiliação da comunidade do modo das mensagens de entrada.

O LPA é um algoritmo de detecção de comunidade padrão para grafos. Ele é de baixo custo em termos computacionais, embora (1) a convergência não seja garantida e (2) seja possível acabar com soluções triviais (todos os nós se identificam em uma só comunidade).

val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))

PageRank

Identifique vértices importantes em um grafo com base nas conexões.

// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)

Caminhos mais curtos

Calcula caminhos mais curtos para o conjunto especificado de vértices de pontos de referência, em que os pontos de referência são especificados pela ID de vértice.

val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)

Contagem de triângulos

Calcula o número de triângulos que passam por vértice.

import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends  // get example graph

val results = g.triangleCount.run()
results.select("id", "count").show()