Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit a805a43

Browse files
committedMar 5, 2025··
GODRIVER-3173 Complete pending reads on conn checkout
1 parent ee212da commit a805a43

File tree

12 files changed

+1388
-273
lines changed

12 files changed

+1388
-273
lines changed
 

‎event/monitoring.go

+19-14
Original file line numberDiff line numberDiff line change
@@ -75,17 +75,20 @@ const (
7575

7676
// strings for pool command monitoring types
7777
const (
78-
ConnectionPoolCreated = "ConnectionPoolCreated"
79-
ConnectionPoolReady = "ConnectionPoolReady"
80-
ConnectionPoolCleared = "ConnectionPoolCleared"
81-
ConnectionPoolClosed = "ConnectionPoolClosed"
82-
ConnectionCreated = "ConnectionCreated"
83-
ConnectionReady = "ConnectionReady"
84-
ConnectionClosed = "ConnectionClosed"
85-
ConnectionCheckOutStarted = "ConnectionCheckOutStarted"
86-
ConnectionCheckOutFailed = "ConnectionCheckOutFailed"
87-
ConnectionCheckedOut = "ConnectionCheckedOut"
88-
ConnectionCheckedIn = "ConnectionCheckedIn"
78+
ConnectionPoolCreated = "ConnectionPoolCreated"
79+
ConnectionPoolReady = "ConnectionPoolReady"
80+
ConnectionPoolCleared = "ConnectionPoolCleared"
81+
ConnectionPoolClosed = "ConnectionPoolClosed"
82+
ConnectionCreated = "ConnectionCreated"
83+
ConnectionReady = "ConnectionReady"
84+
ConnectionClosed = "ConnectionClosed"
85+
ConnectionCheckOutStarted = "ConnectionCheckOutStarted"
86+
ConnectionCheckOutFailed = "ConnectionCheckOutFailed"
87+
ConnectionCheckedOut = "ConnectionCheckedOut"
88+
ConnectionCheckedIn = "ConnectionCheckedIn"
89+
ConnectionPendingReadStarted = "ConnectionPendingReadStarted"
90+
ConnectionPendingReadSucceeded = "ConnectionPendingReadSucceeded"
91+
ConnectionPendingReadFailed = "ConnectionPendingReadFailed"
8992
)
9093

9194
// MonitorPoolOptions contains pool options as formatted in pool events
@@ -105,9 +108,11 @@ type PoolEvent struct {
105108
Reason string `json:"reason"`
106109
// ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field
107110
// can be used to distinguish between individual servers in a load balanced deployment.
108-
ServiceID *bson.ObjectID `json:"serviceId"`
109-
Interruption bool `json:"interruptInUseConnections"`
110-
Error error `json:"error"`
111+
ServiceID *bson.ObjectID `json:"serviceId"`
112+
Interruption bool `json:"interruptInUseConnections"`
113+
Error error `json:"error"`
114+
RequestID int32 `json:"requestId"`
115+
RemainingTime time.Duration `json:"remainingTime"`
111116
}
112117

113118
// PoolMonitor is a function that allows the user to gain access to events occurring in the pool

‎internal/driverutil/context.go

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright (C) MongoDB, Inc. 2025-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
package driverutil
8+
9+
import "context"
10+
11+
// ContextKey is a custom type used for the keys in context values to avoid
12+
// collisions.
13+
type ContextKey string
14+
15+
const (
16+
// ContextKeyHasMaxTimeMS represents a boolean value that indicates if
17+
// maxTimeMS will be set on the wire message for an operation.
18+
ContextKeyHasMaxTimeMS ContextKey = "hasMaxTimeMS"
19+
20+
// ContextKeyRequestID is the requestID for a given operation. This is used to
21+
// propagate the requestID for a pending read during connection check out.
22+
ContextKeyRequestID ContextKey = "requestID"
23+
)
24+
25+
// WithValueHasMaxTimeMS returns a copy of the parent context with an added
26+
// value indicating whether an operation will append maxTimeMS to the wire
27+
// message.
28+
func WithValueHasMaxTimeMS(parentCtx context.Context, val bool) context.Context {
29+
return context.WithValue(parentCtx, ContextKeyHasMaxTimeMS, val)
30+
}
31+
32+
// WithRequestID returns a copy of the parent context with an added request ID
33+
// value.
34+
func WithRequestID(parentCtx context.Context, requestID int32) context.Context {
35+
return context.WithValue(parentCtx, ContextKeyRequestID, requestID)
36+
}
37+
38+
// HasMaxTimeMS checks if the context is for an operation that will append
39+
// maxTimeMS to the wire message.
40+
func HasMaxTimeMS(ctx context.Context) bool {
41+
return ctx.Value(ContextKeyHasMaxTimeMS) != nil
42+
}
43+
44+
// GetRequestID retrieves the request ID from the context if it exists.
45+
func GetRequestID(ctx context.Context) (int32, bool) {
46+
val, ok := ctx.Value(ContextKeyRequestID).(int32)
47+
48+
return val, ok
49+
}

‎internal/integration/client_test.go

+38-20
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"os"
1414
"reflect"
1515
"strings"
16+
"sync"
1617
"testing"
1718
"time"
1819

@@ -675,9 +676,9 @@ func TestClient(t *testing.T) {
675676
},
676677
}
677678

