Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add downgrade cancellation e2e tests #19252

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 75 additions & 18 deletions tests/e2e/cluster_downgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ package e2e
import (
"context"
"fmt"
"math/rand"
"testing"
"time"

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
Expand All @@ -43,41 +45,66 @@ const (
noCancellation CancellationState = iota
cancelRightBeforeEnable
cancelRightAfterEnable
cancelAfterDowngrading
)

// TestDowngradeUpgradeClusterOf1 runs a full downgrade-then-upgrade cycle
// on a single-member cluster, downgrading that one member, with no
// cancellation and no snapshot trigger.
func TestDowngradeUpgradeClusterOf1(t *testing.T) {
	// Stale pre-refactor call (old 4-arg signature) removed; the first
	// argument is now numberOfMembersToDowngrade.
	testDowngradeUpgrade(t, 1, 1, false, noCancellation)
}

// TestDowngradeUpgradeClusterOf3 runs a full downgrade-then-upgrade cycle
// on a 3-member cluster, downgrading all three members, with no
// cancellation and no snapshot trigger.
func TestDowngradeUpgradeClusterOf3(t *testing.T) {
	// Stale pre-refactor call (old 4-arg signature) removed.
	testDowngradeUpgrade(t, 3, 3, false, noCancellation)
}

// TestDowngradeUpgradeClusterOf1WithSnapshot is the single-member
// downgrade/upgrade cycle with snapshot triggering enabled.
func TestDowngradeUpgradeClusterOf1WithSnapshot(t *testing.T) {
	// Stale pre-refactor call (old 4-arg signature) removed.
	testDowngradeUpgrade(t, 1, 1, true, noCancellation)
}

// TestDowngradeUpgradeClusterOf3WithSnapshot is the 3-member
// downgrade/upgrade cycle with snapshot triggering enabled.
func TestDowngradeUpgradeClusterOf3WithSnapshot(t *testing.T) {
	// Stale pre-refactor call (old 4-arg signature) removed.
	testDowngradeUpgrade(t, 3, 3, true, noCancellation)
}

// TestDowngradeCancellationWithoutEnablingClusterOf1 cancels the downgrade
// before it is enabled on a single-member cluster; no member is downgraded
// (numberOfMembersToDowngrade = 0).
func TestDowngradeCancellationWithoutEnablingClusterOf1(t *testing.T) {
	// Stale pre-refactor call (old 4-arg signature) removed.
	testDowngradeUpgrade(t, 0, 1, false, cancelRightBeforeEnable)
}

// TestDowngradeCancellationRightAfterEnablingClusterOf1 cancels the downgrade
// immediately after enabling it on a single-member cluster, before any member
// is downgraded.
func TestDowngradeCancellationRightAfterEnablingClusterOf1(t *testing.T) {
	// Stale pre-refactor call (old 4-arg signature) removed.
	testDowngradeUpgrade(t, 0, 1, false, cancelRightAfterEnable)
}

// TestDowngradeCancellationWithoutEnablingClusterOf3 cancels the downgrade
// before it is enabled on a 3-member cluster; no member is downgraded.
func TestDowngradeCancellationWithoutEnablingClusterOf3(t *testing.T) {
	// Stale pre-refactor call (old 4-arg signature) removed.
	testDowngradeUpgrade(t, 0, 3, false, cancelRightBeforeEnable)
}

// TestDowngradeCancellationRightAfterEnablingClusterOf3 cancels the downgrade
// immediately after enabling it on a 3-member cluster, before any member is
// downgraded.
func TestDowngradeCancellationRightAfterEnablingClusterOf3(t *testing.T) {
	// Stale pre-refactor call (old 4-arg signature) removed.
	testDowngradeUpgrade(t, 0, 3, false, cancelRightAfterEnable)
}

func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool, triggerCancellation CancellationState) {
// TestDowngradeCancellationAfterDowngrading1InClusterOf3 cancels the
// downgrade after 1 of 3 members has already been downgraded, leaving the
// cluster in a mixed-version state that the test then validates.
func TestDowngradeCancellationAfterDowngrading1InClusterOf3(t *testing.T) {
	testDowngradeUpgrade(t, 1, 3, false, cancelAfterDowngrading)
}

// TestDowngradeCancellationAfterDowngrading2InClusterOf3 cancels the
// downgrade after 2 of 3 members have already been downgraded.
func TestDowngradeCancellationAfterDowngrading2InClusterOf3(t *testing.T) {
	testDowngradeUpgrade(t, 2, 3, false, cancelAfterDowngrading)
}

// TestDowngradeCancellationAfterDowngrading1InClusterOf5 cancels the
// downgrade after 1 of 5 members has already been downgraded.
func TestDowngradeCancellationAfterDowngrading1InClusterOf5(t *testing.T) {
	testDowngradeUpgrade(t, 1, 5, false, cancelAfterDowngrading)
}

// TestDowngradeCancellationAfterDowngrading2InClusterOf5 cancels the
// downgrade after 2 of 5 members have already been downgraded.
func TestDowngradeCancellationAfterDowngrading2InClusterOf5(t *testing.T) {
	testDowngradeUpgrade(t, 2, 5, false, cancelAfterDowngrading)
}

// TestDowngradeCancellationAfterDowngrading3InClusterOf5 cancels the
// downgrade after 3 of 5 members have already been downgraded.
func TestDowngradeCancellationAfterDowngrading3InClusterOf5(t *testing.T) {
	testDowngradeUpgrade(t, 3, 5, false, cancelAfterDowngrading)
}

// TestDowngradeCancellationAfterDowngrading4InClusterOf5 cancels the
// downgrade after 4 of 5 members have already been downgraded.
func TestDowngradeCancellationAfterDowngrading4InClusterOf5(t *testing.T) {
	testDowngradeUpgrade(t, 4, 5, false, cancelAfterDowngrading)
}

func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterSize int, triggerSnapshot bool, triggerCancellation CancellationState) {
currentEtcdBinary := e2e.BinPath.Etcd
lastReleaseBinary := e2e.BinPath.EtcdLastRelease
if !fileutil.Exist(lastReleaseBinary) {
Expand Down Expand Up @@ -135,7 +162,7 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool, t
t.Logf("Cancelling downgrade before enabling")
e2e.DowngradeCancel(t, epc)
Copy link
Contributor Author

@henrybear327 henrybear327 Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I was running e2e tests locally, the DowngradeCancel call times out (Error: etcdserver: request timed out) quite often, esp. with more nodes downgraded in the cluster.

Reference on CI: https://prow.k8s.io/view/gs/kubernetes-ci-logs/pr-logs/pull/etcd-io_etcd/19252/pull-etcd-e2e-amd64/1884558107229556736

t.Log("Downgrade cancelled, validating if cluster is in the right state")
e2e.ValidateMemberVersions(t, epc, generateIdenticalVersions(clusterSize, currentVersionStr))
e2e.ValidateMemberVersions(t, epc, generateIdenticalVersions(clusterSize, currentVersion))

return // No need to perform downgrading, end the test here
}
Expand All @@ -144,14 +171,19 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool, t
t.Logf("Cancelling downgrade right after enabling (no node is downgraded yet)")
e2e.DowngradeCancel(t, epc)
t.Log("Downgrade cancelled, validating if cluster is in the right state")
e2e.ValidateMemberVersions(t, epc, generateIdenticalVersions(clusterSize, currentVersionStr))
e2e.ValidateMemberVersions(t, epc, generateIdenticalVersions(clusterSize, currentVersion))
return // No need to perform downgrading, end the test here
}

