-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscription_model_test.go
102 lines (88 loc) · 2.87 KB
/
subscription_model_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
//go:build integration
package goomerang_test
import (
"testing"
"time"
"go.eloylp.dev/goomerang/client"
"go.eloylp.dev/goomerang/example/protos"
"go.eloylp.dev/goomerang/internal/test"
"go.eloylp.dev/goomerang/message"
"go.eloylp.dev/goomerang/server"
)
func TestSubscriptionsFromClientSide(t *testing.T) {
t.Parallel()
arbiter := test.NewArbiter(t)
s, run := Server(t, server.WithOnErrorHook(noErrorHook(arbiter)))
s.RegisterMessage(defaultMsg().Payload)
run()
defer s.Shutdown(defaultCtx)
c1, connect1 := Client(t,
client.WithServerAddr(s.Addr()),
client.WithOnErrorHook(noErrorHook(arbiter)),
)
c1.Handle(defaultMsg().Payload, message.HandlerFunc(func(_ message.Sender, msg *message.Message) {
_ = msg.Payload.(*protos.MessageV1)
arbiter.ItsAFactThat("CLIENT1_RECEIVED_MESSAGE")
}))
connect1()
failIfErr(t, c1.Subscribe("topic.a"))
defer c1.Close(defaultCtx)
c2, connect2 := Client(t,
client.WithServerAddr(s.Addr()),
client.WithOnErrorHook(noErrorHook(arbiter)),
)
c2.Handle(defaultMsg().Payload, message.HandlerFunc(func(_ message.Sender, msg *message.Message) {
_ = msg.Payload.(*protos.MessageV1)
arbiter.ItsAFactThat("CLIENT2_RECEIVED_MESSAGE")
}))
connect2()
failIfErr(t, c2.Subscribe("topic.a"))
defer c2.Close(defaultCtx)
c3, connect3 := Client(t,
client.WithServerAddr(s.Addr()),
client.WithOnErrorHook(noErrorHook(arbiter)),
)
connect3()
if _, err := c3.Publish("topic.a", defaultMsg()); err != nil {
failIfErr(t, err)
}
defer c3.Close(defaultCtx)
arbiter.RequireNoErrors()
arbiter.RequireHappened("CLIENT1_RECEIVED_MESSAGE")
arbiter.RequireHappened("CLIENT2_RECEIVED_MESSAGE")
arbiter.RequireNotHappened("CLIENT3_RECEIVED_MESSAGE")
}
func TestServerPublish(t *testing.T) {
t.Parallel()
arbiter := test.NewArbiter(t)
s, run := Server(t, server.WithOnErrorHook(noErrorHook(arbiter)))
run()
defer s.Shutdown(defaultCtx)
c1, connect1 := Client(t,
client.WithServerAddr(s.Addr()),
client.WithOnErrorHook(noErrorHook(arbiter)),
)
c1.Handle(defaultMsg().Payload, message.HandlerFunc(func(_ message.Sender, msg *message.Message) {
_ = msg.Payload.(*protos.MessageV1)
arbiter.ItsAFactThat("CLIENT1_RECEIVED_MESSAGE")
}))
connect1()
failIfErr(t, c1.Subscribe("topic.a"))
defer c1.Close(defaultCtx)
c2, connect2 := Client(t,
client.WithServerAddr(s.Addr()),
client.WithOnErrorHook(noErrorHook(arbiter)),
)
c2.Handle(defaultMsg().Payload, message.HandlerFunc(func(_ message.Sender, msg *message.Message) {
_ = msg.Payload.(*protos.MessageV1)
arbiter.ItsAFactThat("CLIENT2_RECEIVED_MESSAGE")
}))
connect2()
failIfErr(t, c2.Subscribe("topic.a"))
defer c2.Close(defaultCtx)
time.Sleep(500 * time.Millisecond) // wait for subscriptions to happen
failIfErr(t, s.Publish("topic.a", defaultMsg()))
arbiter.RequireNoErrors()
arbiter.RequireHappened("CLIENT1_RECEIVED_MESSAGE")
arbiter.RequireHappened("CLIENT2_RECEIVED_MESSAGE")
}