-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgraph.py
54 lines (40 loc) · 2.04 KB
/
graph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import *
from datetime import *
from dateutil import parser as dt
def total_seconds(timedelta):
    """Return the length of *timedelta* as a float number of seconds.

    The ``days``, ``seconds`` and ``microseconds`` fields of the argument are
    folded into a single microsecond count, which is then scaled back down to
    seconds.

    NOTE: the parameter name hides the ``timedelta`` class brought in by the
    file's ``from datetime import *`` for the duration of this function; it
    is kept as-is so keyword callers continue to work.
    """
    micros_per_second = 10 ** 6
    whole_seconds = timedelta.seconds + timedelta.days * 24 * 3600
    # Adding 0.0 first forces the running total to float before the
    # (potentially large) whole-second part is added.
    total_micros = (timedelta.microseconds + 0.0 +
                    whole_seconds * micros_per_second)
    return total_micros / micros_per_second
def getWeight(source, dest, retweet_count):
    """Return ``retweet_count`` divided by the seconds between two timestamps.

    Args:
        source: Timestamp string, parsed with the dateutil parser (``dt``).
        dest: Timestamp string, parsed the same way; it is subtracted from
            ``source``.
        retweet_count: Numerator of the weight.

    Returns:
        The quotient ``retweet_count / (source - dest in seconds)`` rendered
        with ``str``.

    Raises:
        ZeroDivisionError: If both timestamps parse to the same instant, since
            the elapsed time is then ``0.0``.
    """
    source_time = dt.parse(source)
    dest_time = dt.parse(dest)
    elapsed_seconds = total_seconds(source_time - dest_time)
    return str(retweet_count / elapsed_seconds)
if __name__ == "__main__":
    # Script entry point: read tweet JSON, keep the records whose
    # retweeted_status.retweet_count exceeds 1000, and write the
    # retweeter -> original-author relationships as a GML graph.
    sc = SparkContext(appName="MDISN")
    try:
        sqlContext = SQLContext(sc)
        data = sqlContext.read.json("s3n://mdisn/data/paris0.txt")

        # Variables used for formatting output
        nodeLeftDelim = '\tnode\n\t[\n\t\t'
        nodeRightDelim = '"\n\t]'
        edgeLeftDelim = '\tedge\n\t[\n\t\t'
        edgeRightDelim = '\n\t]'
        header = sc.parallelize(['graph\n['])
        footer = sc.parallelize([']'])

        # Get users/edges from data; dropna() discards rows where a selected
        # column is null.
        users = data.filter(data.retweeted_status.retweet_count > 1000).select(
            data.user.screen_name.alias('user'),
            data.retweeted_status.user.screen_name.alias('other_user'),
            data.created_at.alias('source_time'),
            data.retweeted_status.created_at.alias('dest_time'),
            data.retweeted_status.retweet_count.alias('retweet_count')
        ).dropna()

        # DataFrame.map was removed in Spark 2.0; the underlying RDD exposes
        # the same row-wise map on both old and new releases.
        rows = users.rdd

        # One edge per retained row: retweeting user -> retweeted user.
        edges = rows.map(lambda x: edgeLeftDelim + 'source ' + str(x.user) +
                         '\n\t\ttarget ' + str(x.other_user) + edgeRightDelim)

        # Every screen name must be declared as a node exactly once. Emitting
        # a node per row (as edges are) would repeat the same id whenever an
        # account shows up in more than one retweet, so de-duplicate first.
        names = rows.flatMap(
            lambda x: [str(x.other_user), str(x.user)]).distinct()
        nodes = names.map(lambda name: nodeLeftDelim + 'id ' + name +
                          '\n\t\tlabel "' + name + nodeRightDelim)

        # Output data in Gephi format. RDD "+" is a union; coalesce(1) makes
        # Spark write a single part file inside the output directory.
        # NOTE(review): '%s' is not one of the strftime directives Python
        # documents as portable (it depends on the platform C library) --
        # confirm it yields epoch seconds where this job runs.
        output = header + nodes + edges + footer
        output.coalesce(1).saveAsTextFile('s3n://mdisn/output/twitter-graph' +
                                          datetime.now().strftime('%s') +
                                          '.gml')
    finally:
        # Release the Spark application's resources even if the job fails.
        sc.stop()