-
Notifications
You must be signed in to change notification settings - Fork 489
/
Copy pathclonesrv5.java
175 lines (156 loc) · 6.01 KB
/
clonesrv5.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package guide;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZLoop;
import org.zeromq.ZLoop.IZLoopHandler;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Socket;
/**
 * Clone server - Model Five.
 *
 * <p>Keeps a key-value store in memory and exposes it through three sockets
 * bound to consecutive TCP ports starting at 5556:
 * <ul>
 *   <li>a ROUTER socket that answers snapshot requests,</li>
 *   <li>a PUB socket that publishes every update to clients,</li>
 *   <li>a PULL socket that collects updates from clients.</li>
 * </ul>
 * The socket handlers and a periodic flush timer are all registered on one
 * {@link ZLoop} reactor, which is started by {@link #run()}.
 */
public class clonesrv5
{
    private final ZContext           ctx;       //  Context wrapper
    private final Map<String, kvmsg> kvmap;     //  Key-value store
    private final ZLoop              loop;      //  zloop reactor
    private long                     sequence;  //  How many updates we're at
    private final Socket             snapshot;  //  Handle snapshot requests
    private final Socket             publisher; //  Publish updates to clients
    private final Socket             collector; //  Collect updates from clients

    //  .split snapshot handler
    /**
     * Reactor handler for the snapshot socket. It accepts just the
     * {@code ICANHAZ?} request and replies with a state snapshot (every stored
     * entry whose key starts with the requested subtree) ending with a
     * {@code KTHXBAI} message that carries the current sequence number.
     */
    private static class Snapshots implements IZLoopHandler
    {
        /**
         * Serves one snapshot request.
         *
         * @param loop the reactor invoking this handler (unused)
         * @param item poll item wrapping the snapshot socket
         * @param arg  the owning {@code clonesrv5} instance
         * @return always 0
         */
        @Override
        public int handle(ZLoop loop, PollItem item, Object arg)
        {
            clonesrv5 srv = (clonesrv5) arg;
            Socket socket = item.getSocket();

            //  First frame is echoed back as the leading frame of every reply
            byte[] identity = socket.recv();
            if (identity == null) {
                return 0;
            }

            //  Request is in second frame of message. Compare constant-first
            //  so a null frame is reported as a bad request instead of
            //  raising a NullPointerException inside the reactor.
            String request = socket.recvStr();
            if (!"ICANHAZ?".equals(request)) {
                System.out.print("E: bad request, aborting\n");
                return 0;
            }

            //  Third frame names the subtree the client is interested in
            String subtree = socket.recvStr();
            if (subtree == null) {
                return 0;
            }

            //  Send state snapshot to client
            for (kvmsg stored : srv.kvmap.values()) {
                sendSingle(stored, identity, subtree, socket);
            }

            //  Now send END message with sequence number
            System.out.printf("I: sending shapshot=%d\n", srv.sequence);
            socket.send(identity, ZMQ.SNDMORE);
            kvmsg end = new kvmsg(srv.sequence);
            end.setKey("KTHXBAI");
            end.setBody(subtree.getBytes(ZMQ.CHARSET));
            end.send(socket);
            end.destroy();
            return 0;
        }
    }

    //  .split collect updates
    /**
     * Reactor handler for the collector socket. Each update is stamped with a
     * new sequence number, published immediately on the publisher socket and
     * then stored, with its relative time-to-live (seconds) converted to an
     * absolute expiry time in milliseconds.
     */
    private static class Collector implements IZLoopHandler
    {
        /**
         * Receives, publishes and stores one update.
         *
         * @param loop the reactor invoking this handler (unused)
         * @param item poll item wrapping the collector socket
         * @param arg  the owning {@code clonesrv5} instance
         * @return always 0
         */
        @Override
        public int handle(ZLoop loop, PollItem item, Object arg)
        {
            clonesrv5 srv = (clonesrv5) arg;
            Socket socket = item.getSocket();

            kvmsg msg = kvmsg.recv(socket);
            if (msg != null) {
                msg.setSequence(++srv.sequence);
                //  Publish before rewriting "ttl": the value sent to clients
                //  is the one the originator supplied.
                msg.send(srv.publisher);

                long ttl = parseTtl(msg.getProp("ttl"));
                if (ttl > 0) {
                    //  1000L forces long arithmetic; int multiplication
                    //  overflowed for TTLs above ~2.1 million seconds.
                    msg.setProp("ttl", "%d", System.currentTimeMillis() + ttl * 1000L);
                }
                msg.store(srv.kvmap);
                System.out.printf("I: publishing update=%d\n", srv.sequence);
            }
            return 0;
        }
    }

