本文示範 GraphFrames 使用者指南中的範例。
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._
建立 GraphFrames
您可以從頂點和邊緣資料框建立 GraphFrame。
- 頂點數據框架:頂點 DataFrame 應該包含名為
id
的特殊數據行,指定圖形中每個頂點的唯一標識符。 - Edge DataFrame:邊資料框應包含兩個特殊列:
src
(邊的來源頂點標識符)和dst
(邊的目的地頂點標識符)。
這兩個 DataFrame 都可以有任意的其他數據行。 這些數據行可以代表頂點和邊緣屬性。
建立頂點和邊緣
// Vertex DataFrame
val v = spark.createDataFrame(List(
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 30),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 36),
("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
("a", "b", "friend"),
("b", "c", "follow"),
("c", "b", "follow"),
("f", "c", "follow"),
("e", "f", "follow"),
("e", "d", "friend"),
("d", "a", "friend"),
("a", "e", "friend")
)).toDF("src", "dst", "relationship")
讓我們從這些頂點和這些邊緣建立圖形:
val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends
基本圖形和數據框架查詢
GraphFrame 提供簡單的圖形查詢,例如節點程度。
此外,由於 GraphFrames 將圖形表示為頂點和邊緣 DataFrame 配對,因此可以輕鬆地直接在頂點和邊緣 DataFrame 上進行功能強大的查詢。 這些 DataFrame 可用為 GraphFrame 中的頂點和邊緣欄位。
display(g.vertices)
display(g.edges)
頂點的傳入程度:
display(g.inDegrees)
頂點的外向度:
display(g.outDegrees)
頂點的程度:
display(g.degrees)
您可以直接在頂點 DataFrame 上執行查詢。 例如,我們可以在圖表中找到最年輕的年齡:
val youngest = g.vertices.groupBy().min("age")
display(youngest)
同樣地,您可以在邊緣 DataFrame 上執行查詢。 例如,讓我們計算圖表中的「追蹤」關聯性數目:
val numFollows = g.edges.filter("relationship = 'follow'").count()
尋找主題
使用圖案建立涉及邊緣和頂點的更複雜的關聯性。 下列儲存格會尋找具有雙向邊的頂點配對。 結果是 DataFrame,其中的欄名稱是模板鍵。
如需 API 的詳細資訊,請參閱 GraphFrame 使用者指南。
// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)
由於結果是 DataFrame,因此您可以在圖案之上建置更複雜的查詢。 讓我們找出所有其中一人年齡超過 30 歲的相互關係:
val filtered = motifs.filter("b.age > 30")
display(filtered)
具狀態查詢
大部分的圖案查詢都是無狀態且容易表達的,如上述範例所示。 下一個範例示範更複雜的查詢,這些查詢會沿著圖案中的路徑執行狀態。 結合 GraphFrame 圖案尋找與結果上的篩選來表達這些查詢,其中篩選條件會使用時序作業來建構一系列的 DataFrame 數據行。
例如,假設您想要使用函式序列所定義的一些屬性來識別 4 個頂點的鏈結。 也就是說,在 4 個頂點 a->b->c->d
的鏈結中,識別符合此複雜篩選條件的鏈結子集:
- 初始化路徑上的狀態。
- 根據頂點 a 的狀態更新。
- 根據節點 b 更新狀態。
- 等等適用於 c 和 d。
- 如果最終狀態符合某些條件,則濾波器會接受該鏈。
下列程式碼範例展示了這個過程,我們識別出由 4 個頂點組成的鏈結,其中特定 3 條邊中至少有 2 條是「朋友」關係。 在此範例中,狀態是目前 「friend」 邊緣的計數;一般而言,它可以是任何 DataFrame 數據行。
// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
// Query on sequence, with state (cnt)
// (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
when(relationship === "friend", cnt + 1).otherwise(cnt)
}
// (b) Use sequence operation to apply method to sequence of elements in motif.
// In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
// (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)
子圖形
GraphFrames 藉由篩選邊緣和頂點,提供建置子圖形的 API。 這些篩選可以組合在一起。 例如,下列子數據報只包含朋友且年齡超過 30 歲的人。
// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
.filterEdges("relationship = 'friend'")
.filterVertices("age > 30")
.dropIsolatedVertices()
複雜的三重篩選
下列範例示範如何根據在邊緣及其 「src」 和 「dst」 頂點上操作的三重篩選來選取子圖。 使用更複雜的樣式來擴展此範例以超越三元組是很簡單的。
// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
.filter("e.relationship = 'follow'")
.filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
// val e2 = paths.select("e.*")
// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)
標準圖演算法
本節說明 GraphFrames 內建的標準圖形演算法。
廣度優先搜尋 (BFS)
從 「Esther」 搜尋 32 歲 < 的使用者。
val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)
搜尋也可能限制邊緣篩選和路徑長度上限。
val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
.edgeFilter("relationship != 'friend'")
.maxPathLength(3)
.run()
display(filteredPaths)
已連線的元件
計算每個頂點的聯機組件成員資格,並傳回每個頂點指派元件標識碼的圖形。
val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)
強聯機的元件
計算每個頂點的強連接元件(SCC),並傳回一個圖,將每個頂點指派到包含該頂點的 SCC。
val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))
標籤傳播
執行靜態標籤傳播演算法,以偵測網路中的社群。
網路中的每個節點一開始都會指派給自己的社群。 在每個超級步驟中,節點都會將其社群隸屬關係傳送給所有鄰居,並將其狀態更新為傳入訊息中最常出現的社群隸屬關係。
LPA 是圖表的標準社群偵測算法。 雖然(1)收斂性不保證,而且(2)可能會得到簡單的解(所有節點都識別為單一社群),但計算成本低廉。
val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))
PageRank(網頁排名算法)
根據連結識別圖中的重要頂點。
// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)
最短路徑
計算到指定一組地標頂點的最短路徑,其中地標是通過頂點 ID 指定的。
val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)
三角形計數
計算通過每個頂點的三角形數目。
import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends // get example graph
val results = g.triangleCount.run()
results.select("id", "count").show()