-
Notifications
You must be signed in to change notification settings - Fork 489
/
Copy pathespresso.java
105 lines (93 loc) · 3.56 KB
/
espresso.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package guide;
import java.util.Random;
import org.zeromq.*;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZThread.IAttachedRunnable;
// Espresso Pattern
// This shows how to capture data using a pub-sub proxy
public class espresso
{
// The subscriber thread requests messages starting with
// A and B, then reads and counts incoming messages.
private static class Subscriber implements IAttachedRunnable
{
@Override
public void run(Object[] args, ZContext ctx, Socket pipe)
{
// Subscribe to "A" and "B"
Socket subscriber = ctx.createSocket(SocketType.SUB);
subscriber.connect("tcp://localhost:6001");
subscriber.subscribe("A".getBytes(ZMQ.CHARSET));
subscriber.subscribe("B".getBytes(ZMQ.CHARSET));
int count = 0;
while (count < 5) {
String string = subscriber.recvStr();
if (string == null)
break; // Interrupted
count++;
}
subscriber.close();
}
}
// .split publisher thread
// The publisher sends random messages starting with A-J:
private static class Publisher implements IAttachedRunnable
{
@Override
public void run(Object[] args, ZContext ctx, Socket pipe)
{
Socket publisher = ctx.createSocket(SocketType.PUB);
publisher.bind("tcp://*:6000");
Random rand = new Random(System.currentTimeMillis());
while (!Thread.currentThread().isInterrupted()) {
String string = String.format("%c-%05d", 'A' + rand.nextInt(10), rand.nextInt(100000));
if (!publisher.send(string))
break; // Interrupted
try {
Thread.sleep(100); // Wait for 1/10th second
}
catch (InterruptedException e) {
}
}
publisher.close();
}
}
// .split listener thread
// The listener receives all messages flowing through the proxy, on its
// pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connect
// attached child threads. In other languages your mileage may vary:
private static class Listener implements IAttachedRunnable
{
@Override
public void run(Object[] args, ZContext ctx, Socket pipe)
{
// Print everything that arrives on pipe
while (true) {
ZFrame frame = ZFrame.recvFrame(pipe);
if (frame == null)
break; // Interrupted
frame.print(null);
frame.destroy();
}
}
}
// .split main thread
// The main task starts the subscriber and publisher, and then sets
// itself up as a listening proxy. The listener runs as a child thread:
public static void main(String[] argv)
{
try (ZContext ctx = new ZContext()) {
// Start child threads
ZThread.fork(ctx, new Publisher());
ZThread.fork(ctx, new Subscriber());
Socket subscriber = ctx.createSocket(SocketType.XSUB);
subscriber.connect("tcp://localhost:6000");
Socket publisher = ctx.createSocket(SocketType.XPUB);
publisher.bind("tcp://*:6001");
Socket listener = ZThread.fork(ctx, new Listener());
ZMQ.proxy(subscriber, publisher, listener);
System.out.println(" interrupted");
// NB: child threads exit here when the context is closed
}
}
}