679+
_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
678680
for _, tc := range testCases {
679681
mt.Run(tc.desc, func(mt *mtest.T) {
680-
_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
681682
require.NoError(mt, err)
682683

683684
mt.SetFailPoint(failpoint.FailPoint{
@@ -692,30 +693,47 @@ func TestClient(t *testing.T) {
692693

693694
mt.ClearEvents()
694695

696+
wg := sync.WaitGroup{}
697+
wg.Add(50)
698+
695699
for i := 0; i < 50; i++ {
696-
// Run 50 operations, each with a timeout of 50ms. Expect
700+
// Run 50 concurrent operations, each with a timeout of 150ms. Expect
697701
// them to all return a timeout error because the failpoint
698-
// blocks find operations for 500ms. Run 50 to increase the
702+
// blocks find operations for 50ms. Run 50 to increase the
699703
// probability that an operation will time out in a way that
700704
// can cause a retry.
701-
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
702-
err = tc.operation(ctx, mt.Coll)
703-
cancel()
704-
assert.ErrorIs(mt, err, context.DeadlineExceeded)
705-
assert.True(mt, mongo.IsTimeout(err), "expected mongo.IsTimeout(err) to be true")
706-
707-
// Assert that each operation reported exactly one command
708-
// started events, which means the operation did not retry
709-
// after the context timeout.
710-
evts := mt.GetAllStartedEvents()
711-
require.Len(mt,
712-
mt.GetAllStartedEvents(),
713-
1,
714-
"expected exactly 1 command started event per operation, but got %d after %d iterations",
715-
len(evts),
716-
i)
717-
mt.ClearEvents()
705+
go func() {
706+
ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
707+
err := tc.operation(ctx, mt.Coll)
708+
cancel()
709+
assert.ErrorIs(mt, err, context.DeadlineExceeded)
710+
assert.True(mt, mongo.IsTimeout(err), "expected mongo.IsTimeout(err) to be true")
711+
712+
wg.Done()
713+
}()
718714
}
715+
716+
wg.Wait()
717+
718+
// Each operation must check out a connection, and the driver attempts a
719+
// pending read on connections that hit a socket timeout. Because this test
720+
// forces 50 concurrent socket timeouts, an operation may check out a
721+
// connection that still has a pending read. In that case the operation
722+
// times out while checking out the connection and no started event is
723+
// propagated. So instead of checking that we got exactly 50 started
724+
// events, we assert that the number of started events plus the number
725+
// of connections that started a pending read equals the number of
726+
// operations (50).
727+
pendingReadConns := mt.NumberConnectionsPendingReadStarted()
728+
evts := mt.GetAllStartedEvents()
729+
730+
require.Equal(mt,
731+
len(evts)+pendingReadConns,
732+
50,
733+
"expected exactly 1 command started event per operation (50), but got %d",
734+
len(evts)+pendingReadConns)
735+
mt.ClearEvents()
736+
mt.ClearFailPoints()
719737
})
720738
}
721739
})

0 commit comments

Comments
 (0)
Please sign in to comment.