Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этой статье приведены примеры из руководства пользователя GraphFrames .
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._
Создание граффреймов
GraphFrames можно создавать из вершин и пограничных кадров данных.
- Кадр данных вершин. Кадр вершин должен содержать специальный столбец с именем
idкоторый задает уникальные идентификаторы для каждой вершины в графе. - Графовые кадры данных: кадр данных для ребер должен содержать два специальных столбца:
src(исходный идентификатор вершины ребра) иdst(идентификатор конечной вершины ребра).
Оба DataFrame-ы могут иметь произвольные дополнительные столбцы. Эти столбцы могут представлять атрибуты вершин и ребер.
Создание вершин и ребер
// Vertex DataFrame
val v = spark.createDataFrame(List(
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 30),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 36),
("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
("a", "b", "friend"),
("b", "c", "follow"),
("c", "b", "follow"),
("f", "c", "follow"),
("e", "f", "follow"),
("e", "d", "friend"),
("d", "a", "friend"),
("a", "e", "friend")
)).toDF("src", "dst", "relationship")
Давайте создадим граф из этих вершин и этих ребер:
val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends
Базовые запросы графа и кадра данных
GraphFrames предоставляют простые запросы графа, такие как степень узла.
Кроме того, так как Граффреймы представляют графы как пары вершин и пограничных кадров данных, легко создавать мощные запросы непосредственно на вершинах и пограничных кадрах данных. Эти DataFrame доступны как вершины и рёбра полей в GraphFrame.
display(g.vertices)
display(g.edges)
Входящие степени вершин:
display(g.inDegrees)
Исходящая степень вершин:
display(g.outDegrees)
Степень вершин:
display(g.degrees)
Запросы можно выполнять непосредственно на вершинах DataFrame. Например, можно найти возраст самого молодого человека в графе:
val youngest = g.vertices.groupBy().min("age")
display(youngest)
Аналогичным образом можно выполнять запросы на пограничных кадрах данных. Например, давайте подсчитаем количество связей "следования" в графе.
val numFollows = g.edges.filter("relationship = 'follow'").count()
Поиск мотивов
Создавайте более сложные связи с ребрами и вершинами с помощью мотивов. Следующая ячейка находит пары вершин с краями в обоих направлениях между ними. Результатом является DataFrame, в котором имена столбцов являются ключами мотивов.
Дополнительные сведения об API можно найти в руководстве пользователя GraphFrame .
// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)
Поскольку результатом является DataFrame, вы можете строить более сложные запросы на основе данного шаблона. Давайте найдем все взаимные отношения, в которых один человек старше 30:
val filtered = motifs.filter("b.age > 30")
display(filtered)
Запросы с отслеживанием состояния
Большинство запросов мотивов не имеют состояния и их просто выразить, как в приведенных выше примерах. В следующих примерах показаны более сложные запросы, которые перемещают состояние вдоль пути внутри мотива. Разместите эти запросы, сочетая поиск мотивов GraphFrame с фильтрами в результате, где фильтры используют операции последовательности для создания ряда столбцов Кадра данных.
Например, предположим, что необходимо определить цепочку 4 вершин с некоторыми свойствами, определенными последовательностью функций. То есть среди цепей из 4 вершин a->b->c->d, определите подмножество цепей, соответствующих этому сложному фильтру:
- Инициализация состояния по пути.
- Обновление состояния на основе вершины a.
- Обновление состояния на основе вершины b.
- И т. д. для c и d.
- Если окончательное состояние соответствует определенному условию, фильтр принимает цепочку.
Приведенные ниже фрагменты кода демонстрируют этот процесс, где мы определяем цепочки из 4 вершин так, что по крайней мере 2 из 3 ребер являются «дружественными» отношениями. В этом примере состояние — текущее число ребер "друг"; Как правило, это может быть любой столбец DataFrame.
// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
// Query on sequence, with state (cnt)
// (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
when(relationship === "friend", cnt + 1).otherwise(cnt)
}
// (b) Use sequence operation to apply method to sequence of elements in motif.
// In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
// (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)
Подграфы
GraphFrames предоставляет API-интерфейсы для создания подграфов путем фильтрации по краям и вершинам. Эти фильтры могут создаваться вместе. Например, следующий подграф содержит только людей, которые являются друзьями и которые старше 30 лет.
// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
.filterEdges("relationship = 'friend'")
.filterVertices("age > 30")
.dropIsolatedVertices()
Сложные тройные фильтры
В следующем примере показано, как выбрать подграф на основе тройных фильтров, работающих на краю и вершинах src и dst. Расширение этого примера с помощью более сложных мотивов для выхода за рамки троек просто.
// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
.filter("e.relationship = 'follow'")
.filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
// val e2 = paths.select("e.*")
// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)
Стандартные алгоритмы графа
В этом разделе описываются стандартные алгоритмы графов, встроенные в GraphFrames.
Поиск по ширине (BFS)
Выполните поиск по запросу "Эстер" для пользователей < 32 лет.
val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)
Поиск также может ограничить пограничные фильтры и максимальную длину пути.
val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
.edgeFilter("relationship != 'friend'")
.maxPathLength(3)
.run()
display(filteredPaths)
Подключенные компоненты
Вычислить принадлежность каждой вершины к связанной компоненте и вернуть граф с каждой вершиной, назначенной идентификатор компонента.
val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)
Сильно связанные компоненты
Вычислить сильно связанную компоненту (SCC) каждой вершины и вернуть граф, где каждая вершина назначена SCC, содержащей эту вершину.
val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))
Распространение меток
Запустите алгоритм распространения статических меток для обнаружения сообществ в сетях.
Каждый узел в сети изначально назначается собственному сообществу. На каждом супершаге узлы отправляют свою принадлежность к сообществу всем соседям и обновляют свое состояние в соответствии с наиболее частой принадлежностью к сообществу среди входящих сообщений.
LPA — это стандартный алгоритм обнаружения сообщества для графов. Это недорогое вычисление, хотя (1) конвергенция не гарантируется и (2) в конечном итоге может быть тривиальными решениями (все узлы определяются в одном сообществе).
val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))
Ранг страницы
Определите важные вершины в графе на основе соединений.
// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)
Кратчайшие пути
Вычисляет кратчайшие пути к заданному набору опорных вершин, где ориентиры указываются по идентификатору вершин.
val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)
Подсчет треугольников
Вычисляет количество треугольников, проходящих через каждую вершину.
import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends // get example graph
val results = g.triangleCount.run()
results.select("id", "count").show()