Skip to content

Commit

Permalink
add logs for etcd_process
Browse files Browse the repository at this point in the history
  • Loading branch information
tjungblu committed Jun 9, 2023
1 parent 8de543d commit aeaf52b
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 31 deletions.
37 changes: 37 additions & 0 deletions server/lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

Expand Down Expand Up @@ -396,6 +397,42 @@ func TestLessorDetach(t *testing.T) {
}
}

func TestLessorGrantAttachRevokeDetach(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })

// grant a lease with long term (100 seconds) to
// avoid early termination during the test.
l, err := le.Grant(1, 100)
if err != nil {
t.Fatalf("could not grant lease for 100s ttl (%v)", err)
}

items := []LeaseItem{
{"foo"},
{"bar"},
}

if err := le.Attach(l.ID, items); err != nil {
t.Fatalf("failed to attach items to the lease: %v", err)
}

if err := le.Revoke(l.ID); err != nil {
t.Fatalf("failed to revoke lease: %v", err)
}

err = le.Detach(l.ID, items[0:1])
assert.Equal(t, ErrLeaseNotFound, err)
assert.Equal(t, 0, len(le.leaseMap))
assert.Equal(t, 2, len(le.itemMap))
}

// TestLessorRecover ensures Lessor recovers leases from
// persist backend.
func TestLessorRecover(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ type EtcdProcessClusterConfig struct {
DiscoveryEndpoints []string // v3 discovery
DiscoveryToken string
LogLevel string
SaveLogsOnStop bool

MaxConcurrentStreams uint32 // default is math.MaxUint32
CorruptCheckTime time.Duration
Expand Down Expand Up @@ -349,6 +350,10 @@ func WithPeerProxy(enabled bool) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.PeerProxy = enabled }
}

func WithEnableSaveLogOnStop(enable bool) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.SaveLogsOnStop = enable }
}

// NewEtcdProcessCluster launches a new cluster from etcd processes, returning
// a new EtcdProcessCluster once all nodes are ready to accept client requests.
func NewEtcdProcessCluster(ctx context.Context, t testing.TB, opts ...EPClusterOption) (*EtcdProcessCluster, error) {
Expand Down Expand Up @@ -645,6 +650,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
InitialToken: cfg.InitialToken,
GoFailPort: gofailPort,
Proxy: proxyCfg,
SaveLogOnStop: cfg.SaveLogsOnStop,
}
}

Expand Down
27 changes: 22 additions & 5 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type EtcdProcess interface {
PeerProxy() proxy.Server
Failpoints() *BinaryFailpoints
Logs() LogsExpect
LogLines() []string
Kill() error
}

Expand All @@ -67,11 +68,12 @@ type LogsExpect interface {
}

type EtcdServerProcess struct {
cfg *EtcdServerProcessConfig
proc *expect.ExpectProcess
proxy proxy.Server
failpoints *BinaryFailpoints
donec chan struct{} // closed when Interact() terminates
cfg *EtcdServerProcessConfig
proc *expect.ExpectProcess
proxy proxy.Server
failpoints *BinaryFailpoints
donec chan struct{} // closed when Interact() terminates
savedLogLines []string // when SaveLogOnStop=true, it will save all logs from the process here
}

