Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 164 additions & 57 deletions tests/robustness/validate/patch_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,37 +24,78 @@ import (
"go.etcd.io/etcd/tests/v3/robustness/report"
)

type patchArgs struct {
returnTime int64
clientCount int64
persistedCount int64
revision int64
}

func patchLinearizableOperations(operations []porcupine.Operation, reports []report.ClientReport, persistedRequests []model.EtcdRequest) []porcupine.Operation {
putRevision := putRevision(reports)
putRevision, delRevision := watchRevisions(reports)
persistedPutCount := countPersistedPuts(persistedRequests)
clientPutCount := countClientPuts(reports)
putReturnTime := uniquePutReturnTime(operations, persistedRequests, clientPutCount)
return patchOperations(operations, putRevision, putReturnTime, clientPutCount, persistedPutCount)

persistedDeleteCount := countPersistedDeletes(persistedRequests)
clientDeleteCount := countClientDeletes(reports)

putReturnTime, delReturnTime := uniqueOperationReturnTime(operations, persistedRequests, clientPutCount, clientDeleteCount)

putArgs := make(map[model.PutOptions]patchArgs)
for opts, c := range clientPutCount {
putArgs[opts] = patchArgs{
clientCount: c,
persistedCount: persistedPutCount[opts],
returnTime: putReturnTime[opts],
revision: putRevision[opts],
}
}
delArgs := make(map[model.DeleteOptions]patchArgs)
for opts, c := range clientDeleteCount {
delArgs[opts] = patchArgs{
clientCount: c,
persistedCount: persistedDeleteCount[opts],
returnTime: delReturnTime[opts],
revision: delRevision[opts],
}
}

return patchOperations(
operations, putArgs, delArgs,
)
}

