Condividi tramite


Guida dell'utente di GraphFrames - Scala

Questo articolo illustra esempi della guida utente GraphFrames.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._

Creazione di graphframe

È possibile creare GraphFrames da DataFrame di vertici e archi.

  • DataFrame vertice: un dataframe dei vertici deve contenere una colonna speciale denominata id che specifica gli ID univoci per ogni vertice nel grafico.
  • DataFrame arco: un dataframe arco deve contenere due colonne speciali: src (ID del nodo sorgente dell'arco) e dst (ID del nodo di destinazione dell'arco).

Entrambi i dataframe possono avere altre colonne arbitrarie. Queste colonne possono rappresentare attributi di vertice e di arco.

Creare i vertici e i bordi

// Vertex DataFrame
val v = spark.createDataFrame(List(
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
)).toDF("src", "dst", "relationship")

Si creerà un grafo da questi vertici e i bordi seguenti:

val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends

Query di base su grafo e dataframe

GraphFrames fornisce semplici query a grafo, ad esempio il grado di nodo.

Inoltre, poiché i GraphFrames rappresentano i grafici come coppie di DataFrame di vertici e bordi, è facile eseguire query potenti direttamente sui DataFrame di vertici e di bordi. Tali dataframe sono disponibili come vertici e campi archi nel GraphFrame.

display(g.vertices)
display(g.edges)

Grado d'ingresso dei vertici

display(g.inDegrees)

Il grado di uscita dei vertici:

display(g.outDegrees)

Grado dei vertici:

display(g.degrees)

È possibile eseguire query direttamente nel dataframe dei vertici. Ad esempio, è possibile trovare l'età della persona più giovane nel grafico:

val youngest = g.vertices.groupBy().min("age")
display(youngest)

Analogamente, è possibile eseguire query sui bordi del dataframe. Si supponga, ad esempio, di contare il numero di relazioni "follow" nel grafico:

val numFollows = g.edges.filter("relationship = 'follow'").count()

Ricerca di motivi

Creare relazioni più complesse che coinvolgono archi e vertici utilizzando motivi. La cella seguente trova le coppie di vertici con archi in entrambe le direzioni tra di esse. Il risultato è un DataFrame, in cui i nomi delle colonne sono chiavi del motivo.

Per altri dettagli sull'API, vedere GraphFrame User Guide (Guida utente di GraphFrame).

// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

Poiché il risultato è un dataframe, è possibile creare query più complesse sopra il motivo. Cerchiamo di trovare tutte le relazioni reciproche in cui una persona ha più di 30 anni:

val filtered = motifs.filter("b.age > 30")
display(filtered)

Query con stato

La maggior parte delle query sui motivi sono senza stato e semplici da esprimere, come negli esempi precedenti. Gli esempi successivi illustrano query più complesse che trasportano lo stato lungo un percorso nel motivo. Effettua queste query combinando la ricerca di motivi GraphFrame con filtri sul risultato, in cui i filtri utilizzano operazioni di sequenza per costruire una serie di colonne in un DataFrame.

Si supponga, ad esempio, di voler identificare una catena di 4 vertici con una proprietà definita da una sequenza di funzioni. Ovvero, tra le catene di 4 vertici a->b->c->d, identificare il subset di catene corrispondenti a questo filtro complesso:

  • Inizializzare lo stato sul percorso.
  • Aggiornare lo stato in base al vertice a.
  • Aggiornare lo stato in base al vertice b.
  • Eccetera per c e d.
  • Se lo stato finale corrisponde a una condizione, il filtro accetta la catena.

I frammenti di codice seguenti illustrano questo processo, in cui vengono identificate catene di 4 vertici in modo che almeno 2 dei 3 bordi siano relazioni "friend". In questo esempio, lo stato è il conteggio corrente dei bordi "friend" ; in generale, potrebbe trattarsi di qualsiasi colonna dataframe.

// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

// Query on sequence, with state (cnt)
//  (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
  when(relationship === "friend", cnt + 1).otherwise(cnt)
}
//  (b) Use sequence operation to apply method to sequence of elements in motif.
//      In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
  foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
//  (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)

Sottogrammi

GraphFrames fornisce API per la compilazione di sottogrammi filtrando i bordi e i vertici. Questi filtri possono essere composti insieme. Ad esempio, il sottografo seguente contiene solo persone che sono amici e che hanno più di 30 anni.

// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
  .filterEdges("relationship = 'friend'")
  .filterVertices("age > 30")
  .dropIsolatedVertices()

Filtri triplet complessi

Nell'esempio seguente viene illustrato come selezionare un sottografo in base ai filtri triplet che operano su un bordo e sui relativi vertici "src" e "dst". L'estensione di questo esempio per andare oltre le triplette usando motivi più complessi è semplice.

// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
  .filter("e.relationship = 'follow'")
  .filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
//  val e2 = paths.select("e.*")

// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)

Algoritmi di grafi standard

Questa sezione descrive gli algoritmi del grafo standard incorporati in GraphFrame.

Ricerca in ampiezza (BFS)

Cerca utenti di età < 32 con nome "Esther".

val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)

La ricerca può anche limitare i filtri perimetrali e la lunghezza massima del percorso.

val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
  .edgeFilter("relationship != 'friend'")
  .maxPathLength(3)
  .run()
display(filteredPaths)

Componenti connessi

Calcolare l'appartenenza al componente connesso di ogni vertice e restituire un grafo con ogni vertice a cui è stato assegnato un ID componente.

val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)

Componenti fortemente connessi

Calcolare il componente fortemente connesso (SCC) di ogni vertice e restituire un grafo con ogni vertice assegnato al SCC contenente tale vertice.

val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))

Propagazione delle etichette

Eseguire l'algoritmo di propagazione delle etichette statiche per rilevare le community nelle reti.

Ogni nodo della rete viene inizialmente assegnato alla propria community. A ogni passaggio superiore, i nodi inviano l'affiliazione della community a tutti i vicini e aggiornano il proprio stato all'affiliazione della community in modalità dei messaggi in arrivo.

LPA è un algoritmo standard di rilevamento delle comunità per i grafi. È poco costoso dal punto di vista computazionale, anche se (1) la convergenza non è garantita e (2) si può finire con soluzioni semplici (tutti i nodi identificano in una singola community).

val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))

PageRank

Identificare vertici importanti in un grafico in base alle connessioni.

// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)

Percorsi più brevi

Calcola i percorsi più brevi verso il set specificato di vertici di riferimento, dove i punti di riferimento sono specificati tramite l'ID del vertice.

val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)

Conteggio triangoli

Calcola il numero di triangoli che passano attraverso ogni vertice.

import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends  // get example graph

val results = g.triangleCount.run()
results.select("id", "count").show()