-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathslave.py
48 lines (36 loc) · 1.35 KB
/
slave.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import sys
import time
import gevent
from message import Message
from baserunner import BaseRunner
class SlaveRunner(BaseRunner):
where = 'slave'
def __init__(self, client):
super(BaseRunner, self).__init__()
self._client = client
self._id = self._client.get_id()
def loop(self):
self.send_to_master('register_slave')
# infinite loop for slave to handle tasks, it will stop until got stop message
def _handle_operation(msg):
print("received message %s from slave %s" % (msg.type, msg.node_id))
getattr(self, 'on_%s' % msg.type)(msg)
while True:
msg = self._client.recv()
if msg.node_id not in (None, self._id):
continue
gevent.spawn(_handle_operation, msg)
def send_to_master(self, msg_type, msg_data=None):
print("send message %s to master" % msg_type)
self._client.send(Message(msg_type, msg_data, self._id))
def on_operation_start(self, msg):
''' execute the specified operation '''
result = self._execute(*msg.data)
self.send_to_master("receive_result", result)
def on_cancel(self, msg):
self.cancel()
def on_stop(self, msg):
sys.stderr.write('got stop message, close slave ...')
self.stop()
def stop(self):
self._client.close()