func putRevision(reports []report.ClientReport) map[keyValue]int64 {
requestRevision := map[keyValue]int64{}
func watchRevisions(reports []report.ClientReport) (map[model.PutOptions]int64, map[model.DeleteOptions]int64) {
putRevisions := map[model.PutOptions]int64{}
delRevisions := map[model.DeleteOptions]int64{}

for _, client := range reports {
for _, watch := range client.Watch {
for _, resp := range watch.Responses {
for _, event := range resp.Events {
switch event.Type {
case model.RangeOperation:
case model.PutOperation:
kv := keyValue{Key: event.Key, Value: event.Value}
requestRevision[kv] = event.Revision
kv := model.PutOptions{Key: event.Key, Value: event.Value}
putRevisions[kv] = event.Revision
case model.DeleteOperation:
kv := model.DeleteOptions{Key: event.Key} // ADD: Track delete revisions
delRevisions[kv] = event.Revision
default:
panic(fmt.Sprintf("unknown event type %q", event.Type))
}
}
}
}
}
return requestRevision
return putRevisions, delRevisions
}

func patchOperations(operations []porcupine.Operation, watchRevision, putReturnTime, clientPutCount, persistedPutCount map[keyValue]int64) []porcupine.Operation {
func patchOperations(
operations []porcupine.Operation,
putArgs map[model.PutOptions]patchArgs,
delArgs map[model.DeleteOptions]patchArgs,
) []porcupine.Operation {
newOperations := make([]porcupine.Operation, 0, len(operations))

for _, op := range operations {
Expand All @@ -70,26 +111,47 @@ func patchOperations(operations []porcupine.Operation, watchRevision, putReturnT
for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
switch etcdOp.Type {
case model.PutOperation:
kv := keyValue{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value}
if _, ok := persistedPutCount[kv]; ok {
kv := model.PutOptions{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value}
arg, ok := putArgs[kv]
if !ok {
continue
}
if arg.persistedCount > 0 {
persisted = true
}
if count := clientPutCount[kv]; count != 1 {
if arg.clientCount != 1 {
continue
}
if revision, ok := watchRevision[kv]; ok {
txnRevision = revision
if arg.revision > 0 {
txnRevision = arg.revision
}
if returnTime, ok := putReturnTime[kv]; ok {
op.Return = min(op.Return, returnTime)
if arg.returnTime > 0 {
op.Return = min(op.Return, arg.returnTime)
}
case model.DeleteOperation:
kv := model.DeleteOptions{Key: etcdOp.Delete.Key}
arg, ok := delArgs[kv]
if !ok {
continue
}
if arg.persistedCount > 0 {
persisted = true
}
if arg.clientCount != 1 {
continue
}
if arg.revision > 0 {
txnRevision = arg.revision
}
if arg.returnTime > 0 {
op.Return = min(op.Return, arg.returnTime)
}
case model.RangeOperation:
default:
panic(fmt.Sprintf("unknown operation type %q", etcdOp.Type))
}
}
if isUniqueTxn(request.Txn, clientPutCount) {
if isUniqueTxn(request.Txn, putArgs, delArgs) {
if !persisted {
// Remove non persisted operations
continue
Expand All @@ -106,12 +168,12 @@ func patchOperations(operations []porcupine.Operation, watchRevision, putReturnT
return newOperations
}

func isUniqueTxn(request *model.TxnRequest, clientRequestCount map[keyValue]int64) bool {
return isUniqueOps(request.OperationsOnSuccess, clientRequestCount) && isUniqueOps(request.OperationsOnFailure, clientRequestCount)
func isUniqueTxn(request *model.TxnRequest, putArgs map[model.PutOptions]patchArgs, delArgs map[model.DeleteOptions]patchArgs) bool {
return isUniqueOps(request.OperationsOnSuccess, putArgs, delArgs) && isUniqueOps(request.OperationsOnFailure, putArgs, delArgs)
}

func isUniqueOps(ops []model.EtcdOperation, clientRequestCount map[keyValue]int64) bool {
return hasUniqueWriteOperation(ops, clientRequestCount) || !hasWriteOperation(ops)
func isUniqueOps(ops []model.EtcdOperation, putArgs map[model.PutOptions]patchArgs, delArgs map[model.DeleteOptions]patchArgs) bool {
return hasUniqueWriteOperation(ops, putArgs, delArgs) || !hasWriteOperation(ops)
}

func hasWriteOperation(ops []model.EtcdOperation) bool {
Expand All @@ -123,15 +185,19 @@ func hasWriteOperation(ops []model.EtcdOperation) bool {
return false
}

func hasUniqueWriteOperation(ops []model.EtcdOperation, clientRequestCount map[keyValue]int64) bool {
func hasUniqueWriteOperation(ops []model.EtcdOperation, putArgs map[model.PutOptions]patchArgs, delArgs map[model.DeleteOptions]patchArgs) bool {
for _, operation := range ops {
switch operation.Type {
case model.PutOperation:
kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value}
if count := clientRequestCount[kv]; count == 1 {
kv := model.PutOptions{Key: operation.Put.Key, Value: operation.Put.Value}
if arg, ok := putArgs[kv]; ok && arg.clientCount == 1 {
return true
}
case model.DeleteOperation:
kv := model.DeleteOptions{Key: operation.Delete.Key}
if arg, ok := delArgs[kv]; ok && arg.clientCount == 1 {
return true
}
case model.RangeOperation:
default:
panic(fmt.Sprintf("unknown operation type %q", operation.Type))
Expand All @@ -140,25 +206,33 @@ func hasUniqueWriteOperation(ops []model.EtcdOperation, clientRequestCount map[k
return false
}

func uniquePutReturnTime(allOperations []porcupine.Operation, persistedRequests []model.EtcdRequest, clientPutCount map[keyValue]int64) map[keyValue]int64 {
earliestReturnTime := map[keyValue]int64{}
func uniqueOperationReturnTime(allOperations []porcupine.Operation, persistedRequests []model.EtcdRequest, clientPutCount map[model.PutOptions]int64, clientDeleteCount map[model.DeleteOptions]int64) (map[model.PutOptions]int64, map[model.DeleteOptions]int64) {
putTimes := map[model.PutOptions]int64{}
delTimes := map[model.DeleteOptions]int64{}
var lastReturnTime int64
for _, op := range allOperations {
request := op.Input.(model.EtcdRequest)
switch request.Type {
case model.Txn:
for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
if etcdOp.Type != model.PutOperation {
continue
}
kv := keyValue{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value}
if count := clientPutCount[kv]; count > 1 {
continue
}
if returnTime, ok := earliestReturnTime[kv]; !ok || returnTime > op.Return {
earliestReturnTime[kv] = op.Return
switch etcdOp.Type {
case model.PutOperation:
kv := model.PutOptions{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value}
if clientPutCount[kv] > 1 {
continue
}
if returnTime, ok := putTimes[kv]; !ok || returnTime > op.Return {
putTimes[kv] = op.Return
}
case model.DeleteOperation:
kv := model.DeleteOptions{Key: etcdOp.Delete.Key}
if clientDeleteCount[kv] > 1 {
continue
}
if returnTime, ok := delTimes[kv]; !ok || returnTime > op.Return {
delTimes[kv] = op.Return
}
}
earliestReturnTime[kv] = op.Return
}
case model.Range:
case model.LeaseGrant:
Expand All @@ -181,17 +255,25 @@ func uniquePutReturnTime(allOperations []porcupine.Operation, persistedRequests
lastReturnTime--
}
for _, op := range request.Txn.OperationsOnSuccess {
if op.Type != model.PutOperation {
continue
}
kv := keyValue{Key: op.Put.Key, Value: op.Put.Value}
if count := clientPutCount[kv]; count > 1 {
continue
}
returnTime, ok := earliestReturnTime[kv]
if ok {
lastReturnTime = min(returnTime, lastReturnTime)
earliestReturnTime[kv] = lastReturnTime
switch op.Type {
case model.PutOperation:
kv := model.PutOptions{Key: op.Put.Key, Value: op.Put.Value}
if clientPutCount[kv] > 1 {
continue
}
if returnTime, ok := putTimes[kv]; ok {
lastReturnTime = min(returnTime, lastReturnTime)
putTimes[kv] = lastReturnTime
}
case model.DeleteOperation:
kv := model.DeleteOptions{Key: op.Delete.Key}
if clientDeleteCount[kv] > 1 {
continue
}
if returnTime, ok := delTimes[kv]; ok {
lastReturnTime = min(returnTime, lastReturnTime)
delTimes[kv] = lastReturnTime
}
}
}
case model.LeaseGrant:
Expand All @@ -201,11 +283,11 @@ func uniquePutReturnTime(allOperations []porcupine.Operation, persistedRequests
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
}
return earliestReturnTime
return putTimes, delTimes
}

func countClientPuts(reports []report.ClientReport) map[keyValue]int64 {
counter := map[keyValue]int64{}
func countClientPuts(reports []report.ClientReport) map[model.PutOptions]int64 {
counter := map[model.PutOptions]int64{}
for _, client := range reports {
for _, op := range client.KeyValue {
request := op.Input.(model.EtcdRequest)
Expand All @@ -215,21 +297,21 @@ func countClientPuts(reports []report.ClientReport) map[keyValue]int64 {
return counter
}

func countPersistedPuts(requests []model.EtcdRequest) map[keyValue]int64 {
counter := map[keyValue]int64{}
func countPersistedPuts(requests []model.EtcdRequest) map[model.PutOptions]int64 {
counter := map[model.PutOptions]int64{}
for _, request := range requests {
countPuts(counter, request)
}
return counter
}

func countPuts(counter map[keyValue]int64, request model.EtcdRequest) {
func countPuts(counter map[model.PutOptions]int64, request model.EtcdRequest) {
switch request.Type {
case model.Txn:
for _, operation := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
switch operation.Type {
case model.PutOperation:
kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value}
kv := model.PutOptions{Key: operation.Put.Key, Value: operation.Put.Value}
counter[kv]++
case model.DeleteOperation:
case model.RangeOperation:
Expand All @@ -247,7 +329,32 @@ func countPuts(counter map[keyValue]int64, request model.EtcdRequest) {
}
}

type keyValue struct {
Key string
Value model.ValueOrHash
func countClientDeletes(reports []report.ClientReport) map[model.DeleteOptions]int64 {
counter := map[model.DeleteOptions]int64{}
for _, client := range reports {
for _, op := range client.KeyValue {
request := op.Input.(model.EtcdRequest)
countDeletes(counter, request)
}
}
return counter
}

func countPersistedDeletes(requests []model.EtcdRequest) map[model.DeleteOptions]int64 {
counter := map[model.DeleteOptions]int64{}
for _, req := range requests {
countDeletes(counter, req)
}
return counter
}

func countDeletes(counter map[model.DeleteOptions]int64, request model.EtcdRequest) {
if request.Type != model.Txn {
return
}
for _, operation := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
if operation.Type == model.DeleteOperation {
counter[operation.Delete]++
}
}
}
Loading