Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added logging to task queue user data propagation #6456

Merged
merged 5 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,10 @@ func BuildId(buildId string) ZapTag {
return NewStringTag("build-id", buildId)
}

func UserDataVersion(v int64) ZapTag {
return NewInt64("user-data-version", v)
}

func Cause(cause string) ZapTag {
return NewStringTag("cause", cause)
}
2 changes: 2 additions & 0 deletions service/matching/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ type (

GetUserDataLongPollTimeout dynamicconfig.DurationPropertyFn
GetUserDataMinWaitTime time.Duration
GetUserDataReturnBudget time.Duration

// taskWriter configuration
OutstandingTaskAppendsThreshold func() int
Expand Down Expand Up @@ -298,6 +299,7 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) *
},
GetUserDataLongPollTimeout: config.GetUserDataLongPollTimeout,
GetUserDataMinWaitTime: 1 * time.Second,
GetUserDataReturnBudget: returnEmptyTaskTimeBudget,
OutstandingTaskAppendsThreshold: func() int {
return config.OutstandingTaskAppendsThreshold(ns.String(), taskQueueName, taskType)
},
Expand Down
42 changes: 1 addition & 41 deletions service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1461,51 +1461,11 @@ func (e *matchingEngineImpl) GetTaskQueueUserData(
if err != nil {
return nil, err
}
version := req.GetLastKnownUserDataVersion()
if version < 0 {
return nil, serviceerror.NewInvalidArgument("last_known_user_data_version must not be negative")
}

if req.WaitNewData {
var cancel context.CancelFunc
ctx, cancel = newChildContext(ctx, e.config.GetUserDataLongPollTimeout(), returnEmptyTaskTimeBudget)
defer cancel()
// mark alive so that it doesn't unload while a child partition is doing a long poll
pm.MarkAlive()
}

for {
resp := &matchingservice.GetTaskQueueUserDataResponse{}
userData, userDataChanged, err := pm.GetUserDataManager().GetUserData()
if errors.Is(err, errTaskQueueClosed) {
// If we're closing, return a success with no data, as if the request expired. We shouldn't
// close due to idleness (because of the MarkAlive above), so we're probably closing due to a
// change of ownership. The caller will retry and be redirected to the new owner.
return resp, nil
} else if err != nil {
return nil, err
}
if req.WaitNewData && userData.GetVersion() == version {
// long-poll: wait for data to change/appear
select {
case <-ctx.Done():
return resp, nil
case <-userDataChanged:
continue
}
}
if userData != nil {
if userData.Version > version {
resp.UserData = userData
} else if userData.Version < version {
// This is highly unlikely but may happen due to an edge case in during ownership transfer.
// We rely on client retries in this case to let the system eventually self-heal.
return nil, serviceerror.NewInvalidArgument(
"requested task queue user data for version greater than known version")
}
}
return resp, nil
}
return pm.GetUserDataManager().HandleGetUserDataRequest(ctx, req)
}