membersToChange := rand.Perm(len(epc.Procs))[:numberOfMembersToDowngrade]
t.Logf(fmt.Sprintln("Elect members for operations"), zap.Any("members", membersToChange))

t.Logf("Starting downgrade process to %q", lastVersionStr)
err = e2e.DowngradeUpgradeMembers(t, nil, epc, len(epc.Procs), currentVersion, lastClusterVersion)
err = e2e.DowngradeUpgradeMembersByID(t, nil, epc, membersToChange, currentVersion, lastClusterVersion)
require.NoError(t, err)
e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
if len(membersToChange) == len(epc.Procs) {
e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
}

t.Log("Downgrade complete")
afterMembers, afterKV := getMembersAndKeys(t, cc)
Expand All @@ -162,6 +194,13 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool, t
t.Log("Waiting health interval to required to make membership changes")
time.Sleep(etcdserver.HealthInterval)
}

if triggerCancellation == cancelAfterDowngrading {
e2e.DowngradeCancel(t, epc)
t.Log("Downgrade cancelled, validating if cluster is in the right state")
e2e.ValidateMemberVersions(t, epc, generatePartialCancellationVersions(clusterSize, membersToChange, lastClusterVersion))
}

t.Log("Adding learner to test membership, but avoid breaking quorum")
resp, err = cc.MemberAddAsLearner(context.Background(), "fake2", []string{"http://127.0.0.1:1002"})
require.NoError(t, err)
Expand All @@ -176,7 +215,7 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool, t
beforeMembers, beforeKV = getMembersAndKeys(t, cc)

t.Logf("Starting upgrade process to %q", currentVersionStr)
err = e2e.DowngradeUpgradeMembers(t, nil, epc, len(epc.Procs), lastClusterVersion, currentVersion)
err = e2e.DowngradeUpgradeMembersByID(t, nil, epc, membersToChange, lastClusterVersion, currentVersion)
henrybear327 marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
t.Log("Upgrade complete")