type EtcdServerProcessConfig struct {
Expand All @@ -95,6 +97,7 @@ type EtcdServerProcessConfig struct {
InitialToken string
InitialCluster string
GoFailPort int
SaveLogOnStop bool

Proxy *proxy.ServerConfig
}
Expand Down Expand Up @@ -177,6 +180,9 @@ func (ep *EtcdServerProcess) Stop() (err error) {
return nil
}
defer func() {
if ep.cfg.SaveLogOnStop {
ep.savedLogLines = ep.proc.Lines()
}
ep.proc = nil
}()

Expand Down Expand Up @@ -235,6 +241,17 @@ func (ep *EtcdServerProcess) Logs() LogsExpect {
return ep.proc
}

func (ep *EtcdServerProcess) LogLines() []string {
if ep.proc == nil {
if ep.cfg.SaveLogOnStop {
return ep.savedLogLines
}

ep.cfg.lg.Panic("Please grab logs before process is stopped or enable SaveLogOnStop")
}
return ep.proc.Lines()
}

func (ep *EtcdServerProcess) Kill() error {
ep.cfg.lg.Info("killing server...", zap.String("name", ep.cfg.Name))
return ep.proc.Signal(syscall.SIGKILL)
Expand Down
56 changes: 30 additions & 26 deletions tests/robustness/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ func TestRobustness(t *testing.T) {
name: "ClusterOfSize1/" + traffic.Name,
failpoint: RandomFailpoint,
traffic: traffic,
cluster: *e2e.NewConfig(
clusterOpts: []e2e.EPClusterOption{
e2e.WithClusterSize(1),
e2e.WithSnapshotCount(100),
e2e.WithGoFailEnabled(true),
e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
e2e.WithWatchProcessNotifyInterval(100*time.Millisecond),
),
e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond),
},
})
clusterOfSize3Options := []e2e.EPClusterOption{
e2e.WithIsPeerTLS(true),
Expand All @@ -68,61 +68,62 @@ func TestRobustness(t *testing.T) {
watch: watchConfig{
expectUniqueRevision: traffic.Traffic.ExpectUniqueRevision(),
},
cluster: *e2e.NewConfig(clusterOfSize3Options...),
clusterOpts: clusterOfSize3Options,
})
}
scenarios = append(scenarios, testScenario{
name: "Issue14370",
failpoint: RaftBeforeSavePanic,
cluster: *e2e.NewConfig(
clusterOpts: []e2e.EPClusterOption{
e2e.WithClusterSize(1),
e2e.WithGoFailEnabled(true),
),
},
})
scenarios = append(scenarios, testScenario{
name: "Issue14685",
failpoint: DefragBeforeCopyPanic,
cluster: *e2e.NewConfig(
clusterOpts: []e2e.EPClusterOption{
e2e.WithClusterSize(1),
e2e.WithGoFailEnabled(true),
),
},
})
scenarios = append(scenarios, testScenario{
name: "Issue13766",
failpoint: KillFailpoint,
traffic: traffic.HighTraffic,
cluster: *e2e.NewConfig(
e2e.WithSnapshotCount(100),
),
name: "Issue13766",
failpoint: KillFailpoint,
traffic: traffic.HighTraffic,
clusterOpts: []e2e.EPClusterOption{e2e.WithSnapshotCount(100)},
})
scenarios = append(scenarios, testScenario{
name: "Issue15220",
failpoint: RandomFailpoint,
watch: watchConfig{
requestProgress: true,
},
cluster: *e2e.NewConfig(
e2e.WithClusterSize(1),
),
clusterOpts: []e2e.EPClusterOption{e2e.WithClusterSize(1)},
})
scenarios = append(scenarios, testScenario{
name: "IssueXYZ",
failpoint: PutReturnErrNoSpace,
traffic: traffic.KubernetesTraffic,
cluster: *e2e.NewConfig(
clusterOpts: []e2e.EPClusterOption{
e2e.WithIsPeerTLS(true),
e2e.WithSnapshotCount(100),
e2e.WithPeerProxy(true),
e2e.WithGoFailEnabled(true),
),
e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond),
e2e.WithLogLevel("debug"),
},
})
if v.Compare(version.V3_5) >= 0 {
scenarios = append(scenarios, testScenario{
name: "Issue15271",
failpoint: BlackholeUntilSnapshot,
traffic: traffic.HighTraffic,
cluster: *e2e.NewConfig(
clusterOpts: []e2e.EPClusterOption{
e2e.WithSnapshotCount(100),
e2e.WithPeerProxy(true),
e2e.WithIsPeerTLS(true),
),
},
})
}
for _, scenario := range scenarios {
Expand All @@ -133,18 +134,21 @@ func TestRobustness(t *testing.T) {
t.Run(scenario.name, func(t *testing.T) {
lg := zaptest.NewLogger(t)
scenario.cluster.Logger = lg
scenario.clusterOpts = append(scenario.clusterOpts, e2e.WithEnableSaveLogOnStop(true))
scenario.cluster = *e2e.NewConfig(scenario.clusterOpts...)
ctx := context.Background()
testRobustness(ctx, t, lg, scenario)
})
}
}

type testScenario struct {
name string
failpoint Failpoint
cluster e2e.EtcdProcessClusterConfig
traffic traffic.Config
watch watchConfig
name string
failpoint Failpoint
clusterOpts []e2e.EPClusterOption
cluster e2e.EtcdProcessClusterConfig
traffic traffic.Config
watch watchConfig
}

func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testScenario) {
Expand Down
10 changes: 10 additions & 0 deletions tests/robustness/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func (r *report) Report(t *testing.T, force bool) {
for _, member := range r.clus.Procs {
memberDataDir := filepath.Join(path, fmt.Sprintf("server-%s", member.Config().Name))
persistMemberDataDir(t, r.lg, member, memberDataDir)
logsFile := filepath.Join(path, fmt.Sprintf("server-%s.log", member.Config().Name))
persistMemberLogs(t, r.lg, member, logsFile)
}
if r.clientReports != nil {
sort.Slice(r.clientReports, func(i, j int) bool {
Expand Down Expand Up @@ -101,6 +103,14 @@ func persistMemberDataDir(t *testing.T, lg *zap.Logger, member e2e.EtcdProcess,
}
}

func persistMemberLogs(t *testing.T, lg *zap.Logger, member e2e.EtcdProcess, path string) {
lg.Info("Saving member logs dir", zap.String("member", member.Config().Name), zap.String("path", path))
err := os.WriteFile(path, []byte(strings.Join(member.LogLines(), "")), 0644)
if err != nil {
t.Fatal(err)
}
}

func persistWatchResponses(t *testing.T, lg *zap.Logger, path string, responses []traffic.WatchResponse) {
lg.Info("Saving watch responses", zap.String("path", path))
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
Expand Down

0 comments on commit aeaf52b

Please sign in to comment.