Skip to content

Commit

Permalink
Merge pull request #18905 from serathius/robustness-duplicated-puts-3
Browse files Browse the repository at this point in the history
Robustness duplicated puts 3
  • Loading branch information
serathius authored Nov 24, 2024
2 parents fd02589 + 668834b commit fe45307
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 131 deletions.
199 changes: 106 additions & 93 deletions tests/robustness/validate/patch_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (
func patchLinearizableOperations(reports []report.ClientReport, persistedRequests []model.EtcdRequest) []porcupine.Operation {
allOperations := relevantOperations(reports)
putRevision := putRevision(reports)
putReturnTimeFromWatch := putReturnTimeFromWatch(reports)
putReturnTimeFromPersisted := putReturnTimeFromPersistedOperations(allOperations, persistedRequests)
return patchOperations(allOperations, putRevision, putReturnTimeFromWatch, putReturnTimeFromPersisted)
putReturnTime := putReturnTime(allOperations, reports, persistedRequests)
clientPutCount := countClientPuts(reports)
persistedPutCount := countPersistedPuts(persistedRequests)
return patchOperations(allOperations, putRevision, putReturnTime, clientPutCount, persistedPutCount)
}

func relevantOperations(reports []report.ClientReport) []porcupine.Operation {
Expand All @@ -46,30 +47,6 @@ func relevantOperations(reports []report.ClientReport) []porcupine.Operation {
return ops
}

func putReturnTimeFromWatch(reports []report.ClientReport) map[keyValue]int64 {
earliestTime := map[keyValue]int64{}
for _, client := range reports {
for _, watch := range client.Watch {
for _, resp := range watch.Responses {
for _, event := range resp.Events {
switch event.Type {
case model.RangeOperation:
case model.PutOperation:
kv := keyValue{Key: event.Key, Value: event.Value}
if t, ok := earliestTime[kv]; !ok || t > resp.Time.Nanoseconds() {
earliestTime[kv] = resp.Time.Nanoseconds()
}
case model.DeleteOperation:
default:
panic(fmt.Sprintf("unknown event type %q", event.Type))
}
}
}
}
}
return earliestTime
}

func putRevision(reports []report.ClientReport) map[keyValue]int64 {
requestRevision := map[keyValue]int64{}
for _, client := range reports {
Expand All @@ -92,7 +69,7 @@ func putRevision(reports []report.ClientReport) map[keyValue]int64 {
return requestRevision
}

func patchOperations(operations []porcupine.Operation, watchRevision, putReturnTimeFromWatch, putReturnTimeFromPersisted map[keyValue]int64) []porcupine.Operation {
func patchOperations(operations []porcupine.Operation, watchRevision, putReturnTime, clientPutCount, persistedPutCount map[keyValue]int64) []porcupine.Operation {
newOperations := make([]porcupine.Operation, 0, len(operations))

for _, op := range operations {
Expand All @@ -109,14 +86,16 @@ func patchOperations(operations []porcupine.Operation, watchRevision, putReturnT
switch etcdOp.Type {
case model.PutOperation:
kv := keyValue{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value}
if _, ok := persistedPutCount[kv]; ok {
persisted = true
}
if count := clientPutCount[kv]; count != 1 {
continue
}
if revision, ok := watchRevision[kv]; ok {
txnRevision = revision
}
if returnTime, ok := putReturnTimeFromWatch[kv]; ok {
op.Return = min(op.Return, returnTime)
}
if returnTime, ok := putReturnTimeFromPersisted[kv]; ok {
persisted = true
if returnTime, ok := putReturnTime[kv]; ok {
op.Return = min(op.Return, returnTime)
}
case model.DeleteOperation:
Expand All @@ -125,7 +104,7 @@ func patchOperations(operations []porcupine.Operation, watchRevision, putReturnT
panic(fmt.Sprintf("unknown operation type %q", etcdOp.Type))
}
}
if isUniqueTxn(request.Txn) {
if isUniqueTxn(request.Txn, clientPutCount) {
if !persisted {
// Remove non persisted operations
continue
Expand All @@ -143,12 +122,12 @@ func patchOperations(operations []porcupine.Operation, watchRevision, putReturnT
return newOperations
}

func isUniqueTxn(request *model.TxnRequest) bool {
return isUniqueOps(request.OperationsOnSuccess) && isUniqueOps(request.OperationsOnFailure)
func isUniqueTxn(request *model.TxnRequest, clientRequestCount map[keyValue]int64) bool {
return isUniqueOps(request.OperationsOnSuccess, clientRequestCount) && isUniqueOps(request.OperationsOnFailure, clientRequestCount)
}

func isUniqueOps(ops []model.EtcdOperation) bool {
return hasUniqueWriteOperation(ops) || !hasWriteOperation(ops)
func isUniqueOps(ops []model.EtcdOperation, clientRequestCount map[keyValue]int64) bool {
return hasUniqueWriteOperation(ops, clientRequestCount) || !hasWriteOperation(ops)
}

func hasWriteOperation(ops []model.EtcdOperation) bool {
Expand All @@ -160,104 +139,138 @@ func hasWriteOperation(ops []model.EtcdOperation) bool {
return false
}

func hasUniqueWriteOperation(ops []model.EtcdOperation) bool {
for _, etcdOp := range ops {
if etcdOp.Type == model.PutOperation {
return true
func hasUniqueWriteOperation(ops []model.EtcdOperation, clientRequestCount map[keyValue]int64) bool {
for _, operation := range ops {
switch operation.Type {
case model.PutOperation:
kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value}
if count := clientRequestCount[kv]; count == 1 {
return true
}
case model.DeleteOperation:
case model.RangeOperation:
default:
panic(fmt.Sprintf("unknown operation type %q", operation.Type))
}
}
return false
}

func putReturnTimeFromPersistedOperations(allOperations []porcupine.Operation, persistedRequests []model.EtcdRequest) map[keyValue]int64 {
putReturnTimes := putReturnTime(allOperations)
persisted := map[keyValue]int64{}

lastReturnTime := maxReturnTime(putReturnTimes)

for i := len(persistedRequests) - 1; i >= 0; i-- {
request := persistedRequests[i]
func putReturnTime(allOperations []porcupine.Operation, reports []report.ClientReport, persistedRequests []model.EtcdRequest) map[keyValue]int64 {
earliestReturnTime := map[keyValue]int64{}
var lastReturnTime int64
for _, op := range allOperations {
request := op.Input.(model.EtcdRequest)
switch request.Type {
case model.Txn:
hasPut := false
lastReturnTime--
for _, op := range request.Txn.OperationsOnSuccess {
if op.Type != model.PutOperation {
for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
if etcdOp.Type != model.PutOperation {
continue
}
kv := keyValue{Key: op.Put.Key, Value: op.Put.Value}
if _, found := persisted[kv]; found {
panic(fmt.Sprintf("Unexpected duplicate event in persisted requests. %d %+v", i, op))
}
hasPut = true
persisted[kv] = lastReturnTime
}
if hasPut {
newReturnTime := returnTimeFromRequest(putReturnTimes, request)
if newReturnTime != -1 {
lastReturnTime = min(lastReturnTime, newReturnTime)
kv := keyValue{Key: etcdOp.Put.Key, Value: etcdOp.Put.Value}
if returnTime, ok := earliestReturnTime[kv]; !ok || returnTime > op.Return {
earliestReturnTime[kv] = op.Return
}
earliestReturnTime[kv] = op.Return
}
case model.Range:
case model.LeaseGrant:
case model.LeaseRevoke:
case model.Compact:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
if op.Return > lastReturnTime {
lastReturnTime = op.Return
}
}
return persisted
}

func putReturnTime(operations []porcupine.Operation) map[model.EtcdOperation]int64 {
newOperations := map[model.EtcdOperation]int64{}
for _, op := range operations {
request := op.Input.(model.EtcdRequest)
for _, client := range reports {
for _, watch := range client.Watch {
for _, resp := range watch.Responses {
for _, event := range resp.Events {
switch event.Type {
case model.RangeOperation:
case model.PutOperation:
kv := keyValue{Key: event.Key, Value: event.Value}
if t, ok := earliestReturnTime[kv]; !ok || t > resp.Time.Nanoseconds() {
earliestReturnTime[kv] = resp.Time.Nanoseconds()
}
case model.DeleteOperation:
default:
panic(fmt.Sprintf("unknown event type %q", event.Type))
}
}
}
}
}

for i := len(persistedRequests) - 1; i >= 0; i-- {
request := persistedRequests[i]
switch request.Type {
case model.Txn:
for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
if etcdOp.Type != model.PutOperation {
lastReturnTime--
for _, op := range request.Txn.OperationsOnSuccess {
if op.Type != model.PutOperation {
continue
}
if _, found := newOperations[etcdOp]; found {
panic("Unexpected duplicate event in persisted requests.")
kv := keyValue{Key: op.Put.Key, Value: op.Put.Value}
returnTime, ok := earliestReturnTime[kv]
if ok {
lastReturnTime = min(returnTime, lastReturnTime)
earliestReturnTime[kv] = lastReturnTime
}
newOperations[etcdOp] = op.Return
}
case model.Range:
case model.LeaseGrant:
case model.LeaseRevoke:
case model.Compact:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
}
return newOperations
return earliestReturnTime
}

func maxReturnTime(operationTime map[model.EtcdOperation]int64) int64 {
var maxReturnTime int64
for _, returnTime := range operationTime {
if returnTime > maxReturnTime {
maxReturnTime = returnTime
func countClientPuts(reports []report.ClientReport) map[keyValue]int64 {
counter := map[keyValue]int64{}
for _, client := range reports {
for _, op := range client.KeyValue {
request := op.Input.(model.EtcdRequest)
countPuts(counter, request)
}
}
return maxReturnTime
return counter
}

func countPersistedPuts(requests []model.EtcdRequest) map[keyValue]int64 {
counter := map[keyValue]int64{}
for _, request := range requests {
countPuts(counter, request)
}
return counter
}

func returnTimeFromRequest(putReturnTimes map[model.EtcdOperation]int64, request model.EtcdRequest) int64 {
func countPuts(counter map[keyValue]int64, request model.EtcdRequest) {
switch request.Type {
case model.Txn:
for _, op := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
if op.Type != model.PutOperation {
continue
}
if time, found := putReturnTimes[op]; found {
return time
for _, operation := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
switch operation.Type {
case model.PutOperation:
kv := keyValue{Key: operation.Put.Key, Value: operation.Put.Value}
counter[kv]++
case model.DeleteOperation:
case model.RangeOperation:
default:
panic(fmt.Sprintf("unknown operation type %q", operation.Type))
}
}
return -1
case model.LeaseGrant:
case model.LeaseRevoke:
case model.Compact:
case model.Defragment:
case model.Range:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
panic(fmt.Sprintf("unknown request type %q", request.Type))
}
}

Expand Down
52 changes: 49 additions & 3 deletions tests/robustness/validate/patch_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//nolint:unparam
package validate

import (
Expand Down Expand Up @@ -73,7 +74,7 @@ func TestPatchHistory(t *testing.T) {
},
},
{
name: "failed put remains if there is a matching event, return time based on next persisted request",
name: "failed put remains if there is a matching event, uniqueness allows for return time to be based on next persisted request",
historyFunc: func(h *model.AppendableHistory) {
h.AppendPut("key1", "value", 100, infinite, nil, errors.New("failed"))
h.AppendPut("key2", "value", 300, 400, &clientv3.PutResponse{}, nil)
Expand All @@ -88,7 +89,7 @@ func TestPatchHistory(t *testing.T) {
},
},
{
name: "failed put remains if there is a matching event, revision and return time based on watch",
name: "failed put remains if there is a matching persisted request, uniqueness allows for revision and return time to be based on watch",
historyFunc: func(h *model.AppendableHistory) {
h.AppendPut("key", "value", 100, infinite, nil, errors.New("failed"))
},
Expand All @@ -100,6 +101,22 @@ func TestPatchHistory(t *testing.T) {
{Return: 300, Output: model.MaybeEtcdResponse{Persisted: true, PersistedRevision: 2}},
},
},
{
name: "failed put remains if there is a matching persisted request, lack of uniqueness causes time to be untouched regardless of persisted event and watch",
historyFunc: func(h *model.AppendableHistory) {
h.AppendPut("key", "value", 1, 2, nil, errors.New("failed"))
h.AppendPut("key", "value", 3, 4, &clientv3.PutResponse{}, nil)
},
persistedRequest: []model.EtcdRequest{
putRequest("key", "value"),
putRequest("key", "value"),
},
watchOperations: watchResponse(3, putEvent("key", "value", 2), putEvent("key", "value", 3)),
expectedRemainingOperations: []porcupine.Operation{
{Return: 1000000004, Output: model.MaybeEtcdResponse{Error: "failed"}},
{Return: 4, Output: putResponse(model.EtcdOperationResult{})},
},
},
{
name: "failed put is dropped if event has different key",
historyFunc: func(h *model.AppendableHistory) {
Expand Down Expand Up @@ -139,7 +156,7 @@ func TestPatchHistory(t *testing.T) {
},
},
{
name: "failed put with lease remains if there is a matching event, return time based on next persisted request",
name: "failed put with lease remains if there is a matching event, uniqueness allows return time to be based on next persisted request",
historyFunc: func(h *model.AppendableHistory) {
h.AppendPutWithLease("key1", "value", 123, 100, infinite, nil, errors.New("failed"))
h.AppendPutWithLease("key2", "value", 234, 300, 400, &clientv3.PutResponse{}, nil)
Expand All @@ -153,6 +170,35 @@ func TestPatchHistory(t *testing.T) {
{Return: 400, Output: putResponse(model.EtcdOperationResult{})},
},
},
{
name: "failed put with lease remains if there is a matching event, uniqueness allows for revision and return time to be based on watch",
historyFunc: func(h *model.AppendableHistory) {
h.AppendPutWithLease("key", "value", 123, 1, 2, nil, errors.New("failed"))
},
persistedRequest: []model.EtcdRequest{
putRequestWithLease("key", "value", 123),
},
watchOperations: watchResponse(3, putEvent("key", "value", 2)),
expectedRemainingOperations: []porcupine.Operation{
{Return: 3, Output: model.MaybeEtcdResponse{Persisted: true, PersistedRevision: 2}},
},
},
{
name: "failed put with lease remains if there is a matching persisted request, lack of uniqueness causes time to be untouched regardless of persisted event and watch",
historyFunc: func(h *model.AppendableHistory) {
h.AppendPutWithLease("key", "value", 123, 1, 2, nil, errors.New("failed"))
h.AppendPutWithLease("key", "value", 321, 3, 4, &clientv3.PutResponse{}, nil)
},
persistedRequest: []model.EtcdRequest{
putRequestWithLease("key", "value", 123),
putRequestWithLease("key", "value", 321),
},
watchOperations: watchResponse(3, putEvent("key", "value", 2), putEvent("key", "value", 3)),
expectedRemainingOperations: []porcupine.Operation{
{Return: 1000000004, Output: model.MaybeEtcdResponse{Error: "failed"}},
{Return: 4, Output: putResponse(model.EtcdOperationResult{})},
},
},
{
name: "failed put is dropped",
historyFunc: func(h *model.AppendableHistory) {
Expand Down
Loading

0 comments on commit fe45307

Please sign in to comment.