Batch task queue user data persistence updates #7039
base: main
Conversation
Generally lgtm, but someone else should review line-by-line.
if err != nil {
	return err
if m.Db.IsDupEntryError(err) {
	return &persistence.ConditionFailedError{Msg: err.Error()}
With this type of error handling you will end up with a "partially updated" state, but you don't really know which updates went through (unless I'm missing something).
Is there a reason to stop on the first error? Or might it make more sense to keep going and return an array of failed updates?
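A minimal sketch of that alternative (illustrative names, not the PR's actual code): apply every update, remember which ones failed, and report them together at the end.

```go
// FailedUpdatesError reports which task queue updates did not go through.
type FailedUpdatesError struct {
	Failed map[string]error // task queue name -> failure for that update
}

func (e *FailedUpdatesError) Error() string {
	return "some task queue user data updates failed"
}

// applyAll runs every update, collecting failures instead of aborting on the first one.
func applyAll(updates map[string]func() error) error {
	failed := make(map[string]error)
	for taskQueue, apply := range updates {
		if err := apply(); err != nil {
			failed[taskQueue] = err // keep going; report this one at the end
		}
	}
	if len(failed) > 0 {
		return &FailedUpdatesError{Failed: failed}
	}
	return nil
}
```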
if err != nil {
	return gocql.ConvertError("UpdateTaskQueueUserData", err)
}
defer iter.Close()
A bit confused about this.
In the previous iteration the code closed iter and processed the error.
You changed it to a defer and removed the error check, so even if the iterator's error is not nil (that is what iter.Close() returns) you still execute the `if !applied {` section.
Which is different from what it was before. Intended?
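For context, a sketch of the pattern being described, assuming the usual gocql batch-CAS shape (not the exact PR code): close the iterator explicitly and check its error before acting on `applied`.

```go
// Sketch only: surface iter.Close()'s error before looking at `applied`,
// instead of `defer iter.Close()`, which drops that error.
previous := make(map[string]interface{})
applied, iter, err := session.MapExecuteBatchCAS(batch, previous)
if err != nil {
	return gocql.ConvertError("UpdateTaskQueueUserData", err)
}
if err := iter.Close(); err != nil {
	return gocql.ConvertError("UpdateTaskQueueUserData", err)
}
if !applied {
	// the conditional (LWT) update did not apply; handle the conflict here
}
```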
for taskQueue, update := range request.Updates {
	userData, err := m.serializer.TaskQueueUserDataToBlob(update.UserData.Data, enumspb.ENCODING_TYPE_PROTO3)
	if err != nil {
		return err
Same here. Are you sure you want to break out of this loop for a single failure? Or just drop that specific update?
@@ -0,0 +1,176 @@
// The MIT License
//
// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved.
2025 :)
// NewBatcher creates a Batcher. `fn` is the processing function, `opts` are the timing options.
// `clock` is usually clock.NewRealTimeSource but can be a fake time source for testing.
func NewBatcher[T, R any](fn func([]T) R, opts BatcherOptions, clock clock.TimeSource) *Batcher[T, R] {
timeSource instead of "clock" as a variable name.
type Batcher[T, R any] struct {
	fn    func([]T) R      // batch executor function
	opts  BatcherOptions   // timing/size options
	clock clock.TimeSource // clock for testing
timeSource instead of clock, because it is actually a TimeSource.
	return err
}
tlm := m.getQueueManager(dbq)
tlm.Lock()
I may be out of my depth here - will this work? I think you want to lock/unlock per iteration?
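A sketch of what per-iteration locking could look like (loop and method names here are illustrative, not the actual PR code):

```go
// Lock and unlock around each queue's update so one queue's work doesn't hold
// the lock while the next queue in the batch is processed.
for _, dbq := range queuesInBatch {
	tlm := m.getQueueManager(dbq)
	tlm.Lock()
	err := tlm.applyUserDataUpdate(updates[dbq]) // illustrative method name
	tlm.Unlock()
	if err != nil {
		return err
	}
}
```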
// try to add more items. stop after a gap of MaxGap, total time of MaxTotalWait, or
// MaxItems items.
maxWaitC, maxWaitT := s.clock.NewTimer(s.opts.MaxDelay)
loop:
I'm not sure I understand all the possible implications, so I guess I will trust that this works and is covered by tests.
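For readers following the diff, the general shape of such a collection loop, with both a per-item gap timeout and an overall deadline, might look like this standalone sketch (using the standard time package instead of the PR's clock.TimeSource):

```go
import "time"

// collect gathers up to maxItems items from in, stopping early if no new item
// arrives within maxGap or once maxTotalWait has elapsed overall.
func collect[T any](in <-chan T, maxItems int, maxGap, maxTotalWait time.Duration) []T {
	var batch []T
	total := time.NewTimer(maxTotalWait)
	defer total.Stop()
	for len(batch) < maxItems {
		gap := time.NewTimer(maxGap)
		select {
		case item := <-in:
			gap.Stop()
			batch = append(batch, item)
		case <-gap.C:
			return batch // gap between items exceeded maxGap
		case <-total.C:
			gap.Stop()
			return batch // overall deadline exceeded
		}
	}
	return batch
}
```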
What changed?
Multiple user data updates coming in for task queues in the same namespace within a short period of time get batched into a smaller number of persistence operations.
Why?
With deployments, we sometimes have to update user data on multiple task queues at once (all in the same namespace), and on Cassandra these updates go through an LWT. This could cause a backup, since the throughput of LWTs is fairly low.
This change allows batching multiple updates into one persistence operation (an LWT on Cassandra or a transaction on SQL). The batching is transparent: updates that come in within a short period of time automatically get batched (in the matching engine).
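As an illustration of the "one persistence operation per batch" idea on the SQL side (table and column names below are hypothetical, not the real schema): all updates in a batch are applied inside a single transaction, so they commit or fail together.

```go
import "database/sql"

// applyBatchInTx writes every update in the batch inside a single transaction,
// so the whole batch commits or fails as one persistence operation.
func applyBatchInTx(db *sql.DB, updates map[string][]byte) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit succeeds
	for taskQueue, blob := range updates {
		if _, err := tx.Exec(
			`UPDATE task_queue_user_data SET data = ? WHERE task_queue_name = ?`,
			blob, taskQueue,
		); err != nil {
			return err
		}
	}
	return tx.Commit()
}
```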
How did you test it?
Potential risks