forked from qntfy/trawler
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtrawler_kafka.py
80 lines (65 loc) · 2.37 KB
/
trawler_kafka.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
"""
Helpers for generic interaction with a Kafka stream.
"""
from pykafka import KafkaClient
try:
import ujson as json #Faster for some operations
except:
import json
class TrawlerKafka:
    """
    Thin wrapper around a pykafka topic for pushing and reading tweets.

    Construction connects to the broker and keeps the client, the topic,
    a producer and a simple consumer on the instance.
    """

    def __init__(self, host, port, topic='trawler'):
        """
        Connect to a running Kafka instance on `host` and `port` and
        set up a producer and a simple consumer for `topic`.

        host  -- broker hostname
        port  -- broker port
        topic -- name of the topic to use (default 'trawler')
        """
        kafka_client = KafkaClient(hosts="%s:%s" % (host, port))
        kafka_topic = kafka_client.topics[topic]
        self.client = kafka_client
        self.topic = kafka_topic
        self.producer = kafka_topic.get_producer()
        # The consumer is created with the library defaults. An earlier
        # variant passed consumer_group='mygroup' and
        # reset_offset_on_start=False.
        self.consumer = kafka_topic.get_simple_consumer()

    def send_individual_tweets(self, tweets):
        """
        Send each tweet in `tweets` as an individual Kafka message.

        Every tweet is JSON-encoded and handed to the producer as its
        own one-element batch.
        """
        for tweet in tweets:
            self.producer.produce([json.dumps(tweet)])

    def send_individual_tweet(self, tweet):
        """
        Send this tweet as a message.
        """
        self.send_individual_tweets([tweet])

    def send_bulk_tweets(self, tweets):
        """
        Send `tweets` in bulk as a single kafka message.

        The whole collection is JSON-encoded into one payload.
        """
        # Wrap the payload in a list, as send_individual_tweets does: a
        # bare string handed to a producer that expects a batch would be
        # iterated character by character.
        # NOTE(review): confirm against the installed pykafka version --
        # releases whose produce() takes one message per call need the
        # bare payload here and in send_individual_tweets.
        self.producer.produce([json.dumps(tweets)])

    def get_tweets(self):
        """
        Get and reconstitute tweets from a kafka queue.

        Generator: iterates the consumer, skips `None` entries and
        yields the JSON-decoded value of every other message.
        """
        for message in self.consumer:
            if message is None:
                continue
            yield json.loads(message.value)
if __name__ == '__main__':
    # Run standalone to test: push a few sample tweets to a broker on
    # localhost:9092, then read the topic back and print each tweet's keys.
    import datetime as dt

    trawler_kafka = TrawlerKafka('localhost', 9092)
    tweets = [
        {'text': 'tweet tweet', 'timestamp': dt.datetime.now().isoformat()},
        {'text': 'tweety tweet tweet'},
        {'text': 'tweety tweet tweety tweet'},
    ]
    for _ in range(10):
        trawler_kafka.send_individual_tweets(tweets)

    # Single-argument print(...) parses on both Python 2 and Python 3 and
    # produces the same output as the print statement on Python 2.
    print("------------------------")

    # A second, independent connection is used for reading back.
    trawler_kafka2 = TrawlerKafka('localhost', 9092)
    for m in trawler_kafka2.get_tweets():
        # list(...) keeps the printed form a plain list on Python 3 too.
        print(list(m.keys()))