func (e *matchingEngineImpl) ApplyTaskQueueUserDataReplicationEvent(
Expand Down
6 changes: 6 additions & 0 deletions service/matching/task_queue_partition_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"

"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/api/matchingservicemock/v1"
"go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/api/taskqueue/v1"
Expand Down Expand Up @@ -588,6 +590,10 @@ func (m *mockUserDataManager) UpdateUserData(_ context.Context, _ UserDataUpdate
return nil
}

func (m *mockUserDataManager) HandleGetUserDataRequest(ctx context.Context, req *matchingservice.GetTaskQueueUserDataRequest) (*matchingservice.GetTaskQueueUserDataResponse, error) {
panic("unused")
}

func (m *mockUserDataManager) updateVersioningData(data *persistence.VersioningData) {
m.Lock()
defer m.Unlock()
Expand Down
102 changes: 98 additions & 4 deletions service/matching/user_data_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/clock/hybrid_logical_clock"
"go.temporal.io/server/common/future"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -66,6 +67,8 @@ type (
// UpdateUserData updates user data for this task queue and replicates across clusters if necessary.
// Extra care should be taken to avoid mutating the existing data in the update function.
UpdateUserData(ctx context.Context, options UserDataUpdateOptions, updateFn UserDataUpdateFunc) error
// Handles the maybe-long-poll GetUserData RPC.
HandleGetUserDataRequest(ctx context.Context, req *matchingservice.GetTaskQueueUserDataRequest) (*matchingservice.GetTaskQueueUserDataResponse, error)
}

UserDataUpdateOptions struct {
Expand Down Expand Up @@ -276,6 +279,10 @@ func (m *userDataManagerImpl) fetchUserData(ctx context.Context) error {
WaitNewData: hasFetchedUserData,
})
if err != nil {
// don't log on context canceled, produces too much log spam at shutdown
if !common.IsContextCanceledErr(err) {
m.logger.Error("error fetching user data from parent", tag.Error(err))
}
var unimplErr *serviceerror.Unimplemented
if errors.As(err, &unimplErr) {
// This might happen during a deployment. The older version couldn't have had any user data,
Expand All @@ -292,6 +299,9 @@ func (m *userDataManagerImpl) fetchUserData(ctx context.Context) error {
// nil inner fields.
if res.GetUserData() != nil {
m.setUserDataForNonOwningPartition(res.GetUserData())
m.logNewUserData("fetched user data from parent", res.GetUserData())
} else {
m.logger.Debug("fetched user data from parent, no change")
}
hasFetchedUserData = true
m.setUserDataState(userDataEnabled, nil)
Expand Down Expand Up @@ -339,6 +349,7 @@ func (m *userDataManagerImpl) loadUserDataFromDB(ctx context.Context) error {
m.lock.Lock()
defer m.lock.Unlock()
m.setUserDataLocked(response.UserData)
m.logNewUserData("loaded user data from db", response.UserData)

return nil
}
Expand All @@ -349,6 +360,9 @@ func (m *userDataManagerImpl) UpdateUserData(ctx context.Context, options UserDa
if m.store == nil {
return errUserDataNoMutateNonRoot
}
if err := m.WaitUntilInitialized(ctx); err != nil {
return err
}
newData, shouldReplicate, err := m.updateUserData(ctx, updateFn, options.KnownVersion, options.TaskQueueLimitPerBuildId)
if err != nil {
return err
Expand Down Expand Up @@ -413,6 +427,7 @@ func (m *userDataManagerImpl) updateUserData(
}
updatedUserData, shouldReplicate, err := updateFn(preUpdateData)
if err != nil {
m.logger.Error("user data update function failed", tag.Error(err))
return nil, false, err
}

Expand Down Expand Up @@ -441,14 +456,86 @@ func (m *userDataManagerImpl) updateUserData(
BuildIdsAdded: added,
BuildIdsRemoved: removed,
})
var updatedVersionedData *persistencespb.VersionedTaskQueueUserData
if err == nil {
updatedVersionedData = &persistencespb.VersionedTaskQueueUserData{Version: preUpdateVersion + 1, Data: updatedUserData}
m.setUserDataLocked(updatedVersionedData)
if err != nil {
m.logger.Error("failed to push new user data to owning matching node for namespace", tag.Error(err))
return nil, false, err
}

updatedVersionedData := &persistencespb.VersionedTaskQueueUserData{Version: preUpdateVersion + 1, Data: updatedUserData}
m.logNewUserData("modified user data", updatedVersionedData)
m.setUserDataLocked(updatedVersionedData)

return updatedVersionedData, shouldReplicate, err
}

func (m *userDataManagerImpl) HandleGetUserDataRequest(
ctx context.Context,
req *matchingservice.GetTaskQueueUserDataRequest,
) (*matchingservice.GetTaskQueueUserDataResponse, error) {
version := req.GetLastKnownUserDataVersion()
if version < 0 {
return nil, serviceerror.NewInvalidArgument("last_known_user_data_version must not be negative")
}

if req.WaitNewData {
var cancel context.CancelFunc
ctx, cancel = newChildContext(ctx, m.config.GetUserDataLongPollTimeout(), m.config.GetUserDataReturnBudget)
defer cancel()
}

for {
resp := &matchingservice.GetTaskQueueUserDataResponse{}
userData, userDataChanged, err := m.GetUserData()
if errors.Is(err, errTaskQueueClosed) {
// If we're closing, return a success with no data, as if the request expired. We shouldn't
// close due to idleness (because of the MarkAlive above), so we're probably closing due to a
// change of ownership. The caller will retry and be redirected to the new owner.
m.logger.Debug("returning empty user data (closing)", tag.NewBoolTag("long-poll", req.WaitNewData))
return resp, nil
} else if err != nil {
return nil, err
}
if req.WaitNewData && userData.GetVersion() == version {
// long-poll: wait for data to change/appear
select {
case <-ctx.Done():
m.logger.Debug("returning empty user data (expired)",
tag.NewBoolTag("long-poll", req.WaitNewData),
tag.NewInt64("request-known-version", version),
tag.UserDataVersion(userData.GetVersion()),
)
return resp, nil
case <-userDataChanged:
m.logger.Debug("user data changed while blocked in long poll")
continue
}
}
if userData != nil {
if userData.Version > version {
resp.UserData = userData
m.logger.Info("returning user data",
tag.NewBoolTag("long-poll", req.WaitNewData),
tag.NewInt64("request-known-version", version),
tag.UserDataVersion(userData.Version),
)
} else if userData.Version < version {
// This is highly unlikely but may happen due to an edge case in during ownership transfer.
// We rely on client retries in this case to let the system eventually self-heal.
m.logger.Error("requested task queue user data for version greater than known version",
tag.NewInt64("request-known-version", version),
tag.UserDataVersion(userData.Version),
)
return nil, serviceerror.NewInvalidArgument(
"requested task queue user data for version greater than known version")
}
} else {
m.logger.Debug("returning empty user data (no data)", tag.NewBoolTag("long-poll", req.WaitNewData))
}
return resp, nil
}

}

func (m *userDataManagerImpl) setUserDataForNonOwningPartition(userData *persistencespb.VersionedTaskQueueUserData) {
m.lock.Lock()
defer m.lock.Unlock()
Expand All @@ -459,3 +546,10 @@ func (m *userDataManagerImpl) callerInfoContext(ctx context.Context) context.Con
ns, _ := m.namespaceRegistry.GetNamespaceName(namespace.ID(m.partition.NamespaceId()))
return headers.SetCallerInfo(ctx, headers.NewBackgroundCallerInfo(ns.String()))
}

func (m *userDataManagerImpl) logNewUserData(message string, data *persistencespb.VersionedTaskQueueUserData) {
m.logger.Info(message,
tag.UserDataVersion(data.GetVersion()),
tag.Timestamp(hybrid_logical_clock.UTC(data.GetData().GetClock())),
)
}
Loading
Loading