Skip to content

Commit b04a5ef

Browse files
committed Aug 13, 2017
Spark GraphX social graph analysis
1 parent 78f0f7a commit b04a5ef

File tree

10 files changed

+13189
-0
lines changed

10 files changed

+13189
-0
lines changed
 

‎.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
11
*.class
22
*.log
3+
/.idea/
4+
/target/
5+
/project/target/

‎build.sbt

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
// sbt build definition for the Spark GraphX demo project.
name := "spark-course"
2+
3+
version := "1.0"
4+
5+
// Scala version used to compile the project; `%%` below appends the matching
// Scala binary version to each artifact name.
scalaVersion := "2.11.8"
6+
7+
// Spark core and the GraphX module, both pinned to the same Spark release.
libraryDependencies ++= Seq(
8+
"org.apache.spark" %% "spark-core" % "2.0.0",
9+
"org.apache.spark" %% "spark-graphx" % "2.0.0"
10+
)

‎project/build.properties

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
sbt.version = 0.13.15

‎project/plugins.sbt

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
logLevel := Level.Warn
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
sbt.internals.DslEntry

‎src/main/resources/UserGraph.tsv

+6,589
Large diffs are not rendered by default.

‎src/main/resources/UserNames.tsv

+6,486
Large diffs are not rendered by default.

‎src/main/scala/graphx/GraphX.scala

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package graphx
2+
3+
import org.apache.spark._
4+
import org.apache.spark.graphx.{Graph, _}
5+
6+
/** Line parsers that turn raw text records into GraphX vertices and edges. */
object InputDataFlow {

  /**
   * Parses one line of the user-names input into a vertex.
   *
   * The line is split on tab characters: the first field (trimmed) is the numeric
   * vertex id, the second field is the user name and is kept verbatim.
   *
   * @param line one raw input line
   * @return `Some((id, name))` when the line has at least two fields and the id is a
   *         valid `Long`; `None` otherwise, so callers can drop bad lines via `flatMap`
   */
  def parseNames(line: String): Option[(VertexId, String)] = {
    val fields = line.split('\t')
    if (fields.length > 1)
      // A non-numeric id is treated like any other malformed line instead of
      // failing the whole job with a NumberFormatException.
      scala.util.Try(fields(0).trim.toLong).toOption.map(id => (id, fields(1)))
    else None
  }

  /**
   * Parses one adjacency line of the user-graph input into directed edges.
   *
   * The first token is the origin vertex id; every following token is a destination
   * id. One edge (with attribute `0`) is produced per destination.
   *
   * Tokens are separated by any run of whitespace, so both space- and tab-separated
   * lines are accepted, and leading/trailing whitespace is ignored.
   *
   * @param line one raw input line
   * @return the edges from the origin to each listed destination; empty when the line
   *         holds fewer than two tokens
   * @throws NumberFormatException if a line with at least two tokens contains a
   *                               token that is not a valid `Long`
   */
  def makeEdges(line: String): List[Edge[Int]] = {
    val fields = line.trim.split("\\s+")
    if (fields.length < 2) Nil
    else {
      // Parse the origin once rather than once per destination.
      val originId = fields(0).toLong
      fields.iterator
        .drop(1)
        .map(target => Edge(originId, target.toLong, 0))
        .toList
    }
  }

}
26+
27+
/**
 * Social-graph queries over the bundled user-names and user-graph files.
 *
 * @param sc Spark context used to read the input files
 */
class GraphX(sc: SparkContext) {

  /** Maximum number of Pregel supersteps; separations beyond this stay at infinity. */
  private val MaxBfsIterations = 10

  // These are `lazy val`s rather than `def`s so the RDD lineage and the graph are
  // built once and shared by every query; with `def`, each access created a new
  // Graph and the `.cache()` below was never reused. `lazy` keeps construction of
  // this class free of any file-system access, as before.
  private lazy val verts = sc.textFile(USER_NAMES_FILE).flatMap(InputDataFlow.parseNames)

  private lazy val edges = sc.textFile(USER_GRAPH_FILE).flatMap(InputDataFlow.makeEdges)

  private lazy val graph = Graph(verts, edges).cache()

  /**
   * Returns the users with the highest vertex degree.
   *
   * @param amount maximum number of users to return
   * @return up to `amount` entries of `(vertexId, (degree, userName))`, ordered by
   *         degree from highest to lowest
   */
  def getMostConnectedUsers(amount: Int): Array[(VertexId, (Int, String))] =
    graph.degrees
      .join(verts)
      .sortBy({ case (_, (degree, _)) => degree }, ascending = false)
      .take(amount)

