-
Notifications
You must be signed in to change notification settings - Fork 489
/
Copy pathclonesrv2.java
128 lines (106 loc) · 4.14 KB
/
clonesrv2.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package guide;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZThread;
import org.zeromq.ZThread.IAttachedRunnable;
/**
 * Clone server, Model Two.
 *
 * <p>The main thread publishes a random key-value update roughly once per
 * second on a PUB socket bound to port 5557 and forwards the same update to a
 * {@link StateManager} thread. The state manager remembers the most recent
 * message for every key and answers snapshot requests on a ROUTER socket bound
 * to port 5556.
 *
 * @author Danish Shrestha <[email protected]>
 *
 */
public class clonesrv2
{
    /**
     * Publishes random updates until the current thread is interrupted.
     *
     * <p>Every update carries a monotonically increasing sequence number
     * (starting at 1), a random key in {@code [0, 10000)} rendered as a string,
     * and a random 4-byte big-endian integer body in {@code [0, 1000000)}. The
     * update is sent both to subscribers and to the state manager thread. When
     * the loop ends, the number of messages sent is printed and the context is
     * closed by the enclosing try-with-resources.
     */
    public void run()
    {
        try (ZContext ctx = new ZContext()) {
            Socket publisher = ctx.createSocket(SocketType.PUB);
            publisher.bind("tcp://*:5557");

            // Forked thread keeps the authoritative state; 'updates' is our pipe to it.
            Socket updates = ZThread.fork(ctx, new StateManager());

            Random random = new Random();
            long sequence = 0;

            while (!Thread.currentThread().isInterrupted()) {
                long currentSequenceNumber = ++sequence;
                int key = random.nextInt(10000);
                int body = random.nextInt(1000000);

                // ByteBuffer defaults to big-endian, same bytes as the IntBuffer view.
                byte[] payload = ByteBuffer.allocate(4).putInt(body).array();

                kvsimple kvMsg = new kvsimple(String.valueOf(key), currentSequenceNumber, payload);
                kvMsg.send(publisher);
                kvMsg.send(updates); // send a message to State Manager thread.

                try {
                    Thread.sleep(1000);
                }
                catch (InterruptedException e) {
                    // sleep() clears the interrupt status when it throws; restore it
                    // so the loop condition observes the interrupt and we shut down.
                    Thread.currentThread().interrupt();
                }
            }
            System.out.printf(" Interrupted\n%d messages out\n", sequence);
        }
    }

    /**
     * Attached thread that stores the latest message per key and serves state
     * snapshots to clients.
     *
     * <p>Updates arrive from the main thread over the pipe socket; snapshot
     * requests arrive on a ROUTER socket bound to port 5556.
     */
    public static class StateManager implements IAttachedRunnable
    {
        // Latest message per key, in first-insertion order. Only touched from run().
        private final Map<String, kvsimple> kvMap = new LinkedHashMap<>();

        /**
         * Runs the state manager loop until the thread is interrupted, polling
         * fails, a socket read yields nothing, or a malformed request arrives.
         *
         * @param args unused
         * @param ctx  context used to create the snapshot socket and the poller
         * @param pipe socket connected to the thread that forked this runnable
         */
        @Override
        public void run(Object[] args, ZContext ctx, Socket pipe)
        {
            pipe.send("READY"); // optional

            Socket snapshot = ctx.createSocket(SocketType.ROUTER);
            snapshot.bind("tcp://*:5556");

            // Index 0 = pipe (state updates), index 1 = snapshot (client requests).
            Poller poller = ctx.createPoller(2);
            poller.register(pipe, ZMQ.Poller.POLLIN);
            poller.register(snapshot, ZMQ.Poller.POLLIN);

            // Sequence number of the last update applied to kvMap.
            long stateSequence = 0;
            while (!Thread.currentThread().isInterrupted()) {
                if (poller.poll() < 0) {
                    break; // Context has been shut down
                }

                // apply state updates from main thread
                if (poller.pollin(0)) {
                    kvsimple kvMsg = kvsimple.recv(pipe);
                    if (kvMsg == null) {
                        break;
                    }
                    kvMap.put(kvMsg.getKey(), kvMsg);
                    stateSequence = kvMsg.getSequence();
                }

                // execute state snapshot request
                if (poller.pollin(1)) {
                    // First frame is treated as the requester's identity, second as the request.
                    byte[] identity = snapshot.recv(0);
                    if (identity == null) {
                        break;
                    }
                    byte[] requestFrame = snapshot.recv(0);
                    if (requestFrame == null) {
                        break; // nothing could be read; avoid decoding a null frame
                    }

                    String request = new String(requestFrame, ZMQ.CHARSET);
                    if (!request.equals("ICANHAZ?")) {
                        System.out.println("E: bad request, aborting");
                        break;
                    }

                    // Replay every stored message to the requester.
                    for (Entry<String, kvsimple> entry : kvMap.entrySet()) {
                        kvsimple msg = entry.getValue();
                        System.out.println("Sending message " + msg.getSequence());
                        this.sendMessage(msg, identity, snapshot);
                    }

                    // now send end message with getSequence number
                    System.out.println("Sending state snapshot = " + stateSequence);
                    kvsimple message = new kvsimple("KTHXBAI", stateSequence, ZMQ.MESSAGE_SEPARATOR);
                    this.sendMessage(message, identity, snapshot);
                }
            }
        }

        /**
         * Sends one message to a specific client: the identity frame first
         * (flagged as "more to follow"), then the message itself.
         *
         * @param msg      message to deliver
         * @param identity identity frame received with the client's request
         * @param snapshot socket the request was received on
         */
        private void sendMessage(kvsimple msg, byte[] identity, Socket snapshot)
        {
            snapshot.send(identity, ZMQ.SNDMORE);
            msg.send(snapshot);
        }
    }

    /**
     * Starts the clone server on the calling thread.
     *
     * @param args ignored
     */
    public static void main(String[] args)
    {
        new clonesrv2().run();
    }
}