-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathpool_test.go
337 lines (278 loc) · 10 KB
/
pool_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
package connection_test
import (
"fmt"
"net"
"sync/atomic"
"testing"
"time"
connection "github.com/moov-io/iso8583-connection"
"github.com/stretchr/testify/require"
)
func TestPool(t *testing.T) {
// Given
// servers started
var servers []*testServer
// servers to start
serversToStart := 2
// list of addresses of the servers
var addrs []string
// each server on connect will increment connectionsCnt counter
var connectionsCnt int32
// start all servers
for i := 0; i < serversToStart; i++ {
server, err := NewTestServer()
require.NoError(t, err)
addrs = append(addrs, server.Addr)
servers = append(servers, server)
// increase counter when server receives connection
server.Server.AddConnectionHandler(func(_ net.Conn) {
atomic.AddInt32(&connectionsCnt, 1)
})
}
// close all servers on exit
defer func() {
for _, server := range servers {
server.Close()
}
}()
// And a factory method that will build connection for the pool
factory := func(addr string) (*connection.Connection, error) {
// all our addresses have same configs, but if you need to use
// different TLS config, you can define config out of this
// function like:
// tlsConfigs := map[string]*tls.Config{
// "127.0.0.1": &tls.Config{},
// "127.0.0.2": &tls.Config{},
// }
// and inside factory, just pick your config based on the address
// tlsConfig := tlsConfigs[addr]
c, err := connection.New(
addr,
testSpec,
readMessageLength,
writeMessageLength,
// set short connect timeout so we can test re-connects
connection.ConnectTimeout(500*time.Millisecond),
)
if err != nil {
return nil, fmt.Errorf("building iso 8583 connection: %w", err)
}
return c, nil
}
// And the pool of connections
// one of our tests
reconnectWait := 500 * time.Millisecond
pool, err := connection.NewPool(
factory,
addrs,
connection.PoolReconnectWait(reconnectWait),
)
require.NoError(t, err)
defer pool.Close()
// pool is Down by default
require.False(t, pool.IsUp())
t.Run("Connect() establishes connections to all servers", func(t *testing.T) {
// When we Connect pool
err := pool.Connect()
require.NoError(t, err)
// Then pool builds and connects connections to all servers
require.Eventually(t, func() bool {
// we expect connectionsCnt counter to be incremented by both servers
return atomic.LoadInt32(&connectionsCnt) == int32(serversToStart) //nolint:gosec // disable G115 as it's a false positive
}, 500*time.Millisecond, 50*time.Millisecond, "%d expected connections established, but got %d", serversToStart, atomic.LoadInt32(&connectionsCnt))
// And pool has serversCnt connections
require.Len(t, pool.Connections(), serversToStart)
})
t.Run("Get returns next connection from the pool looping over the list of connections", func(t *testing.T) {
// When we call Get() len(pool.Connections()) * N times
var n = 3
var connections []*connection.Connection
for i := 0; i < len(pool.Connections())*n; i++ {
conn, err := pool.Get()
require.NoError(t, err)
connections = append(connections, conn)
}
// Then each connection should be returned N times
counter := map[*connection.Connection]int{}
for _, conn := range connections {
if conn == nil {
continue
}
counter[conn]++
}
require.NotEmpty(t, counter)
for _, returns := range counter {
require.Equal(t, n, returns)
}
})
t.Run("when one of the servers is down it re-connects when server returns", func(t *testing.T) {
connectionsCntBeforeServerShutdown := len(pool.Connections())
// when we shutdown one of the servers
servers[0].Close()
// then we have one less connection
require.Eventually(t, func() bool {
return len(pool.Connections()) == connectionsCntBeforeServerShutdown-1
}, 500*time.Millisecond, 50*time.Millisecond, "expect to have one less connection")
// when we start server again
server, err := NewTestServerWithAddr(servers[0].Addr)
require.NoError(t, err)
// so we will not forget to close server on exit
servers[0] = server
// then we have one more connection (the same as it was before
// we shut down the server)
require.Eventually(t, func() bool {
return len(pool.Connections()) == connectionsCntBeforeServerShutdown
}, 2000*time.Millisecond, 50*time.Millisecond, "expect to have one less connection")
})
t.Run("when MaxReconnectWait is set reconnect wait time will exponentially increase", func(t *testing.T) {
// Context: MaxReconnectWait is set to 400ms and initial
// ReconnectWait is 100ms. The server is offline, so the pool
// will try to reconnect endlessly with exponential backoff
// until we close the pool. We will wait for 850ms before
// checking the number of reconnects. Expected number of
// reconnects within 850ms is 3. The sequence of reconnect
// waits is: 100ms, 200ms, 400ms (total 700ms). Next reconnect
// would take 400ms, which exceeds 1 second. Thus, 3 reconnect
// attempts are expected.
// Counter for connection attempts
var connectAttempts atomic.Int32
// Factory method will be called on each connection attempt
factory := func(addr string) (*connection.Connection, error) {
// Increment connection attempts counter
connectAttempts.Add(1)
c, err := connection.New(
addr,
testSpec,
readMessageLength,
writeMessageLength,
)
if err != nil {
return nil, fmt.Errorf("building iso 8583 connection: %w", err)
}
return c, nil
}
pool, err := connection.NewPool(
factory,
[]string{"no-live-server-address"}, // connect only to the first server
connection.PoolReconnectWait(100*time.Millisecond),
connection.PoolMaxReconnectWait(400*time.Millisecond),
// let pool start even without connections (it will start reconnecting)
connection.PoolMinConnections(0),
)
require.NoError(t, err)
// There should be no error even if we try to connect to a non-existing server
// as the min connections is set to 0
err = pool.Connect()
require.NoError(t, err)
defer pool.Close()
time.Sleep(850 * time.Millisecond)
require.Equal(t, int32(4), connectAttempts.Load(), "expected 4 connection attempts (3 reconnects + 1 initial connect)")
})
t.Run("Close() closes all connections", func(t *testing.T) {
require.NotZero(t, pool.Connections())
// when we close the pool
err := pool.Close()
require.NoError(t, err)
// then pool has no connections
require.Zero(t, pool.Connections())
})
// when pool is closed
t.Run("Get() returns error when no connections", func(t *testing.T) {
// given pool is closed
err := pool.Close()
require.NoError(t, err)
_, err = pool.Get()
require.EqualError(t, err, "pool is closed")
})
t.Run("Connect() returns error when number or established connections is less than MinConnections", func(t *testing.T) {
// we have only 2 servers running, so we can't establish 3 connections
pool, err := connection.NewPool(factory, addrs, connection.PoolMinConnections(3))
require.NoError(t, err)
err = pool.Connect()
require.Error(t, err)
require.Contains(t, err.Error(), "minimum 3 connections is required, established: 2")
require.Zero(t, len(pool.Connections()), "all connections should be closed")
})
t.Run("Connect() returns no error when established >= MinConnections and later re-establish failed connections", func(t *testing.T) {
// when we shutdown one of the servers
servers[0].Close()
// we have only 1 of serversToStart servers running, so serversToStart-1 connection should be established
pool, err := connection.NewPool(
factory,
addrs,
connection.PoolMinConnections(1),
connection.PoolReconnectWait(reconnectWait),
)
require.NoError(t, err)
// when we connect
err = pool.Connect()
// no error should be returned here as MinConnections is 1
require.NoError(t, err)
defer pool.Close()
require.Equal(t, len(pool.Connections()), serversToStart-1)
// when server gets back
server, err := NewTestServerWithAddr(servers[0].Addr)
require.NoError(t, err)
// return server back to the list so we will not forget to
// close server on exit
servers[0] = server
// then connection should be established
require.Eventually(t, func() bool {
return len(pool.Connections()) == serversToStart
}, 2000*time.Millisecond, 50*time.Millisecond, "expect to have one less connection")
})
t.Run("Get() returns filtered connections", func(t *testing.T) {
var onConnectCalled int32
// set status `online` (value) only for the first connection
// keeping second connection with status `offline`
onConnect := func(conn *connection.Connection) error {
if atomic.AddInt32(&onConnectCalled, 1) == 1 {
conn.SetStatus(connection.StatusOnline)
}
return nil
}
// And a factory method that will build connection for the pool
factory := func(addr string) (*connection.Connection, error) {
c, err := connection.New(
addr,
testSpec,
readMessageLength,
writeMessageLength,
// set status `online` (value) only for the first connection
connection.OnConnect(onConnect),
)
if err != nil {
return nil, fmt.Errorf("building iso 8583 connection: %w", err)
}
return c, nil
}
// filterOnlineConnections returns only connections with status `online`
filterOnlineConnections := func(conn *connection.Connection) bool {
return conn.Status() == connection.StatusOnline
}
pool, err := connection.NewPool(
factory,
addrs,
connection.PoolConnectionsFilter(filterOnlineConnections),
)
require.NoError(t, err)
// when we connect
err = pool.Connect()
require.NoError(t, err)
defer pool.Close()
// we expect to have to connections
require.Equal(t, len(pool.Connections()), serversToStart)
// Get should return the same connection twice (filter not `online` connections)
conn, err := pool.Get()
require.NoError(t, err)
require.Equal(t, conn.Status(), connection.StatusOnline)
// Get should return the same connection as the first one (filter not `online` connections)
for i := 0; i < serversToStart; i++ {
connN, err := pool.Get()
require.NoError(t, err)
require.Equal(t, conn, connN)
}
// should be degraded
require.True(t, pool.IsDegraded())
})
}