  /**
   * Runs a breadth-first search from `root` using Pregel.
   *
   * @param root vertex the search starts from
   * @return a graph whose vertex attribute is the hop count from `root`
   *         (`Double.PositiveInfinity` for vertices not reached within
   *         `MaxBfsIterations` supersteps)
   */
  private def getBfs(root: VertexId): Graph[Double, Int] = {
    val initialGraph =
      graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity)

    initialGraph
      .pregel(Double.PositiveInfinity, MaxBfsIterations)(
        // Vertex program: keep the shortest distance seen so far.
        (_, attr, msg) => math.min(attr, msg),
        // Only message a neighbour when the distance would actually improve. This
        // also skips unreached sources (infinity + 1 is never smaller) and lets
        // Pregel stop as soon as no distance changes.
        triplet =>
          if (triplet.srcAttr + 1 < triplet.dstAttr) {
            Iterator((triplet.dstId, triplet.srcAttr + 1))
          } else {
            Iterator.empty
          },
        // Merge concurrent messages by taking the smaller distance.
        (a, b) => math.min(a, b)
      )
      .cache()
  }

  /**
   * Computes the degrees of separation from `root` to other users.
   *
   * @param root vertex the search starts from
   * @return up to 100 entries of `(vertexId, (distance, userName))`
   */
  def degreeOfSeparationSingleUser(root: VertexId): Array[(VertexId, (Double, String))] =
    getBfs(root).vertices.join(verts).take(100)

  /**
   * Computes the degrees of separation between two users.
   *
   * @param firstUser  vertex the search starts from
   * @param secondUser vertex whose distance from `firstUser` is wanted
   * @return the `(vertexId, distance)` entries whose id equals `secondUser`
   */
  def degreeOfSeparationTwoUser(
      firstUser: VertexId,
      secondUser: VertexId
  ): Array[(VertexId, Double)] =
    getBfs(firstUser).vertices
      .filter { case (vertexId, _) => vertexId == secondUser }
      .collect()
}
+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package graphx
2+
3+
import org.apache.log4j.{Level, Logger}
4+
import org.apache.spark.SparkContext
5+
6+
/** Command-line entry point that runs the sample social-graph queries locally. */
object GraphXDemo {

  /**
   * Creates a local Spark context, prints the results of the three demo queries
   * and stops the context again.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val sc = new SparkContext("local[*]", "GraphX")

    // Always stop the context, even when a query fails, so the local Spark
    // runtime is shut down cleanly.
    try {
      val graph = new GraphX(sc)

      println("\nTop 10 most-connected users:")
      graph.getMostConnectedUsers(10).foreach(println)

      println("\nComputing degrees of separation for user Arch")
      graph.degreeOfSeparationSingleUser(5306).foreach(println)

      println("\nComputing degrees of separation for user Arch and Fred")
      graph.degreeOfSeparationTwoUser(5306, 14).foreach(println)
    } finally {
      sc.stop()
    }
  }
}

‎src/main/scala/graphx/package.scala

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/** Shared locations of the input data files bundled as classpath resources. */
package object graphx {

  /** Classpath name of the user-names resource. */
  val USER_NAMES_FILE_NAME = "/UserNames.tsv"

  /** Classpath name of the user-graph resource. */
  val USER_GRAPH_FILE_NAME = "/UserGraph.tsv"

  /** Path of the user-names resource, resolved from the classpath. */
  val USER_NAMES_FILE: String = resourcePath(USER_NAMES_FILE_NAME)

  /** Path of the user-graph resource, resolved from the classpath. */
  val USER_GRAPH_FILE: String = resourcePath(USER_GRAPH_FILE_NAME)

  /**
   * Resolves a classpath resource to a path string.
   *
   * @param name resource name, as passed to `Class.getResource`
   * @return the decoded path of the resource URL
   * @throws IllegalStateException if the resource is not on the classpath
   */
  private def resourcePath(name: String): String = {
    // getResource returns null for a missing resource; fail with a clear message
    // instead of a NullPointerException.
    val url = Option(getClass.getResource(name)).getOrElse(
      throw new IllegalStateException(s"Resource '$name' not found on the classpath")
    )
    // URL.getPath keeps percent-escapes (e.g. %20 for a space), which do not name a
    // real file. URI.getPath decodes them; fall back to the raw URL path when the
    // URI has no path component or cannot be built.
    scala.util.Try(url.toURI.getPath).toOption
      .flatMap(Option(_))
      .getOrElse(url.getPath)
  }

}

0 commit comments

Comments
 (0)
Please sign in to comment.