-
Notifications
You must be signed in to change notification settings - Fork 489
/
Copy pathmdcliapi.java
135 lines (115 loc) · 3.42 KB
/
mdcliapi.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package guide;
import java.util.Formatter;
import org.zeromq.*;
/**
* Majordomo Protocol Client API, Java version Implements the MDP/Worker spec at
* http://rfc.zeromq.org/spec:7.
*
*/
public class mdcliapi
{
private final String broker;
private final ZContext ctx;
private ZMQ.Socket client;
private long timeout = 2500;
private int retries = 3;
private final boolean verbose;
private final Formatter log = new Formatter(System.out);
public long getTimeout()
{
return timeout;
}
public void setTimeout(long timeout)
{
this.timeout = timeout;
}
public int getRetries()
{
return retries;
}
public void setRetries(int retries)
{
this.retries = retries;
}
public mdcliapi(String broker, boolean verbose)
{
this.broker = broker;
this.verbose = verbose;
ctx = new ZContext();
reconnectToBroker();
}
/**
* Connect or reconnect to broker
*/
void reconnectToBroker()
{
if (client != null) {
client.close();
}
client = ctx.createSocket(SocketType.REQ);
client.connect(broker);
if (verbose)
log.format("I: connecting to broker at %s\n", broker);
}
/**
* Send request to broker and get reply by hook or crook Takes ownership of
* request message and destroys it when sent. Returns the reply message or
* NULL if there was no reply.
*
* @param service
* @param request
* @return
*/
public ZMsg send(String service, ZMsg request)
{
request.push(new ZFrame(service));
request.push(MDP.C_CLIENT.newFrame());
if (verbose) {
log.format("I: send request to '%s' service: \n", service);
request.dump(log.out());
}
ZMsg reply = null;
int retriesLeft = retries;
while (retriesLeft > 0 && !Thread.currentThread().isInterrupted()) {
request.duplicate().send(client);
// Poll socket for a reply, with timeout
ZMQ.Poller items = ctx.createPoller(1);
items.register(client, ZMQ.Poller.POLLIN);
if (items.poll(timeout) == -1)
break; // Interrupted
if (items.pollin(0)) {
ZMsg msg = ZMsg.recvMsg(client);
if (verbose) {
log.format("I: received reply: \n");
msg.dump(log.out());
}
// Don't try to handle errors, just assert noisily
assert (msg.size() >= 3);
ZFrame header = msg.pop();
assert (MDP.C_CLIENT.equals(header.toString()));
header.destroy();
ZFrame replyService = msg.pop();
assert (service.equals(replyService.toString()));
replyService.destroy();
reply = msg;
break;
}
else {
items.unregister(client);
if (--retriesLeft == 0) {
log.format("W: permanent error, abandoning\n");
break;
}
log.format("W: no reply, reconnecting\n");
reconnectToBroker();
}
items.close();
}
request.destroy();
return reply;
}
public void destroy()
{
ctx.destroy();
}
}