Skip to content

Commit a11640f

Browse files
committed
Endless query execution fix proposal
1 parent c75ff5f commit a11640f

File tree

3 files changed

+76
-2
lines changed

3 files changed

+76
-2
lines changed

cassandra_test.go

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ import (
4444
"time"
4545
"unicode"
4646

47-
inf "gopkg.in/inf.v0"
47+
"gopkg.in/inf.v0"
4848

4949
"github.com/stretchr/testify/require"
5050
)
@@ -3955,3 +3955,71 @@ func TestRoutingKeyCacheUsesOverriddenKeyspace(t *testing.T) {
39553955

39563956
session.Query("DROP KEYSPACE IF EXISTS gocql_test_routing_key_cache").Exec()
39573957
}
3958+
3959+
func DownedHostSelectionPolicy() HostSelectionPolicy {
3960+
return &downedHostSelectionPolicy{}
3961+
//return &roundRobinHostPolicy{}
3962+
}
3963+
3964+
type downedHostSelectionPolicy struct {
3965+
hosts cowHostList
3966+
lastUsedHostIdx uint64
3967+
}
3968+
3969+
func (r *downedHostSelectionPolicy) IsLocal(*HostInfo) bool { return true }
3970+
func (r *downedHostSelectionPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {}
3971+
func (r *downedHostSelectionPolicy) SetPartitioner(partitioner string) {}
3972+
func (r *downedHostSelectionPolicy) Init(*Session) {}
3973+
3974+
func (r *downedHostSelectionPolicy) Pick(qry ExecutableQuery) NextHost {
3975+
return dummyPolicy()
3976+
}
3977+
3978+
func dummyPolicy() NextHost {
3979+
return func() SelectedHost {
3980+
h := &HostInfo{
3981+
state: NodeDown,
3982+
connectAddress: net.ParseIP("172.30.0.2"),
3983+
}
3984+
return (*selectedHost)(h)
3985+
3986+
}
3987+
}
3988+
3989+
func (r *downedHostSelectionPolicy) AddHost(host *HostInfo) {
3990+
r.hosts.add(host)
3991+
}
3992+
3993+
func (r *downedHostSelectionPolicy) RemoveHost(host *HostInfo) {
3994+
r.hosts.remove(host.ConnectAddress())
3995+
}
3996+
3997+
func (r *downedHostSelectionPolicy) HostUp(host *HostInfo) {
3998+
r.AddHost(host)
3999+
}
4000+
4001+
func (r *downedHostSelectionPolicy) HostDown(host *HostInfo) {
4002+
r.RemoveHost(host)
4003+
}
4004+
4005+
func TestQuery_Do(t *testing.T) {
4006+
cluster := createCluster()
4007+
session := createSessionFromCluster(cluster, t)
4008+
defer session.Close()
4009+
4010+
if err := createTable(session, `CREATE TABLE gocql_test.policy (id int primary key)`); err != nil {
4011+
t.Fatal(err)
4012+
}
4013+
4014+
session.executor.policy = DownedHostSelectionPolicy()
4015+
err := session.Query("select * from gocql_test.policy").Exec()
4016+
if err != ErrNoConnections {
4017+
t.Fatalf("expected ErrNoConnections, got %v", err)
4018+
}
4019+
4020+
sp := &SimpleSpeculativeExecution{NumAttempts: 3, TimeoutDelay: 200 * time.Millisecond}
4021+
err = session.Query("select * from gocql_test.policy").SetSpeculativeExecutionPolicy(sp).Idempotent(true).Exec()
4022+
if err == nil {
4023+
t.Fatalf("expected ErrNoConnections, got %v", err)
4024+
}
4025+
}

policies.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ func (host *selectedHost) Info() *HostInfo {
323323
func (host *selectedHost) Mark(err error) {}
324324

325325
// NextHost is an iteration function over picked hosts
326+
// Should return nil if SelectedHost is not Up to prevent endless query execution.
326327
type NextHost func() SelectedHost
327328

328329
// RoundRobinHostPolicy is a round-robin load balancing policy, where each host

query_executor.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,12 +150,17 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
150150
func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter NextHost) *Iter {
151151
selectedHost := hostIter()
152152
rt := qry.retryPolicy()
153+
count := 0
154+
q.pool.mu.Lock()
155+
threshold := len(q.pool.hostConnPools)
156+
q.pool.mu.Unlock()
153157

154158
var lastErr error
155159
var iter *Iter
156-
for selectedHost != nil {
160+
for selectedHost != nil && count < threshold {
157161
host := selectedHost.Info()
158162
if host == nil || !host.IsUp() {
163+
count++
159164
selectedHost = hostIter()
160165
continue
161166
}

0 commit comments

Comments
 (0)