diff --git a/tests/robustness/validate/validate.go b/tests/robustness/validate/validate.go index a23734953c5..0318a9c7a95 100644 --- a/tests/robustness/validate/validate.go +++ b/tests/robustness/validate/validate.go @@ -40,8 +40,9 @@ func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, report return results } - // TODO: Use requests from linearization for replay. - replay := model.NewReplay(persistedRequests) + // Use linearization results from operations + linearizedRequests := getLinearizedRequests(linearizableOperations, reports, persistedRequests) + replay := model.NewReplay(linearizedRequests) err = validateWatch(lg, cfg, reports, replay) if err != nil { @@ -54,6 +55,41 @@ func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, report return results } +// getLinearizedRequests converts linearizable operations to a sequence of requests +// while preserving error responses from the client reports +func getLinearizedRequests(operations []porcupine.Operation, reports []report.ClientReport, persistedRequests []model.EtcdRequest) []model.EtcdRequest { + result := make([]model.EtcdRequest, 0, len(persistedRequests)) + + // Build map of failed operations from client reports + failedOps := make(map[int]bool) + for _, report := range reports { + for _, op := range report.KeyValue { + response := op.Output.(model.MaybeEtcdResponse) + if response.Error != "" { + failedOps[op.ClientId] = true + } + } + } + + // Track processed operations + opIndex := 0 + + // Build sequence combining linearized operations and error responses + for i := range persistedRequests { + if failedOps[i] { + // Keep failed operations in their original position + result = append(result, persistedRequests[i]) + } else if opIndex < len(operations) { + // Use operations order for successful requests + originalIndex := operations[opIndex].Input.(int) + result = append(result, persistedRequests[originalIndex]) + opIndex++ + } + } + + return result +} + type Config struct { ExpectRevisionUnique bool } diff --git a/tests/robustness/validate/validate_test.go b/tests/robustness/validate/validate_test.go index bd6709af97e..8120d9bd864 100644 --- a/tests/robustness/validate/validate_test.go +++ b/tests/robustness/validate/validate_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/anishathalye/porcupine" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" @@ -1835,6 +1836,155 @@ func TestValidateWatch(t *testing.T) { }, expectError: errBrokeFilter.Error(), }, + { + name: "Linearizable - ordered events match linearization - pass", + reports: []report.ClientReport{ + { + Watch: []model.WatchOperation{ + { + Request: model.WatchRequest{ + WithPrefix: true, + }, + Responses: []model.WatchResponse{ + { + Events: []model.WatchEvent{ + putWatchEvent("a", "1", 2, true), + putWatchEvent("b", "2", 3, true), + }, + }, + }, + }, + }, + KeyValue: []porcupine.Operation{ + { + Input: putRequest("a", "1"), + Call: 1, + Return: 2, + }, + { + Input: putRequest("b", "2"), + Call: 3, + Return: 4, + }, + }, + }, + }, + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + }, + }, + { + name: "Linearizable - concurrent atomic txn - pass", + reports: []report.ClientReport{ + { + Watch: []model.WatchOperation{ + { + Request: model.WatchRequest{ + WithPrefix: true, + }, + Responses: []model.WatchResponse{ + { + Events: []model.WatchEvent{ + putWatchEvent("a", "1", 2, true), + putWatchEvent("b", "2", 2, true), + }, + }, + }, + }, + }, + KeyValue: []porcupine.Operation{ + { + Input: model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "a", + Value: model.ToValueOrHash("1"), + }, + }, + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "b", + Value: model.ToValueOrHash("2"), + }, + }, + }, + }, + }, + Call: 1, + Return: 2, + }, + }, + }, + }, + persistedRequests: []model.EtcdRequest{ + { + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "a", + Value: model.ToValueOrHash("1"), + }, + }, + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "b", + Value: model.ToValueOrHash("2"), + }, + }, + }, + }, + }, + }, + }, + { + name: "Linearizable - non-atomic unordered events - fail", + reports: []report.ClientReport{ + { + Watch: []model.WatchOperation{ + { + Request: model.WatchRequest{ + WithPrefix: true, + }, + Responses: []model.WatchResponse{ + { + Events: []model.WatchEvent{ + putWatchEvent("b", "2", 3, true), + putWatchEvent("a", "1", 2, true), + }, + }, + }, + }, + }, + KeyValue: []porcupine.Operation{ + { + Input: putRequest("a", "1"), + Call: 1, + Return: 2, + }, + { + Input: putRequest("b", "2"), + Call: 3, + Return: 4, + }, + }, + }, + }, + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + putRequest("b", "2"), + }, + expectError: errBrokeOrdered.Error(), + }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { diff --git a/tests/robustness/validate/watch.go b/tests/robustness/validate/watch.go index 506cbeca431..ba8fa72a28d 100644 --- a/tests/robustness/validate/watch.go +++ b/tests/robustness/validate/watch.go @@ -35,6 +35,7 @@ var ( errBrokePrevKV = errors.New("incorrect event prevValue") errBrokeIsCreate = errors.New("incorrect event IsCreate") errBrokeFilter = errors.New("event not matching watch filter") + errBrokeLinearizable = errors.New("broke linearizable property: watch events don't match linearization order") ) func validateWatch(lg *zap.Logger, cfg Config, reports []report.ClientReport, replay *model.EtcdReplay) error {