Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(test/robustness): Utilize Porcupine linearization for watch validation #19428

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions tests/robustness/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, report
return results
}

// TODO: Use requests from linearization for replay.
replay := model.NewReplay(persistedRequests)
// Use linearization results from operations
linearizedRequests := getLinearizedRequests(linearizableOperations, reports, persistedRequests)
replay := model.NewReplay(linearizedRequests)

err = validateWatch(lg, cfg, reports, replay)
if err != nil {
Expand All @@ -54,6 +55,41 @@ func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, report
return results
}

// getLinearizedRequests converts linearizable operations to a sequence of requests
// while preserving error responses from the client reports
func getLinearizedRequests(operations []porcupine.Operation, reports []report.ClientReport, persistedRequests []model.EtcdRequest) []model.EtcdRequest {
result := make([]model.EtcdRequest, 0, len(persistedRequests))

// Build map of failed operations from client reports
failedOps := make(map[int]bool)
for _, report := range reports {
for _, op := range report.KeyValue {
response := op.Output.(model.MaybeEtcdResponse)
if response.Error != "" {
failedOps[op.ClientId] = true
}
}
}

// Track processed operations
opIndex := 0

// Build sequence combining linearized operations and error responses
for i := range persistedRequests {
if failedOps[i] {
// Keep failed operations in their original position
result = append(result, persistedRequests[i])
} else if opIndex < len(operations) {
// Use operations order for successful requests
originalIndex := operations[opIndex].Input.(int)
result = append(result, persistedRequests[originalIndex])
opIndex++
}
}

return result
}

type Config struct {
ExpectRevisionUnique bool
}
Expand Down
150 changes: 150 additions & 0 deletions tests/robustness/validate/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/anishathalye/porcupine"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"

Expand Down Expand Up @@ -1835,6 +1836,155 @@ func TestValidateWatch(t *testing.T) {
},
expectError: errBrokeFilter.Error(),
},
{
name: "Linearizable - ordered events match linearization - pass",
reports: []report.ClientReport{
{
Watch: []model.WatchOperation{
{
Request: model.WatchRequest{
WithPrefix: true,
},
Responses: []model.WatchResponse{
{
Events: []model.WatchEvent{
putWatchEvent("a", "1", 2, true),
putWatchEvent("b", "2", 3, true),
},
},
},
},
},
KeyValue: []porcupine.Operation{
{
Input: putRequest("a", "1"),
Call: 1,
Return: 2,
},
{
Input: putRequest("b", "2"),
Call: 3,
Return: 4,
},
},
},
},
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
},
},
{
name: "Linearizable - concurrent atomic txn - pass",
reports: []report.ClientReport{
{
Watch: []model.WatchOperation{
{
Request: model.WatchRequest{
WithPrefix: true,
},
Responses: []model.WatchResponse{
{
Events: []model.WatchEvent{
putWatchEvent("a", "1", 2, true),
putWatchEvent("b", "2", 2, true),
},
},
},
},
},
KeyValue: []porcupine.Operation{
{
Input: model.EtcdRequest{
Type: model.Txn,
Txn: &model.TxnRequest{
OperationsOnSuccess: []model.EtcdOperation{
{
Type: model.PutOperation,
Put: model.PutOptions{
Key: "a",
Value: model.ToValueOrHash("1"),
},
},
{
Type: model.PutOperation,
Put: model.PutOptions{
Key: "b",
Value: model.ToValueOrHash("2"),
},
},
},
},
},
Call: 1,
Return: 2,
},
},
},
},
persistedRequests: []model.EtcdRequest{
{
Type: model.Txn,
Txn: &model.TxnRequest{
OperationsOnSuccess: []model.EtcdOperation{
{
Type: model.PutOperation,
Put: model.PutOptions{
Key: "a",
Value: model.ToValueOrHash("1"),
},
},
{
Type: model.PutOperation,
Put: model.PutOptions{
Key: "b",
Value: model.ToValueOrHash("2"),
},
},
},
},
},
},
},
{
name: "Linearizable - non-atomic unordered events - fail",
reports: []report.ClientReport{
{
Watch: []model.WatchOperation{
{
Request: model.WatchRequest{
WithPrefix: true,
},
Responses: []model.WatchResponse{
{
Events: []model.WatchEvent{
putWatchEvent("b", "2", 3, true),
putWatchEvent("a", "1", 2, true),
},
},
},
},
},
KeyValue: []porcupine.Operation{
{
Input: putRequest("a", "1"),
Call: 1,
Return: 2,
},
{
Input: putRequest("b", "2"),
Call: 3,
Return: 4,
},
},
},
},
persistedRequests: []model.EtcdRequest{
putRequest("a", "1"),
putRequest("b", "2"),
},
expectError: errBrokeOrdered.Error(),
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions tests/robustness/validate/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var (
errBrokePrevKV = errors.New("incorrect event prevValue")
errBrokeIsCreate = errors.New("incorrect event IsCreate")
errBrokeFilter = errors.New("event not matching watch filter")
errBrokeLinearizable = errors.New("broke linearizable property: watch events don't match linearization order")
)

func validateWatch(lg *zap.Logger, cfg Config, reports []report.ClientReport, replay *model.EtcdReplay) error {
Expand Down