Guide d’utilisation de GraphFrames - Scala

Cet article présente des exemples du Guide d’utilisation de GraphFrames.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._

Création de GraphFrames

Vous pouvez créer des GraphFrames à partir de DataFrames de sommet et d’arête.

  • DataFrame de sommet : un DataFrame de sommet doit contenir une colonne spéciale nommée id qui spécifie des ID uniques pour chaque sommet du graphe.
  • DataFrame d’arête : un DataFrame d’arête doit contenir deux colonnes spéciales : src (ID d’arête de sommet source) et dst (ID d’arête de sommet de destination).

Les deux DataFrames peuvent avoir d’autres colonnes arbitraires. Ces colonnes peuvent représenter des attributs de sommet et d’arête.

Création des sommets et de arêtes

// Vertex DataFrame
val v = spark.createDataFrame(List(
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
)).toDF("src", "dst", "relationship")

Nous allons créer un graphe à partir des sommets et arêtes suivants :

val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends

Requêtes de graphe et de DataFrame de base

Les GraphFrames fournissent des requêtes de graphe simples, telles que le degré d’un nœud.

En outre, étant donné que les GraphFrames représentent des graphes en tant que paires de DataFrames de sommet et d’arête, il est facile de créer des requêtes puissantes directement sur les DataFrames de sommet et d’arête. Ces DataFrames sont disponibles sous forme de champs de sommets et d’arêtes dans le GraphFrame.

display(g.vertices)
display(g.edges)

Degré entrant des sommets :

display(g.inDegrees)

Degré sortant des sommets :

display(g.outDegrees)

Degré des sommets :

display(g.degrees)

Vous pouvez exécuter des requêtes directement sur le DataFrame de sommets. Par exemple, nous pouvons trouver l’âge de la personne la plus jeune dans le graphe :

val youngest = g.vertices.groupBy().min("age")
display(youngest)

De même, vous pouvez exécuter des requêtes sur le DataFrame d’arêtes. Par exemple, nous comptons le nombre de relations « follow » dans le graphe :

val numFollows = g.edges.filter("relationship = 'follow'").count()

Recherche de motifs

Créez des relations plus complexes impliquant des arêtes et des sommets en utilisant des motifs. La cellule suivante recherche les paires de sommets avec des arêtes dans les deux directions entre elles. Le résultat est un DataFrame, dans lequel les noms de colonne sont des clés de motif.

Pour plus d’informations sur l’API, consultez le Guide d’utilisation des GraphFrames.

// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

Étant donné que le résultat est un DataFrame, vous pouvez générer des requêtes plus complexes à partir du motif. Trouvons toutes les relations réciproques dans lesquelles une personne est âgée de plus de 30 ans :

val filtered = motifs.filter("b.age > 30")
display(filtered)

Requêtes avec état

La plupart des requêtes de motifs sont sans état et simples à exprimer, comme dans les exemples ci-dessus. Les exemples suivants illustrent des requêtes plus complexes qui transmettent l’état le long d’un chemin dans le motif. Exprimez ces requêtes en combinant la recherche de motifs de GraphFrame et des filtres sur le résultat, où les filtres utilisent des opérations de séquence pour construire une série de colonnes de DataFrame.

Supposons, par exemple, que vous souhaitiez identifier une chaîne de 4 sommets avec une propriété définie par une séquence de fonctions. Autrement dit, parmi les chaînes de 4 sommets a->b->c->d, identifiez le sous-ensemble de chaînes correspondant à ce filtre complexe :

  • Initialisez l’état sur le chemin.
  • Mettez à jour l’état en fonction du sommet a.
  • Mettez à jour l’état en fonction du sommet b.
  • Même chose pour c et d.
  • Si l’état final correspond à une condition, le filtre accepte la chaîne.

Les extraits de code suivants illustrent ce processus, où nous identifions les chaînes de 4 sommets de telle sorte qu’au moins 2 des 3 arêtes soient des relations « friend ». Dans cet exemple, l’état est le nombre actuel d’arêtes « friend ». En général, il peut s’agir de n’importe quelle colonne de DataFrame.

// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

// Query on sequence, with state (cnt)
//  (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
  when(relationship === "friend", cnt + 1).otherwise(cnt)
}
//  (b) Use sequence operation to apply method to sequence of elements in motif.
//      In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
  foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
//  (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)

Sous-graphes

GraphFrames fournit des API pour générer des sous-graphes en filtrant sur les arêtes et les sommets. Ces filtres peuvent être combinés. Par exemple, le sous-graphe suivant contient uniquement les personnes qui sont des amis et qui ont plus de 30 ans.

// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
  .filterEdges("relationship = 'friend'")
  .filterVertices("age > 30")
  .dropIsolatedVertices()

Filtres de triplet complexes

L’exemple suivant montre comment sélectionner un sous-graphe basé sur des filtres de triplet qui fonctionnent sur une arête et ses sommets « src » et « dst ». L’extension de cet exemple pour aller au-delà des triplets en utilisant des motifs plus complexes est simple.

// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
  .filter("e.relationship = 'follow'")
  .filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
//  val e2 = paths.select("e.*")

// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)

Algorithmes de graphe standard

Cette section décrit les algorithmes de graphe standard intégrés à GraphFrames.

Recherche à largeur prioritaire (BFS)

Recherchez à partir de « Esther » les utilisateurs dont l’âge < 32.

val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)

La recherche peut également limiter les filtres d’arête et les longueurs de chemin maximales.

val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
  .edgeFilter("relationship != 'friend'")
  .maxPathLength(3)
  .run()
display(filteredPaths)

Composants connectés

Calculez l’appartenance à un composant connecté de chaque sommet et retournez un graphe dont chaque sommet se voit affecter un ID de composant.

val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)

Composants fortement connectés

Calculez le composant fortement connecté (SCC) de chaque sommet et retournez un graphe dont chaque sommet est affecté au SCC contenant ce sommet.

val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))

Propagation des étiquettes

Exécutez l’algorithme de propagation des étiquettes statiques pour détecter les communautés dans les réseaux.

Chaque nœud du réseau est initialement attribué à sa propre communauté. À chaque super-étape, les nœuds envoient leur affiliation de communauté à tous les voisins et définissent leur état sur l’affiliation de communauté de mode des messages entrants.

L’algorithme de propagation des étiquettes est un algorithme de détection de communauté standard pour les graphes. Il n’est pas coûteux en calcul, même si (1) la convergence n’est pas garantie et (2) et qu’il peut aboutir à des solutions triviales (identification de tous les nœuds dans une seule communauté).

val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))

PageRank

Identifiez les sommets importants d’un graphe en fonction des connexions.

// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)

Chemins les plus courts

Calcule les chemins les plus courts vers l’ensemble donné de sommets de points de repère, où les points de repère sont spécifiés par ID de sommet.

val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)

Comptage des triangles

Calcule le nombre de triangles passant par chaque sommet.

import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends  // get example graph

val results = g.triangleCount.run()
results.select("id", "count").show()