Partilhar via


Guia do usuário do GraphFrames - Scala

Este artigo demonstra exemplos do Guia do Usuário do GraphFrames.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._

Criando GraphFrames

Você pode criar GraphFrames a partir de DataFrames de vértice e borda.

  • Vertex DataFrame: Um DataFrame de vértice deve conter um column especial chamado id que especifica IDs exclusivos para cada vértice no gráfico.
  • Edge DataFrame: Um DataFrame de borda deve conter dois columnsespeciais: src (ID de vértice de origem da borda) e dst (ID do vértice de destino da borda).

Ambos os DataFrames podem ter outros columnsarbitrários. Esses columns podem representar atributos de vértice e borda.

Crie os vértices e arestas

// Vertex DataFrame
val v = spark.createDataFrame(List(
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
)).toDF("src", "dst", "relationship")

Vamos criar um gráfico a partir destes vértices e destas arestas:

val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends

Consultas básicas de gráficos e DataFrame

Os GraphFrames fornecem consultas gráficas simples, como o grau de nodo.

Além disso, como GraphFrames representam gráficos como pares de DataFrames de vértice e borda, é fácil fazer consultas poderosas diretamente nos DataFrames de vértice e borda. Esses DataFrames estão disponíveis como campos de vértices e arestas no GraphFrame.

display(g.vertices)
display(g.edges)

O grau de entrada dos vértices:

display(g.inDegrees)

O grau de saída dos vértices:

display(g.outDegrees)

O grau dos vértices:

display(g.degrees)

Você pode executar consultas diretamente no DataFrame de vértices. Por exemplo, podemos encontrar a idade da pessoa mais jovem no gráfico:

val youngest = g.vertices.groupBy().min("age")
display(youngest)

Da mesma forma, você pode executar consultas nas bordas DataFrame. Por exemplo, vamos contar o número de relações de 'seguimento' no gráfico.

val numFollows = g.edges.filter("relationship = 'follow'").count()

Descoberta de motivos

Construa relações mais complexas envolvendo arestas e vértices usando motivos. A célula seguinte encontra os pares de vértices com arestas em ambas as direções entre eles. O resultado é um DataFrame, no qual os nomes column são chaves de motivos.

Confira o Guia do Usuário do GraphFrame para obter mais detalhes sobre a API.

// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

Como o resultado é um DataFrame, você pode criar consultas mais complexas sobre o motivo. Vamos encontrar todas as relações recíprocas em que uma pessoa tem mais de 30 anos:

val filtered = motifs.filter("b.age > 30")
display(filtered)

Consultas com estado

A maioria das consultas de padrões não mantém estado e são simples de expressar, como nos exemplos acima. Os exemplos seguintes demonstram consultas mais complexas que mantêm o estado ao longo de um caminho dentro do motivo. Expresse essas consultas combinando a localização do motivo GraphFrame com filtros no resultado, where os filtros usam operações de sequência para construir uma série de columnsDataFrame.

Por exemplo, suponha que você queira identificar uma cadeia de 4 vértices com alguma propriedade definida por uma sequência de funções. Ou seja, entre cadeias de 4 vértices a->b->c->d, identifique o subconjunto de cadeias correspondentes a este filtro complexo:

  • Inicializar o estado no caminho.
  • Update estado baseado no vértice a.
  • Update estado baseado no vértice b.
  • Etc. para c e d.
  • Se o estado final corresponder a alguma condição, o filtro aceita a cadeia.

Os trechos de código a seguir demonstram esse processo, where identificamos cadeias de 4 vértices de tal forma que pelo menos 2 das 3 arestas são relações "amigas". Neste exemplo, o estado é a contagem atual de arestas "amigas"; em geral, pode ser qualquer DataFrame column.

// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

// Query on sequence, with state (cnt)
//  (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
  when(relationship === "friend", cnt + 1).otherwise(cnt)
}
//  (b) Use sequence operation to apply method to sequence of elements in motif.
//      In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
  foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
//  (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)

Subgrafos

GraphFrames fornece APIs para a criação de subgráficos filtrando em bordas e vértices. Estes filtros podem ser compostos em conjunto. Por exemplo, o subgráfico a seguir contém apenas pessoas que são amigas e que têm mais de 30 anos de idade.

// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
  .filterEdges("relationship = 'friend'")
  .filterVertices("age > 30")
  .dropIsolatedVertices()

Filtros trigêmeos complexos

O exemplo a seguir mostra como select um subgrafo com base em filtros de tripletos que operam numa aresta juntamente com os seus vértices "src" e "dst". Estender este exemplo para ir além dos trigêmeos usando motivos mais complexos é simples.

// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
  .filter("e.relationship = 'follow'")
  .filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
//  val e2 = paths.select("e.*")

// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)

Algoritmos de gráficos padrão

Esta seção descreve os algoritmos de gráficos padrão incorporados em GraphFrames.

Pesquisa ampliada (BFS)

Pesquise em "Esther" por usuários de idade < 32 anos.

val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)

A pesquisa também pode limit filtros de borda e comprimentos máximos de caminho.

val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
  .edgeFilter("relationship != 'friend'")
  .maxPathLength(3)
  .run()
display(filteredPaths)

Componentes conectados

Calcule a associação do componente conectado de cada vértice e retorne um gráfico com cada vértice atribuído a um ID de componente.

val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)

Componentes fortemente conectados

Calcule o componente fortemente conectado (SCC) de cada vértice e retorne um gráfico com cada vértice atribuído ao SCC que contém esse vértice.

val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))

Propagação do rótulo

Execute o algoritmo estático de propagação de rótulos para detetar comunidades em redes.

Cada nó na rede é inicialmente atribuído à sua própria comunidade. Em cada superetapa, os nós enviam sua afiliação à comunidade para todos os vizinhos e update seu estado para o modo de afiliação da comunidade de mensagens recebidas.

LPA é um algoritmo de deteção de comunidade padrão para gráficos. É barato computacionalmente, embora (1) a convergência não seja garantida e (2) pode-se acabar com soluções triviais (todos os nós se identificam em uma única comunidade).

val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))

PageRank

Identificar vértices importantes em um gráfico com base em connections.

// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)

Caminhos mais curtos

Calcula os caminhos mais curtos para a set dada de vértices de pontos de referência, where pontos de referência especificados por ID de vértice.

val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)

Contagem de triângulos

Calcula o número de triângulos que passam por cada vértice.

import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends  // get example graph

val results = g.triangleCount.run()
results.select("id", "count").show()