Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch task queue user data persistence updates #7039

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

dnr
Copy link
Member

@dnr dnr commented Dec 30, 2024

What changed?

Multiple user data updates coming in for task queues in the same namespace within a short period of time get batched into a smaller number of persistence operations.

Why?

With deployments, we sometimes have to update user data on multiple task queues at once (all in the same namespace), and on cassandra, these updates go through an LWT. This could cause a backup since the throughput of LWTs is fairly low.

This change allows batching of multiple updates in one persistence operation (LWT on cassandra or transaction on sql). The batching is transparent: updates that come in within a short period of time automatically get batched (in matching engine).

How did you test it?

  • unit test for batcher componennt
  • existing tests for user data updates

Potential risks

  • small extra latency on all user data updates
  • retries/conflicts are not handled well yet: on a version conflict, all updates in the batch will fail and none will be retried, but the non-conflicting ones should be retried

@dnr dnr requested a review from a team as a code owner December 30, 2024 20:52
Copy link
Collaborator

@ShahabT ShahabT left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally lgtm, but someone else should review line-by-line.

if err != nil {
return err
if m.Db.IsDupEntryError(err) {
return &persistence.ConditionFailedError{Msg: err.Error()}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with this type of error handling you will end up with "partially updated" state, but you don't really know which one are passed.
(unless I miss something).
Is there a reason to stop on error? or may it make sense to move forward, and return an array of failed updates?

if err != nil {
return gocql.ConvertError("UpdateTaskQueueUserData", err)
}
defer iter.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a bit confused about this.
In previous iteration code closes iter and process error.
You change it to defer, and remove error check.
so even if iter.err is not nill (that is what iter.Close() returns) you still execute that if !applied { section.
Which is different from what it was before.
Intended?

for taskQueue, update := range request.Updates {
userData, err := m.serializer.TaskQueueUserDataToBlob(update.UserData.Data, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here. are you sure you want to break this iterator for a single failure? Or just drop this specific update?

@@ -0,0 +1,176 @@
// The MIT License
//
// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2025 :)


// NewBatcher creates a Batcher. `fn` is the processing function, `opts` are the timing options.
// `clock` is usually clock.NewRealTimeSource but can be a fake time source for testing.
func NewBatcher[T, R any](fn func([]T) R, opts BatcherOptions, clock clock.TimeSource) *Batcher[T, R] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timeSource instead of "clock" as a variable name.

type Batcher[T, R any] struct {
fn func([]T) R // batch executor function
opts BatcherOptions // timing/size options
clock clock.TimeSource // clock for testing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timeSource instead of clock, because it is actually TimeSource.

return err
}
tlm := m.getQueueManager(dbq)
tlm.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be out of depth here - will this work? I think you want lock/unlock per iteration?

// try to add more items. stop after a gap of MaxGap, total time of MaxTotalWait, or
// MaxItems items.
maxWaitC, maxWaitT := s.clock.NewTimer(s.opts.MaxDelay)
loop:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand all the possible implications, so I guess I will trust this work and covered by tests.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants