Compartir a través de


Guía del usuario de GraphFrames: Scala

En este artículo se muestran ejemplos de la Guía de usuario de GraphFrames.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._

Creación de graphFrames

Puede crear GraphFrames a partir de DataFrames de vértices y aristas.

  • DataFrame de vértice: un dataframe de vértice debe contener una columna especial denominada id que especifica identificadores únicos para cada vértice del gráfico.
  • Trama de datos perimetral: un dataframe perimetral debe contener dos columnas especiales: src (id. de vértice de origen del borde) y dst (id. de vértice de destino del borde).

Ambos DataFrames pueden tener otras columnas arbitrarias. Esas columnas pueden representar atributos de vértices y bordes.

Creación de los vértices y bordes

// Vertex DataFrame
val v = spark.createDataFrame(List(
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
)).toDF("src", "dst", "relationship")

Vamos a crear un grafo a partir de estos vértices y estos bordes:

val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends

Consultas básicas de grafos y tramas de datos

GraphFrames proporciona consultas de grafos simples, como el grado de nodo.

Además, dado que GraphFrames representa gráficos como pares de vértices y dataframes perimetrales, es fácil crear consultas eficaces directamente en los dataframes de vértices y bordes. Los DataFrames están disponibles como campos de vértices y bordes en el GraphFrame.

display(g.vertices)
display(g.edges)

Grado entrante de los vértices:

display(g.inDegrees)

Grado de salida de los vértices:

display(g.outDegrees)

Grado de los vértices:

display(g.degrees)

Puede ejecutar consultas directamente en el DataFrame de vértices. Por ejemplo, podemos encontrar la edad de la persona más joven en el gráfico:

val youngest = g.vertices.groupBy().min("age")
display(youngest)

Del mismo modo, puede ejecutar consultas en el dataframe perimetral. Por ejemplo, vamos a contar el número de relaciones "follow" en el gráfico:

val numFollows = g.edges.filter("relationship = 'follow'").count()

Búsqueda de motivos

Crear relaciones más complejas que impliquen bordes y vértices mediante motivos. La celda siguiente busca los pares de vértices con bordes en ambas direcciones entre ellos. El resultado es un DataFrame, en el que los nombres de columna son claves de motivo.

Consulte la Guía de usuario de GraphFrame para obtener más detalles sobre la API.

// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

Dado que el resultado es un DataFrame, puede crear consultas más complejas sobre el motivo. Vamos a encontrar todas las relaciones recíprocas en las que una persona tiene más de 30 años:

val filtered = motifs.filter("b.age > 30")
display(filtered)

Consultas con estado

La mayoría de las consultas de motivos son independientes y sencillas de expresar, como en los ejemplos anteriores. En los ejemplos siguientes se muestran consultas más complejas que llevan la información a lo largo de un camino en el patrón. Exprese estas consultas mediante la combinación de la búsqueda de motivos de GraphFrame con filtros aplicados al resultado, donde los filtros utilizan operaciones secuenciales para construir una serie de columnas de DataFrame.

Por ejemplo, supongamos que desea identificar una cadena de 4 vértices con alguna propiedad definida por una secuencia de funciones. Es decir, entre las cadenas de 4 vértices a->b->c->d, identifique el subconjunto de cadenas que coinciden con este filtro complejo:

  • Inicialice el estado en la ruta de acceso.
  • Actualice el estado en función del vértice a.
  • Actualice el estado en función del vértice b.
  • Etc. para c y d.
  • Si el estado final coincide con alguna condición, el filtro acepta la cadena.

Los siguientes fragmentos de código muestran este proceso, donde identificamos cadenas de 4 vértices de modo que al menos 2 de los 3 bordes son relaciones "amigas". En este ejemplo, el estado es el recuento actual de bordes "friend"; en general, podría ser cualquier columna DataFrame.

// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

// Query on sequence, with state (cnt)
//  (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
  when(relationship === "friend", cnt + 1).otherwise(cnt)
}
//  (b) Use sequence operation to apply method to sequence of elements in motif.
//      In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
  foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
//  (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)

Subgráficos

GraphFrames proporciona API para crear subgráficos mediante el filtrado de bordes y vértices. Estos filtros se pueden componer juntos. Por ejemplo, el siguiente subgráfico contiene solo personas que son amigos y que tienen más de 30 años de edad.

// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
  .filterEdges("relationship = 'friend'")
  .filterVertices("age > 30")
  .dropIsolatedVertices()

Filtros triples complejos

En el ejemplo siguiente se muestra cómo seleccionar un subgráfico basado en filtros triples que operan en un borde y sus vértices "src" y "dst". Extender este ejemplo para ir más allá de los tripletes mediante motivos más complejos es simple.

// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
  .filter("e.relationship = 'follow'")
  .filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
//  val e2 = paths.select("e.*")

// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)

Algoritmos de grafos estándar

En esta sección se describen los algoritmos de grafos estándar integrados en GraphFrames.

Búsqueda por amplitud (BFS)

Busque "Esther" para usuarios menores de 32 años <.

val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)

La búsqueda también puede limitar los filtros perimetrales y las longitudes máximas de trayectorias.

val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
  .edgeFilter("relationship != 'friend'")
  .maxPathLength(3)
  .run()
display(filteredPaths)

Componentes conectados

Calcule la pertenencia a componentes conectados de cada vértice y devuelva un grafo con cada vértice asignado a un identificador de componente.

val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)

Componentes fuertemente conectados

Calcule el componente fuertemente conectado (SCC) de cada vértice y devuelva un gráfico con cada vértice asignado al SCC que contiene ese vértice.

val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))

Propagación de etiquetas

Ejecute el algoritmo estático de propagación de etiquetas para detectar comunidades en redes.

Cada nodo de la red se asigna inicialmente a su propia comunidad. En cada superpaso, los nodos envían su afiliación de la comunidad a todos los vecinos y actualizan su estado a la afiliación de la comunidad en modo de mensajes entrantes.

LPA es un algoritmo de detección de comunidad estándar para gráficos. Es barato computacionalmente, aunque (1) la convergencia no está garantizada y (2) uno puede acabar con soluciones triviales (todos los nodos identifican en una sola comunidad).

val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))

PageRank

Identificar vértices importantes en un grafo en función de las conexiones.

// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)

Rutas más cortas

Calcula las rutas más cortas al conjunto dado de vértices de referencia, donde los puntos de referencia se especifican por el ID del vértice.

val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)

Recuento de triángulos

Calcula el número de triángulos que pasan por cada vértice.

import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends  // get example graph

val results = g.triangleCount.run()
results.select("id", "count").show()