    /**
     * Timer handler that passes every stored value to
     * {@link clonesrv5#flushSingle(kvmsg)} so expired ones are removed.
     */
    private static class FlushTTL implements IZLoopHandler
    {
        /**
         * Runs one flush pass over the store.
         *
         * @param loop the reactor invoking this handler (unused)
         * @param item unused by this handler
         * @param arg  the owning {@code clonesrv5} instance
         * @return always 0
         */
        @Override
        public int handle(ZLoop loop, PollItem item, Object arg)
        {
            clonesrv5 srv = (clonesrv5) arg;
            //  Iterate over a copy: flushSingle() calls store() on the map,
            //  which presumably modifies it - TODO confirm against kvmsg.
            for (kvmsg msg : new ArrayList<>(srv.kvmap.values())) {
                srv.flushSingle(msg);
            }
            return 0;
        }
    }

    /**
     * Creates the context, the reactor and the three server sockets, binding
     * them to TCP ports 5556 (snapshot), 5557 (publisher) and 5558 (collector).
     */
    public clonesrv5()
    {
        //  Main port we're working on
        int port = 5556;
        ctx = new ZContext();
        kvmap = new HashMap<>();
        loop = new ZLoop(ctx);
        loop.verbose(false);

        //  Set up our clone server sockets
        snapshot = ctx.createSocket(SocketType.ROUTER);
        snapshot.bind(String.format("tcp://*:%d", port));
        publisher = ctx.createSocket(SocketType.PUB);
        publisher.bind(String.format("tcp://*:%d", port + 1));
        collector = ctx.createSocket(SocketType.PULL);
        collector.bind(String.format("tcp://*:%d", port + 2));
    }

    /**
     * Registers the handlers with the reactor and runs it until it stops,
     * then releases the reactor and the context.
     */
    public void run()
    {
        //  Register our handlers with reactor
        PollItem poller = new PollItem(snapshot, ZMQ.Poller.POLLIN);
        loop.addPoller(poller, new Snapshots(), this);
        poller = new PollItem(collector, ZMQ.Poller.POLLIN);
        loop.addPoller(poller, new Collector(), this);
        loop.addTimer(1000, 0, new FlushTTL(), this);

        try {
            loop.start();
        }
        finally {
            //  Release resources even if a handler throws out of the reactor
            loop.destroy();
            ctx.destroy();
        }
    }

    /**
     * Sends one stored entry to a client if its key lies in the requested
     * subtree. We call this function for each key-value pair in our hash table.
     *
     * @param msg      stored entry to consider
     * @param identity frame sent first to choose the recipient
     * @param subtree  key prefix the client asked for
     * @param socket   socket to send on
     */
    private static void sendSingle(kvmsg msg, byte[] identity, String subtree, Socket socket)
    {
        if (msg.getKey().startsWith(subtree)) {
            socket.send(identity, //  Choose recipient
                    ZMQ.SNDMORE);
            msg.send(socket);
        }
    }

    /**
     * Parses a "ttl" property value without throwing.
     *
     * @param value raw property text; may be null, empty or malformed
     * @return the parsed number, or 0 ("no TTL") when the text is not a
     *         valid integer
     */
    private static long parseTtl(String value)
    {
        if (value == null) {
            return 0;
        }
        String text = value.trim();
        if (text.isEmpty()) {
            return 0;
        }
        try {
            return Long.parseLong(text);
        }
        catch (NumberFormatException ignored) {
            //  A malformed TTL from a client is treated as "never expires"
            //  rather than being allowed to abort the reactor.
            return 0;
        }
    }

    //  .split flush ephemeral values
    /**
     * If the key-value pair has expired, deletes it and publishes the fact to
     * listening clients. This runs at regular intervals for every stored
     * value, which could be slow on very large data sets.
     *
     * @param msg stored entry whose "ttl" property, when positive, is read as
     *            an absolute expiry time in milliseconds
     */
    private void flushSingle(kvmsg msg)
    {
        long ttl = parseTtl(msg.getProp("ttl"));
        if (ttl > 0 && System.currentTimeMillis() >= ttl) {
            msg.setSequence(++sequence);
            //  NOTE(review): MESSAGE_SEPARATOR is presumably an empty body,
            //  which kvmsg.store() treats as a delete - confirm.
            msg.setBody(ZMQ.MESSAGE_SEPARATOR);
            msg.send(publisher);
            msg.store(kvmap);
            System.out.printf("I: publishing delete=%d\n", sequence);
        }
    }

    /**
     * Program entry point: builds the server and runs it.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args)
    {
        clonesrv5 srv = new clonesrv5();
        srv.run();
    }
}