Expand Down Expand Up @@ -276,16 +315,34 @@ func getMembersAndKeys(t *testing.T, cc *e2e.EtcdctlV3) (*clientv3.MemberListRes
return members, kvs
}

func generateIdenticalVersions(clusterSize int, currentVersion string) []*version.Versions {
func generateIdenticalVersions(clusterSize int, ver *semver.Version) []*version.Versions {
ret := make([]*version.Versions, clusterSize)

for i := range clusterSize {
ret[i] = &version.Versions{
Cluster: currentVersion,
Server: currentVersion,
Storage: currentVersion,
Cluster: ver.String(),
Server: ver.String(),
Storage: ver.String(),
}
}

return ret
}

func generatePartialCancellationVersions(clusterSize int, membersToChange []int, ver *semver.Version) []*version.Versions {
ret := make([]*version.Versions, clusterSize)

for i := range clusterSize {
ret[i] = &version.Versions{
Cluster: ver.String(),
Server: e2e.OffsetMinor(ver, 1).String(),
Storage: "",
henrybear327 marked this conversation as resolved.
Show resolved Hide resolved
}
}

for i := range membersToChange {
ret[membersToChange[i]].Server = ver.String()
}

return ret
}
17 changes: 11 additions & 6 deletions tests/framework/e2e/downgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func DowngradeEnable(t *testing.T, epc *EtcdProcessCluster, ver *semver.Version)
for i := 0; i < len(epc.Procs); i++ {
ValidateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
Cluster: ver.String(),
Server: offsetMinor(ver, 1).String(),
Server: OffsetMinor(ver, 1).String(),
Storage: ver.String(),
})
AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
Expand All @@ -65,6 +65,13 @@ func DowngradeCancel(t *testing.T, epc *EtcdProcessCluster) {
}

// DowngradeUpgradeMembers downgrades or upgrades a random subset of
// numberOfMembersToChange members of clus from currentVersion to
// targetVersion, delegating to DowngradeUpgradeMembersByID. The interface is
// unchanged; existing callers are unaffected.
func DowngradeUpgradeMembers(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, numberOfMembersToChange int, currentVersion, targetVersion *semver.Version) error {
	// Pick a random subset of member indices to operate on.
	membersToChange := rand.Perm(len(clus.Procs))[:numberOfMembersToChange]
	// Fix: the original passed fmt.Sprintln(...) as the Logf format string
	// (non-constant format with a trailing newline — go vet printf error) and
	// handed a zap.Field to t.Logf, which printed as %!(EXTRA ...) garbage.
	t.Logf("Elect members for operations: %v", membersToChange)

	return DowngradeUpgradeMembersByID(t, lg, clus, membersToChange, currentVersion, targetVersion)
}

func DowngradeUpgradeMembersByID(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, membersToChange []int, currentVersion, targetVersion *semver.Version) error {
henrybear327 marked this conversation as resolved.
Show resolved Hide resolved
if lg == nil {
lg = clus.lg
}
Expand All @@ -75,8 +82,6 @@ func DowngradeUpgradeMembers(t *testing.T, lg *zap.Logger, clus *EtcdProcessClus
opString = "downgrading"
newExecPath = BinPath.EtcdLastRelease
}
membersToChange := rand.Perm(len(clus.Procs))[:numberOfMembersToChange]
lg.Info(fmt.Sprintf("Test %s members", opString), zap.Any("members", membersToChange))

// Need to wait health interval for cluster to prepare for downgrade/upgrade
time.Sleep(etcdserver.HealthInterval)
Expand All @@ -100,7 +105,7 @@ func DowngradeUpgradeMembers(t *testing.T, lg *zap.Logger, clus *EtcdProcessClus
lg.Info("Validating versions")
for _, memberID := range membersToChange {
member := clus.Procs[memberID]
if isDowngrade || numberOfMembersToChange == len(clus.Procs) {
if isDowngrade || len(membersToChange) == len(clus.Procs) {
ValidateVersion(t, clus.Cfg, member, version.Versions{
Cluster: targetVersion.String(),
Server: targetVersion.String(),
Expand Down Expand Up @@ -143,8 +148,8 @@ func ValidateVersion(t *testing.T, cfg *EtcdProcessClusterConfig, member EtcdPro
})
}

// offsetMinor returns the version with offset from the original minor, with the same major.
func offsetMinor(v *semver.Version, offset int) *semver.Version {
// OffsetMinor returns the version with offset from the original minor, with the same major.
func OffsetMinor(v *semver.Version, offset int) *semver.Version {
var minor int64
if offset >= 0 {
minor = v.Minor + int64(offset)
Expand Down