Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.4] tests/e2e: backport WaitLeader #17398

Merged
merged 1 commit into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions tests/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package e2e

import (
"context"
"fmt"
"io/ioutil"
"net/url"
Expand Down Expand Up @@ -501,3 +502,71 @@ func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
}
return ret
}

// WaitLeader returns index of the member in c.Members() that is leader
// or fails the test (if not established in 30s).
func (epc *etcdProcessCluster) WaitLeader(t testing.TB) int {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return epc.WaitMembersForLeader(ctx, t, epc.procs)
}

// WaitMembersForLeader waits until given members agree on the same leader,
// and returns its 'index' in the 'membs' list
func (epc *etcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, membs []etcdProcess) int {
cc := NewEtcdctl(epc.EndpointsV3(), epc.cfg.clientTLS, epc.cfg.isClientAutoTLS, epc.cfg.enableV2)

// ensure leader is up via linearizable get
for {
select {
case <-ctx.Done():
t.Fatal("WaitMembersForLeader timeout")
default:
}
_, err := cc.Get("0")
if err == nil || strings.Contains(err.Error(), "Key not found") {
break
}
t.Logf("WaitMembersForLeader Get err: %v", err)
}

leaders := make(map[uint64]struct{})
members := make(map[uint64]int)
for {
select {
case <-ctx.Done():
t.Fatal("WaitMembersForLeader timeout")
default:
}
for i := range membs {
resp, err := membs[i].Etcdctl(epc.cfg.clientTLS, epc.cfg.isClientAutoTLS, epc.cfg.enableV2).Status()
if err != nil {
if strings.Contains(err.Error(), "connection refused") {
// if member[i] has stopped
continue
} else {
t.Fatal(err)
}
}
members[resp[0].Header.MemberId] = i
leaders[resp[0].Leader] = struct{}{}
}
// members agree on the same leader
if len(leaders) == 1 {
break
}
leaders = make(map[uint64]struct{})
members = make(map[uint64]int)
// From main branch 10 * config.TickDuration (10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
}
for l := range leaders {
if index, ok := members[l]; ok {
t.Logf("members agree on a leader, members:%v , leader:%v", members, l)
return index
}
t.Fatalf("members agree on a leader which is not one of members, members:%v , leader:%v", members, l)
}
t.Fatal("impossible path of execution")
return -1
}
6 changes: 6 additions & 0 deletions tests/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type etcdProcess interface {
PeerProxy() proxy.Server
Failpoints() *BinaryFailpoints
IsRunning() bool

Etcdctl(connType clientConnType, isAutoTLS bool, v2 bool) *Etcdctl
}

type logsExpect interface {
Expand Down Expand Up @@ -223,6 +225,10 @@ func (ep *etcdServerProcess) IsRunning() bool {
return false
}

func (ep *etcdServerProcess) Etcdctl(connType clientConnType, isAutoTLS, v2 bool) *Etcdctl {
return NewEtcdctl(ep.EndpointsV3(), connType, isAutoTLS, v2)
}

type BinaryFailpoints struct {
member etcdProcess
availableCache map[string]string
Expand Down
16 changes: 16 additions & 0 deletions tests/e2e/etcdctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,22 @@ func (ctl *Etcdctl) Compact(rev int64) (*clientv3.CompactResponse, error) {
return nil, spawnWithExpect(args, fmt.Sprintf("compacted revision %v", rev))
}

func (ctl *Etcdctl) Status() ([]*clientv3.StatusResponse, error) {
var epStatus []*struct {
Endpoint string
Status *clientv3.StatusResponse
}
err := ctl.spawnJsonCmd(&epStatus, "endpoint", "status")
if err != nil {
return nil, err
}
resp := make([]*clientv3.StatusResponse, len(epStatus))
for i, e := range epStatus {
resp[i] = e.Status
}
return resp, err
}

func (ctl *Etcdctl) spawnJsonCmd(output interface{}, expectedOutput string, args ...string) error {
args = append(args, "-w", "json")
cmd, err := spawnCmd(append(ctl.cmdArgs(), args...))
Expand Down
Loading