Skip to content

Commit f3459fd

Browse files
authored
Conn pool improvements (#103)
* set read deadline to detect closed connections early
* split method into multiline
* prevent from calling OnClose multiple times
* fix flaky test
* use new ConnectionFailedHandler to trigger reconnect in the pool
* close connection, if it times out - ignore
* do not exit from re-connect loop if factory fails to build connection
* implement force close for hanging tls connections
* remove close timeout as it is not used
* it's not an error when conn is not managed by pool
* increase force conn close timeout to 500ms
* switch pool back to ConnectionClosedHandler
1 parent 157644d commit f3459fd

5 files changed

Lines changed: 202 additions & 61 deletions

File tree

connection.go

Lines changed: 94 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ type Connection struct {
9191
// to protect following: closing, status
9292
mutex sync.Mutex
9393

94-
// user has called Close
94+
// user has called Close or we are closing connection due to an error
95+
// once closing is set to true, connection is unusable
9596
closing bool
9697

9798
// connection status
@@ -101,7 +102,13 @@ type Connection struct {
101102
var _ io.Writer = (*Connection)(nil)
102103

103104
// New creates and configures Connection. To establish network connection, call `Connect()`.
104-
func New(addr string, spec *iso8583.MessageSpec, mlReader MessageLengthReader, mlWriter MessageLengthWriter, options ...Option) (*Connection, error) {
105+
func New(
106+
addr string,
107+
spec *iso8583.MessageSpec,
108+
mlReader MessageLengthReader,
109+
mlWriter MessageLengthWriter,
110+
options ...Option,
111+
) (*Connection, error) {
105112
opts := GetDefaultOptions()
106113
for _, opt := range options {
107114
if err := opt(&opts); err != nil {
@@ -126,12 +133,20 @@ func New(addr string, spec *iso8583.MessageSpec, mlReader MessageLengthReader, m
126133
// NewFrom accepts conn (net.Conn, or any io.ReadWriteCloser) which will be
127134
// used as a transport for the returned Connection. Returned Connection is
128135
// ready to be used for message sending and receiving
129-
func NewFrom(conn io.ReadWriteCloser, spec *iso8583.MessageSpec, mlReader MessageLengthReader, mlWriter MessageLengthWriter, options ...Option) (*Connection, error) {
136+
func NewFrom(
137+
conn io.ReadWriteCloser,
138+
spec *iso8583.MessageSpec,
139+
mlReader MessageLengthReader,
140+
mlWriter MessageLengthWriter,
141+
options ...Option,
142+
) (*Connection, error) {
130143
c, err := New("", spec, mlReader, mlWriter, options...)
131144
if err != nil {
132145
return nil, fmt.Errorf("creating client: %w", err)
133146
}
147+
134148
c.conn = conn
149+
135150
c.run()
136151
return c, nil
137152
}
@@ -186,10 +201,11 @@ func (c *Connection) ConnectCtx(ctx context.Context) error {
186201

187202
if onConnect != nil {
188203
if err := onConnect(ctx, c); err != nil {
189-
// close connection if OnConnect failed
190-
// but ignore the potential error from Close()
191-
// as it's a rare case
192-
_ = c.CloseCtx(ctx)
204+
// we can do the hard close here by calling TriggerFailure
205+
// If connection is not Online, pool will not return it and
206+
// no one will be able to use it. The rest of the messages
207+
// like echo, ping should be safe to terminate
208+
c.TriggerFailure(fmt.Errorf("on connect callback %s: %w", c.addr, err))
193209

194210
return fmt.Errorf("on connect callback %s: %w", c.addr, err)
195211
}
@@ -230,6 +246,7 @@ func (c *Connection) run() {
230246
go c.readResponseLoop()
231247
}
232248

249+
// handleError is used to track errors that happen during message reading or writing
233250
func (c *Connection) handleError(err error) {
234251
if c.Opts.ErrorHandler == nil {
235252
return
@@ -257,32 +274,33 @@ func (c *Connection) handleConnectionError(err error) {
257274
c.closing = true
258275
c.mutex.Unlock()
259276

260-
// channel to wait for all goroutines to exit
261-
done := make(chan bool)
277+
// first, notify all handlers that connection is closed due to an error
278+
if len(c.Opts.ConnectionFailedHandlers) > 0 {
279+
for _, handler := range c.Opts.ConnectionFailedHandlers {
280+
go handler(c, err)
281+
}
282+
}
262283

263284
c.pendingRequestsMu.Lock()
264285
for _, resp := range c.respMap {
265286
resp.errCh <- ErrConnectionClosed
266287
}
267288
c.pendingRequestsMu.Unlock()
268289

269-
// return error to all Send methods
290+
// Drain requestsCh in background (will stop when c.done closes - which will happen
291+
// when no more requests are sent to the connection). All requests will receive
292+
// ErrConnectionClosed error.
270293
go func() {
271294
for {
272295
select {
273296
case req := <-c.requestsCh:
274297
req.errCh <- ErrConnectionClosed
275-
case <-done:
298+
case <-c.done:
276299
return
277300
}
278301
}
279302
}()
280303

281-
go func() {
282-
c.wg.Wait()
283-
done <- true
284-
}()
285-
286304
// close everything else we close normally
287305
c.close()
288306
}
@@ -294,10 +312,7 @@ func (c *Connection) close() error {
294312
close(c.done)
295313

296314
if c.conn != nil {
297-
err := c.conn.Close()
298-
if err != nil {
299-
return fmt.Errorf("closing connection: %w", err)
300-
}
315+
c.closeConn()
301316
}
302317

303318
if len(c.Opts.ConnectionClosedHandlers) > 0 {
@@ -309,6 +324,29 @@ func (c *Connection) close() error {
309324
return nil
310325
}
311326

327+
func (c *Connection) closeConn() {
328+
t := time.AfterFunc(500*time.Millisecond, c.forceCloseConn)
329+
defer t.Stop()
330+
c.conn.Close()
331+
}
332+
333+
// A tls.Conn.Close can hang for a long time if the peer is unresponsive.
334+
// Try to shut it down more aggressively. This bypasses the TLS protocol
335+
// entirely and forcibly closes the underlying TCP connection. The kernel will
336+
// send a TCP RST, immediately terminating the connection without waiting for
337+
// TLS handshake completion.
338+
// taken from here:
339+
// https://github.com/golang/go/blob/3bea95b2778312dd733c0f13fe9ec20bd2bf2d13/src/net/http/h2_bundle.go#L8424
340+
func (c *Connection) forceCloseConn() {
341+
tc, ok := c.conn.(*tls.Conn)
342+
if !ok {
343+
return
344+
}
345+
if nc := tc.NetConn(); nc != nil {
346+
nc.Close()
347+
}
348+
}
349+
312350
// Close waits for pending requests to complete and then closes network
313351
// connection with ISO 8583 server
314352
func (c *Connection) Close() error {
@@ -318,6 +356,15 @@ func (c *Connection) Close() error {
318356
// CloseCtx waits for pending requests to complete and then closes network
319357
// connection with ISO 8583 server
320358
func (c *Connection) CloseCtx(ctx context.Context) error {
359+
c.mutex.Lock()
360+
// if we are closing already, just return
361+
if c.closing {
362+
c.mutex.Unlock()
363+
return nil
364+
}
365+
c.closing = true
366+
c.mutex.Unlock()
367+
321368
onClose := c.Opts.OnCloseCtx
322369
if onClose == nil && c.Opts.OnClose != nil {
323370
onClose = func(_ context.Context, c *Connection) error {
@@ -331,15 +378,6 @@ func (c *Connection) CloseCtx(ctx context.Context) error {
331378
}
332379
}
333380

334-
c.mutex.Lock()
335-
// if we are closing already, just return
336-
if c.closing {
337-
c.mutex.Unlock()
338-
return nil
339-
}
340-
c.closing = true
341-
c.mutex.Unlock()
342-
343381
return c.close()
344382
}
345383

@@ -378,6 +416,17 @@ type response struct {
378416
//
379417
// conn.Send(msg, connection.SendTimeout(5 * time.Second))
380418
func (c *Connection) Send(message *iso8583.Message, options ...Option) (*iso8583.Message, error) {
419+
c.mutex.Lock()
420+
if c.closing {
421+
c.mutex.Unlock()
422+
return nil, ErrConnectionClosed
423+
}
424+
// calling wg.Add(1) within mutex guarantees that it does not pass the wg.Wait() call in the Close method
425+
// otherwise we will have data race issue
426+
c.wg.Add(1)
427+
c.mutex.Unlock()
428+
defer c.wg.Done()
429+
381430
// use the SendTimeout from the connection options
382431
sendTimeout := c.Opts.SendTimeout
383432

@@ -396,17 +445,6 @@ func (c *Connection) Send(message *iso8583.Message, options ...Option) (*iso8583
396445
sendTimeout = opts.SendTimeout
397446
}
398447

399-
c.mutex.Lock()
400-
if c.closing {
401-
c.mutex.Unlock()
402-
return nil, ErrConnectionClosed
403-
}
404-
// calling wg.Add(1) within mutex guarantees that it does not pass the wg.Wait() call in the Close method
405-
// otherwise we will have data race issue
406-
c.wg.Add(1)
407-
c.mutex.Unlock()
408-
defer c.wg.Done()
409-
410448
// prepare request
411449
reqID, err := c.Opts.RequestIDGenerator.GenerateRequestID(message)
412450
if err != nil {
@@ -641,6 +679,7 @@ func (c *Connection) readLoop() {
641679

642680
r := bufio.NewReader(c.conn)
643681
for {
682+
644683
message, err := c.readMessage(r)
645684
if err != nil {
646685
c.handleError(utils.NewSafeError(err, "failed to read message from connection"))
@@ -794,3 +833,18 @@ func (c *Connection) RejectMessage(rejectedMessage *iso8583.Message, rejectionEr
794833

795834
return nil
796835
}
836+
837+
// TriggerFailure is used to trigger connection failure manually. It will notify
838+
// all ConnectionFailedHandlers and close the connection.
839+
func (c *Connection) TriggerFailure(reason error) {
840+
c.handleConnectionError(reason)
841+
}
842+
843+
// isAlive checks if the connection is alive. Currently, it is used to
844+
// filter out connections that are closing or closed in the pool.
845+
func (c *Connection) isAlive() bool {
846+
c.mutex.Lock()
847+
defer c.mutex.Unlock()
848+
849+
return !c.closing
850+
}

options.go

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,33 +49,45 @@ type Options struct {
4949

5050
// ConnectionClosedHandlers is called after connection is closed by us,
5151
// by the server or when there are network errors during network
52-
// read/write
52+
// read/write.
5353
ConnectionClosedHandlers []func(c *Connection)
5454

55+
// ConnectionFailedHandlers is called only when a connection fails due
56+
// to network issues, e.g. connection refused, connection reset by
57+
// peer, etc. Each of these handlers will be called in a separate
58+
// goroutine.
59+
ConnectionFailedHandlers []ConnectionFailedHandler
60+
5561
// ConnectionEstablishedHandler is called when connection is
56-
// established with the server
62+
// established with the server, after successful OnConnect or
63+
// OnConnectCtx execution. It's called in a separate goroutine
5764
ConnectionEstablishedHandler func(c *Connection)
5865

5966
TLSConfig *tls.Config
6067

6168
// ErrorHandler is called in a goroutine with the errors that can't be
62-
// returned to the caller
69+
// returned to the caller. You can use this handler to log errors
6370
ErrorHandler func(err error)
6471

72+
// OnConnectCtx is called synchronously when a connection is established.
73+
// The purpose of the OnConnect handler is to gracefully handle the
74+
// connection establishment, e.g. to send a sign-on message.
75+
OnConnectCtx func(ctx context.Context, c *Connection) error
76+
6577
// If both OnConnect and OnConnectCtx are set, OnConnectCtx will be used
6678
// OnConnect is called synchronously when a connection is established
6779
OnConnect func(c *Connection) error
6880

69-
// OnConnectCtx is called synchronously when a connection is established
70-
OnConnectCtx func(ctx context.Context, c *Connection) error
81+
// OnCloseCtx is called synchronously before a connection is closed
82+
// by us. It's not called when connection is closed by the other side
83+
// or due to network issues. The purpose of the OnClose handler is to
84+
// gracefully close the connection, e.g. to send sign-off message.
85+
OnCloseCtx func(ctx context.Context, c *Connection) error
7186

7287
// If both OnClose and OnCloseCtx are set, OnCloseCtx will be used
7388
// OnClose is called synchronously before a connection is closed
7489
OnClose func(c *Connection) error
7590

76-
// OnCloseCtx is called synchronously before a connection is closed
77-
OnCloseCtx func(ctx context.Context, c *Connection) error
78-
7991
// RequestIDGenerator is used to generate a unique identifier for a request
8092
// so that responses from the server can be matched to the original request.
8193
RequestIDGenerator RequestIDGenerator
@@ -159,10 +171,26 @@ func PingHandler(handler func(c *Connection)) Option {
159171
}
160172
}
161173

174+
// WithConnectionClosedHandler sets a ConnectionClosedHandler option.
175+
func WithConnectionClosedHandler(handler func(c *Connection)) Option {
176+
return func(o *Options) error {
177+
o.ConnectionClosedHandlers = append(o.ConnectionClosedHandlers, handler)
178+
return nil
179+
}
180+
}
181+
162182
// ConnectionClosedHandler sets a ConnectionClosedHandler option
163183
func ConnectionClosedHandler(handler func(c *Connection)) Option {
184+
return WithConnectionClosedHandler(handler)
185+
}
186+
187+
// ConnectionFailedHandler is a function that will be called when a connection fails due to network issues
188+
type ConnectionFailedHandler func(*Connection, error)
189+
190+
// WithConnectionFailedHandler sets a ConnectionFailedHandler option. Each handler will be called
191+
func WithConnectionFailedHandler(handler ConnectionFailedHandler) Option {
164192
return func(o *Options) error {
165-
o.ConnectionClosedHandlers = append(o.ConnectionClosedHandlers, handler)
193+
o.ConnectionFailedHandlers = append(o.ConnectionFailedHandlers, handler)
166194
return nil
167195
}
168196
}

0 commit comments

Comments
 (0)