Sdílet prostřednictvím


Uživatelská příručka k GraphFrames – Scala

Tento článek ukazuje příklady z uživatelské příručky GraphFrames.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._

Vytváření grafových rámců

GraphFrames můžete vytvářet z vrcholů a hraničních datových rámců.

  • Datový rámec vrcholu: Datový rámec vrcholu by měl obsahovat speciální sloupec s názvem id, který určuje jedinečná ID pro každý vrchol v grafu.
  • Datový rámec hrany: Datový rámec hrany by měl obsahovat dva speciální sloupce: src (ID zdrojového vrcholu hrany) a dst (ID cílového vrcholu hrany).

Oba datové rámce můžou mít libovolné jiné sloupce. Tyto sloupce můžou představovat vrchol a hraniční atributy.

Vytvoření vrcholů a hran

// Vertex DataFrame
val v = spark.createDataFrame(List(
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
)).toDF("src", "dst", "relationship")

Pojďme vytvořit graf z těchto vrcholů a těchto hran:

val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends

Základní dotazy na grafy a datové rámce

GraphFrames poskytují jednoduché dotazy na grafy, jako je například stupeň uzlu.

Vzhledem k tomu, že GraphFrames představují grafy jako dvojice vrcholů a hraničních datových rámců, je snadné vytvářet výkonné dotazy přímo na vrchol a hraniční datové rámce. Tyto datové rámce jsou k dispozici jako vrcholy a hrany v GraphFrame.

display(g.vertices)
display(g.edges)

Příchozí stupeň vrcholů:

display(g.inDegrees)

Výstupní stupeň vrcholů:

display(g.outDegrees)

Stupeň vrcholů:

display(g.degrees)

Dotazy můžete spouštět přímo na datovém rámci vrcholů. Můžeme například najít věk nejmenší osoby v grafu:

val youngest = g.vertices.groupBy().min("age")
display(youngest)

Podobně můžete spouštět dotazy na hranách DataFrame. Pojďme například spočítat počet relací sledování v grafu:

val numFollows = g.edges.filter("relationship = 'follow'").count()

Hledání motivů

Vytvářejte složitější vztahy zahrnující hrany a vrcholy pomocí motivů. Následující buňka najde dvojice vrcholů s hranami v obou směrech mezi nimi. Výsledkem je datový rámec, kde klíče motivu tvoří názvy sloupců.

Další podrobnosti o rozhraní API najdete v uživatelské příručce GraphFrame.

// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

Vzhledem k tomu, že výsledkem je datový rámec, můžete na motivu vytvořit složitější dotazy. Pojďme najít všechny vzájemné vztahy, ve kterých je jedna osoba starší než 30:

val filtered = motifs.filter("b.age > 30")
display(filtered)

Stavové dotazy

Většina dotazů motivů je bezstavová a jednoduchá, jak je znázorněno v příkladech výše. Další příklady ukazují složitější dotazy, které nesou stav podél cesty v rámci motivu. Tyto dotazy můžete vyjádřit kombinováním hledání motivu GraphFrame s filtry ve výsledku, kdy filtry používají sekvenční operace k vytvoření řady sloupců datového rámce.

Předpokládejme například, že chcete identifikovat řetězec 4 vrcholů s určitou vlastností definovanou posloupností funkcí. To znamená, že mezi řetězci 4 vrcholů a->b->c->d, identifikujte podmnožinu řetězců odpovídajících tomuto komplexnímu filtru:

  • Inicializace stavu v cestě
  • Aktualizujte stav na základě vrcholu A.
  • Aktualizujte stav podle vrcholu b.
  • A tak dále pro c a d.
  • Pokud konečný stav odpovídá určité podmínce, filtr přijme řetězec.

Následující úryvky kódu demonstrují tento proces, kdy identifikujeme řetězy se 4 vrcholy tak, že alespoň 2 ze 3 hran jsou vztahy „přítel“. V tomto příkladu je stav aktuálním počtem "přátelských" hran; Obecně může být libovolný sloupec datového rámce.

// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

// Query on sequence, with state (cnt)
//  (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
  when(relationship === "friend", cnt + 1).otherwise(cnt)
}
//  (b) Use sequence operation to apply method to sequence of elements in motif.
//      In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
  foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
//  (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)

Podgrafy

GraphFrames poskytuje rozhraní API pro vytváření podgrafů filtrováním okrajů a vrcholů. Tyto filtry se můžou skládat dohromady. Následující podgraf například obsahuje jenom lidi, kteří jsou přátelé a kteří jsou starší než 30 let.

// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
  .filterEdges("relationship = 'friend'")
  .filterVertices("age > 30")
  .dropIsolatedVertices()

Komplexní trojité filtry

Následující příklad ukazuje, jak vybrat podgraf na základě trojitých filtrů, které pracují na okraji a jeho vrcholech "src" a "dst". Rozšíření tohoto příkladu, aby šel nad rámec tripletů pomocí složitějších motivů, je jednoduché.

// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
  .filter("e.relationship = 'follow'")
  .filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
//  val e2 = paths.select("e.*")

// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)

Standardní grafové algoritmy

Tato část popisuje standardní algoritmy grafů integrované do graphFrames.

Prohledávání do šířky (BFS)

Vyhledejte uživatele ve věku < 32 z "Esther".

val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)

Hledání může také omezit filtry okrajů a maximální délku cesty.

val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
  .edgeFilter("relationship != 'friend'")
  .maxPathLength(3)
  .run()
display(filteredPaths)

Připojené komponenty

Určete příslušnost každého vrcholu k připojené komponentě a vraťte graf s každým vrcholem přiřazeným ID komponenty.

val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)

Silně souvislé komponenty

Vypočítá silně propojenou komponentu (SCC) každého vrcholu a vrátí graf s každým vrcholem přiřazeným K SCC obsahujícímu tento vrchol.

val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))

Šíření štítků

Spusťte statický algoritmus šíření popisků pro detekci komunit v sítích.

Každý uzel v síti je zpočátku přiřazený ke své vlastní komunitě. V každém superkroku odesílají uzly svoji příslušnost ke komunitě ke všem sousedům a aktualizují svůj stav na nejčastější příslušnost ke komunitě příchozích zpráv.

LPA je standardní algoritmus detekce komunity pro grafy. Je to výpočetně nenáročné, i když (1) konvergence není zaručená a (2) může se skončit u triviálních řešení (všechny uzly se spojí do jedné komunity).

val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))

PageRank

Identifikace důležitých vrcholů v grafu na základě připojení

// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)

Nejkratší cesty

Vypočítá nejkratší cesty k dané sadě vrcholů referenčních bodů, které jsou určeny dle ID vrcholu.

val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)

Počítání trojúhelníků

Vypočítá počet trojúhelníků procházejících každým vrcholem.

import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends  // get example graph

val results = g.triangleCount.run()
results.select("id", "count").show()