-
Notifications
You must be signed in to change notification settings - Fork 489
/
Copy pathlbbroker.java
178 lines (144 loc) · 6.4 KB
/
lbbroker.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package guide;
import java.util.LinkedList;
import java.util.Queue;
import org.zeromq.SocketType;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZContext;
public class lbbroker
{
private static final int NBR_CLIENTS = 10;
private static final int NBR_WORKERS = 3;
/**
* Basic request-reply client using REQ socket
*/
private static class ClientTask extends Thread
{
@Override
public void run()
{
// Prepare our context and sockets
try (ZContext context = new ZContext()) {
Socket client = context.createSocket(SocketType.REQ);
ZHelper.setId(client); // Set a printable identity
client.connect("ipc://frontend.ipc");
// Send request, get reply
client.send("HELLO");
String reply = client.recvStr();
System.out.println("Client: " + reply);
}
}
}
/**
* While this example runs in a single process, that is just to make
* it easier to start and stop the example. Each thread has its own
* context and conceptually acts as a separate process.
* This is the worker task, using a REQ socket to do load-balancing.
*/
private static class WorkerTask extends Thread
{
@Override
public void run()
{
// Prepare our context and sockets
try (ZContext context = new ZContext()) {
Socket worker = context.createSocket(SocketType.REQ);
ZHelper.setId(worker); // Set a printable identity
worker.connect("ipc://backend.ipc");
// Tell backend we're ready for work
worker.send("READY");
while (!Thread.currentThread().isInterrupted()) {
String address = worker.recvStr();
String empty = worker.recvStr();
assert (empty.isEmpty());
// Get request, send reply
String request = worker.recvStr();
System.out.println("Worker: " + request);
worker.sendMore(address);
worker.sendMore("");
worker.send("OK");
}
}
}
}
/**
* This is the main task. It starts the clients and workers, and then
* routes requests between the two layers. Workers signal READY when
* they start; after that we treat them as ready when they reply with
* a response back to a client. The load-balancing data structure is
* just a queue of next available workers.
*/
public static void main(String[] args)
{
// Prepare our context and sockets
try (ZContext context = new ZContext()) {
Socket frontend = context.createSocket(SocketType.ROUTER);
Socket backend = context.createSocket(SocketType.ROUTER);
frontend.bind("ipc://frontend.ipc");
backend.bind("ipc://backend.ipc");
int clientNbr;
for (clientNbr = 0; clientNbr < NBR_CLIENTS; clientNbr++)
new ClientTask().start();
for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++)
new WorkerTask().start();
// Here is the main loop for the least-recently-used queue. It has
// two sockets; a frontend for clients and a backend for workers.
// It polls the backend in all cases, and polls the frontend only
// when there are one or more workers ready. This is a neat way to
// use 0MQ's own queues to hold messages we're not ready to process
// yet. When we get a client reply, we pop the next available
// worker, and send the request to it, including the originating
// client identity. When a worker replies, we re-queue that worker,
// and we forward the reply to the original client, using the reply
// envelope.
// Queue of available workers
Queue<String> workerQueue = new LinkedList<>();
while (!Thread.currentThread().isInterrupted()) {
// Initialize poll set
Poller items = context.createPoller(2);
// Always poll for worker activity on backend
items.register(backend, Poller.POLLIN);
// Poll front-end only if we have available workers
if (!workerQueue.isEmpty())
items.register(frontend, Poller.POLLIN);
if (items.poll() < 0)
break; // Interrupted
// Handle worker activity on backend
if (items.pollin(0)) {
// Queue worker address for LRU routing
workerQueue.add(backend.recvStr());
// Second frame is empty
String empty = backend.recvStr();
assert (empty.isEmpty());
// Third frame is READY or else a client reply address
String clientAddr = backend.recvStr();
// If client reply, send rest back to frontend
if (!clientAddr.equals("READY")) {
empty = backend.recvStr();
assert (empty.isEmpty());
String reply = backend.recvStr();
frontend.sendMore(clientAddr);
frontend.sendMore("");
frontend.send(reply);
if (--clientNbr == 0)
break;
}
}
if (items.pollin(1)) {
// Now get next client request, route to LRU worker
// Client request is [address][empty][request]
String clientAddr = frontend.recvStr();
String empty = frontend.recvStr();
assert (empty.isEmpty());
String request = frontend.recvStr();
String workerAddr = workerQueue.poll();
backend.sendMore(workerAddr);
backend.sendMore("");
backend.sendMore(clientAddr);
backend.sendMore("");
backend.send(request);
}
}
}
}
}