Skip to content

Commit

Permalink
Deny ReusePolicy RejectDuplicate for ConflictPolicy TerminateExisting (
Browse files Browse the repository at this point in the history
…#7099)

## What changed?
<!-- Describe what has changed in this PR -->

Making the combination of WorkflowIdReusePolicy RejectDuplicate and
WorkflowIdConflictPolicy TerminateExisting invalid.

## Why?
<!-- Tell your future self why have you made these changes -->

The combination is not a desirable configuration for users. It is
essentially "Terminate Workflow", i.e., no start would ever be performed.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

Added test.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

This _could_ break a user's existing behavior, although that is unlikely.
A dynamic config was added just in case.

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
stephanos authored Jan 17, 2025
1 parent 984f1a5 commit 00071df
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 81 deletions.
7 changes: 7 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2082,6 +2082,13 @@ the user has not specified an explicit RetryPolicy`,
retrypolicy.DefaultDefaultRetrySettings,
`DefaultWorkflowRetryPolicy represents the out-of-box retry policy for unset fields
where the user has set an explicit RetryPolicy, but not specified all the fields`,
)
// FollowReusePolicyAfterConflictPolicyTerminate is a namespace-scoped boolean
// setting (default true) registered under the key
// "history.followReusePolicyAfterConflictPolicyTerminate". The description
// refers to WorkflowIdConflictPolicy TerminateExisting: TerminateExisting is a
// conflict policy value, not a reuse policy value.
FollowReusePolicyAfterConflictPolicyTerminate = NewNamespaceTypedSetting(
	"history.followReusePolicyAfterConflictPolicyTerminate",
	true,
	`Follows WorkflowIdReusePolicy RejectDuplicate and AllowDuplicateFailedOnly after WorkflowIdConflictPolicy TerminateExisting was applied.
If true (the default), RejectDuplicate is disallowed and AllowDuplicateFailedOnly will be honored after TerminateExisting is applied.
This configuration will become the default behavior in the next release and removed subsequently.`,
)
AllowResetWithPendingChildren = NewNamespaceBoolSetting(
"history.allowResetWithPendingChildren",
Expand Down
3 changes: 2 additions & 1 deletion service/frontend/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ var (
errUseVersioningWithoutBuildId = serviceerror.NewInvalidArgument("WorkerVersionStamp must be present if UseVersioning is true.")
errUseVersioningWithoutNormalName = serviceerror.NewInvalidArgument("NormalName must be set on sticky queue if UseVersioning is true.")
errBuildIdTooLong = serviceerror.NewInvalidArgument("Build ID exceeds configured limit.workerBuildIdSize, use a shorter build ID.")
errIncompatibleIDReusePolicy = serviceerror.NewInvalidArgument("Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.")
errIncompatibleIDReusePolicyTerminateIfRunning = serviceerror.NewInvalidArgument("Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy")
errIncompatibleIDReusePolicyRejectDuplicate = serviceerror.NewInvalidArgument("Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE cannot be used together with WorkflowIdConflictPolicy WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING")
errUseEnhancedDescribeOnStickyQueue = serviceerror.NewInvalidArgument("Enhanced DescribeTaskQueue is not valid for a sticky queue, use api_mode=UNSPECIFIED or a normal queue.")
errUseEnhancedDescribeOnNonRootQueue = serviceerror.NewInvalidArgument("Enhanced DescribeTaskQueue is not valid for non-root queue partitions, use api_mode=UNSPECIFIED or a normal queue root name.")
errTaskQueuePartitionInvalid = serviceerror.NewInvalidArgument("Task Queue Partition invalid, use a different Task Queue or Task Queue Type")
Expand Down
67 changes: 35 additions & 32 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ type Config struct {
// specified RetryPolicy
DefaultWorkflowRetryPolicy dynamicconfig.TypedPropertyFnWithNamespaceFilter[retrypolicy.DefaultRetrySettings]

FollowReusePolicyAfterConflictPolicyTerminate dynamicconfig.TypedPropertyFnWithNamespaceFilter[bool]

// VisibilityArchival system protection
VisibilityArchivalQueryMaxPageSize dynamicconfig.IntPropertyFn

Expand Down Expand Up @@ -276,38 +278,39 @@ func NewConfig(
InternalFEGlobalNamespaceVisibilityRPS: dynamicconfig.InternalFrontendGlobalNamespaceVisibilityRPS.Get(dc),
// Overshoot since these low rate limits don't work well in an uncoordinated global limiter.
GlobalNamespaceNamespaceReplicationInducingAPIsRPS: dynamicconfig.FrontendGlobalNamespaceNamespaceReplicationInducingAPIsRPS.Get(dc),
MaxIDLengthLimit: dynamicconfig.MaxIDLengthLimit.Get(dc),
WorkerBuildIdSizeLimit: dynamicconfig.WorkerBuildIdSizeLimit.Get(dc),
ReachabilityTaskQueueScanLimit: dynamicconfig.ReachabilityTaskQueueScanLimit.Get(dc),
ReachabilityQueryBuildIdLimit: dynamicconfig.ReachabilityQueryBuildIdLimit.Get(dc),
ReachabilityCacheOpenWFsTTL: dynamicconfig.ReachabilityCacheOpenWFsTTL.Get(dc),
ReachabilityCacheClosedWFsTTL: dynamicconfig.ReachabilityCacheClosedWFsTTL.Get(dc),
ReachabilityQuerySetDurationSinceDefault: dynamicconfig.ReachabilityQuerySetDurationSinceDefault.Get(dc),
MaxBadBinaries: dynamicconfig.FrontendMaxBadBinaries.Get(dc),
DisableListVisibilityByFilter: dynamicconfig.DisableListVisibilityByFilter.Get(dc),
BlobSizeLimitError: dynamicconfig.BlobSizeLimitError.Get(dc),
BlobSizeLimitWarn: dynamicconfig.BlobSizeLimitWarn.Get(dc),
ThrottledLogRPS: dynamicconfig.FrontendThrottledLogRPS.Get(dc),
ShutdownDrainDuration: dynamicconfig.FrontendShutdownDrainDuration.Get(dc),
ShutdownFailHealthCheckDuration: dynamicconfig.FrontendShutdownFailHealthCheckDuration.Get(dc),
EnableNamespaceNotActiveAutoForwarding: dynamicconfig.EnableNamespaceNotActiveAutoForwarding.Get(dc),
SearchAttributesNumberOfKeysLimit: dynamicconfig.SearchAttributesNumberOfKeysLimit.Get(dc),
SearchAttributesSizeOfValueLimit: dynamicconfig.SearchAttributesSizeOfValueLimit.Get(dc),
SearchAttributesTotalSizeLimit: dynamicconfig.SearchAttributesTotalSizeLimit.Get(dc),
VisibilityArchivalQueryMaxPageSize: dynamicconfig.VisibilityArchivalQueryMaxPageSize.Get(dc),
DisallowQuery: dynamicconfig.DisallowQuery.Get(dc),
SendRawWorkflowHistory: dynamicconfig.SendRawWorkflowHistory.Get(dc),
DefaultWorkflowRetryPolicy: dynamicconfig.DefaultWorkflowRetryPolicy.Get(dc),
DefaultWorkflowTaskTimeout: dynamicconfig.DefaultWorkflowTaskTimeout.Get(dc),
EnableServerVersionCheck: dynamicconfig.EnableServerVersionCheck.Get(dc),
EnableTokenNamespaceEnforcement: dynamicconfig.EnableTokenNamespaceEnforcement.Get(dc),
KeepAliveMinTime: dynamicconfig.KeepAliveMinTime.Get(dc),
KeepAlivePermitWithoutStream: dynamicconfig.KeepAlivePermitWithoutStream.Get(dc),
KeepAliveMaxConnectionIdle: dynamicconfig.KeepAliveMaxConnectionIdle.Get(dc),
KeepAliveMaxConnectionAge: dynamicconfig.KeepAliveMaxConnectionAge.Get(dc),
KeepAliveMaxConnectionAgeGrace: dynamicconfig.KeepAliveMaxConnectionAgeGrace.Get(dc),
KeepAliveTime: dynamicconfig.KeepAliveTime.Get(dc),
KeepAliveTimeout: dynamicconfig.KeepAliveTimeout.Get(dc),
MaxIDLengthLimit: dynamicconfig.MaxIDLengthLimit.Get(dc),
WorkerBuildIdSizeLimit: dynamicconfig.WorkerBuildIdSizeLimit.Get(dc),
ReachabilityTaskQueueScanLimit: dynamicconfig.ReachabilityTaskQueueScanLimit.Get(dc),
ReachabilityQueryBuildIdLimit: dynamicconfig.ReachabilityQueryBuildIdLimit.Get(dc),
ReachabilityCacheOpenWFsTTL: dynamicconfig.ReachabilityCacheOpenWFsTTL.Get(dc),
ReachabilityCacheClosedWFsTTL: dynamicconfig.ReachabilityCacheClosedWFsTTL.Get(dc),
ReachabilityQuerySetDurationSinceDefault: dynamicconfig.ReachabilityQuerySetDurationSinceDefault.Get(dc),
MaxBadBinaries: dynamicconfig.FrontendMaxBadBinaries.Get(dc),
DisableListVisibilityByFilter: dynamicconfig.DisableListVisibilityByFilter.Get(dc),
BlobSizeLimitError: dynamicconfig.BlobSizeLimitError.Get(dc),
BlobSizeLimitWarn: dynamicconfig.BlobSizeLimitWarn.Get(dc),
ThrottledLogRPS: dynamicconfig.FrontendThrottledLogRPS.Get(dc),
ShutdownDrainDuration: dynamicconfig.FrontendShutdownDrainDuration.Get(dc),
ShutdownFailHealthCheckDuration: dynamicconfig.FrontendShutdownFailHealthCheckDuration.Get(dc),
EnableNamespaceNotActiveAutoForwarding: dynamicconfig.EnableNamespaceNotActiveAutoForwarding.Get(dc),
SearchAttributesNumberOfKeysLimit: dynamicconfig.SearchAttributesNumberOfKeysLimit.Get(dc),
SearchAttributesSizeOfValueLimit: dynamicconfig.SearchAttributesSizeOfValueLimit.Get(dc),
SearchAttributesTotalSizeLimit: dynamicconfig.SearchAttributesTotalSizeLimit.Get(dc),
VisibilityArchivalQueryMaxPageSize: dynamicconfig.VisibilityArchivalQueryMaxPageSize.Get(dc),
DisallowQuery: dynamicconfig.DisallowQuery.Get(dc),
SendRawWorkflowHistory: dynamicconfig.SendRawWorkflowHistory.Get(dc),
DefaultWorkflowRetryPolicy: dynamicconfig.DefaultWorkflowRetryPolicy.Get(dc),
FollowReusePolicyAfterConflictPolicyTerminate: dynamicconfig.FollowReusePolicyAfterConflictPolicyTerminate.Get(dc),
DefaultWorkflowTaskTimeout: dynamicconfig.DefaultWorkflowTaskTimeout.Get(dc),
EnableServerVersionCheck: dynamicconfig.EnableServerVersionCheck.Get(dc),
EnableTokenNamespaceEnforcement: dynamicconfig.EnableTokenNamespaceEnforcement.Get(dc),
KeepAliveMinTime: dynamicconfig.KeepAliveMinTime.Get(dc),
KeepAlivePermitWithoutStream: dynamicconfig.KeepAlivePermitWithoutStream.Get(dc),
KeepAliveMaxConnectionIdle: dynamicconfig.KeepAliveMaxConnectionIdle.Get(dc),
KeepAliveMaxConnectionAge: dynamicconfig.KeepAliveMaxConnectionAge.Get(dc),
KeepAliveMaxConnectionAgeGrace: dynamicconfig.KeepAliveMaxConnectionAgeGrace.Get(dc),
KeepAliveTime: dynamicconfig.KeepAliveTime.Get(dc),
KeepAliveTimeout: dynamicconfig.KeepAliveTimeout.Get(dc),

DeleteNamespaceDeleteActivityRPS: dynamicconfig.DeleteNamespaceDeleteActivityRPS.Get(dc),
DeleteNamespacePageSize: dynamicconfig.DeleteNamespacePageSize.Get(dc),
Expand Down
103 changes: 57 additions & 46 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,34 +119,35 @@ type (
workflowservice.UnsafeWorkflowServiceServer
status int32

tokenSerializer common.TaskTokenSerializer
config *Config
versionChecker headers.VersionChecker
namespaceHandler *namespaceHandler
getDefaultWorkflowRetrySettings dynamicconfig.TypedPropertyFnWithNamespaceFilter[retrypolicy.DefaultRetrySettings]
visibilityMgr manager.VisibilityManager
logger log.Logger
throttledLogger log.Logger
persistenceExecutionName string
clusterMetadataManager persistence.ClusterMetadataManager
clusterMetadata cluster.Metadata
historyClient historyservice.HistoryServiceClient
matchingClient matchingservice.MatchingServiceClient
deploymentStoreClient deployment.DeploymentStoreClient
archiverProvider provider.ArchiverProvider
payloadSerializer serialization.Serializer
namespaceRegistry namespace.Registry
saMapperProvider searchattribute.MapperProvider
saProvider searchattribute.Provider
saValidator *searchattribute.Validator
archivalMetadata archiver.ArchivalMetadata
healthServer *health.Server
overrides *Overrides
membershipMonitor membership.Monitor
healthInterceptor *interceptor.HealthInterceptor
scheduleSpecBuilder *scheduler.SpecBuilder
outstandingPollers collection.SyncMap[string, collection.SyncMap[string, context.CancelFunc]]
httpEnabled bool
tokenSerializer common.TaskTokenSerializer
config *Config
versionChecker headers.VersionChecker
namespaceHandler *namespaceHandler
getDefaultWorkflowRetrySettings dynamicconfig.TypedPropertyFnWithNamespaceFilter[retrypolicy.DefaultRetrySettings]
followReusePolicyAfterConflictPolicyTerminate dynamicconfig.TypedPropertyFnWithNamespaceFilter[bool]
visibilityMgr manager.VisibilityManager
logger log.Logger
throttledLogger log.Logger
persistenceExecutionName string
clusterMetadataManager persistence.ClusterMetadataManager
clusterMetadata cluster.Metadata
historyClient historyservice.HistoryServiceClient
matchingClient matchingservice.MatchingServiceClient
deploymentStoreClient deployment.DeploymentStoreClient
archiverProvider provider.ArchiverProvider
payloadSerializer serialization.Serializer
namespaceRegistry namespace.Registry
saMapperProvider searchattribute.MapperProvider
saProvider searchattribute.Provider
saValidator *searchattribute.Validator
archivalMetadata archiver.ArchivalMetadata
healthServer *health.Server
overrides *Overrides
membershipMonitor membership.Monitor
healthInterceptor *interceptor.HealthInterceptor
scheduleSpecBuilder *scheduler.SpecBuilder
outstandingPollers collection.SyncMap[string, collection.SyncMap[string, context.CancelFunc]]
httpEnabled bool
}
)

Expand Down Expand Up @@ -177,7 +178,6 @@ func NewWorkflowHandler(
scheduleSpecBuilder *scheduler.SpecBuilder,
httpEnabled bool,
) *WorkflowHandler {

handler := &WorkflowHandler{
status: common.DaemonStatusInitialized,
config: config,
Expand All @@ -193,21 +193,22 @@ func NewWorkflowHandler(
timeSource,
config,
),
getDefaultWorkflowRetrySettings: config.DefaultWorkflowRetryPolicy,
visibilityMgr: visibilityMgr,
logger: logger,
throttledLogger: throttledLogger,
persistenceExecutionName: persistenceExecutionName,
clusterMetadataManager: clusterMetadataManager,
clusterMetadata: clusterMetadata,
historyClient: historyClient,
matchingClient: matchingClient,
deploymentStoreClient: deploymentStoreClient,
archiverProvider: archiverProvider,
payloadSerializer: payloadSerializer,
namespaceRegistry: namespaceRegistry,
saProvider: saProvider,
saMapperProvider: saMapperProvider,
getDefaultWorkflowRetrySettings: config.DefaultWorkflowRetryPolicy,
followReusePolicyAfterConflictPolicyTerminate: config.FollowReusePolicyAfterConflictPolicyTerminate,
visibilityMgr: visibilityMgr,
logger: logger,
throttledLogger: throttledLogger,
persistenceExecutionName: persistenceExecutionName,
clusterMetadataManager: clusterMetadataManager,
clusterMetadata: clusterMetadata,
historyClient: historyClient,
matchingClient: matchingClient,
deploymentStoreClient: deploymentStoreClient,
archiverProvider: archiverProvider,
payloadSerializer: payloadSerializer,
namespaceRegistry: namespaceRegistry,
saProvider: saProvider,
saMapperProvider: saMapperProvider,
saValidator: searchattribute.NewValidator(
saProvider,
saMapperProvider,
Expand Down Expand Up @@ -468,7 +469,10 @@ func (wh *WorkflowHandler) prepareStartWorkflowRequest(
return nil, err
}

if err := wh.validateWorkflowIdReusePolicy(request.WorkflowIdReusePolicy, request.WorkflowIdConflictPolicy); err != nil {
if err := wh.validateWorkflowIdReusePolicy(
namespaceName,
request.WorkflowIdReusePolicy,
request.WorkflowIdConflictPolicy); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1982,6 +1986,7 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context,
}

if err := wh.validateWorkflowIdReusePolicy(
namespaceName,
request.WorkflowIdReusePolicy,
request.WorkflowIdConflictPolicy,
); err != nil {
Expand Down Expand Up @@ -4783,12 +4788,18 @@ func (wh *WorkflowHandler) validateVersionRuleBuildId(request *workflowservice.U
}

// validateWorkflowIdReusePolicy rejects combinations of WorkflowIdReusePolicy
// and WorkflowIdConflictPolicy that must not be used together.
//
// It returns:
//   - errIncompatibleIDReusePolicyTerminateIfRunning when the reuse policy is
//     TERMINATE_IF_RUNNING and any conflict policy other than UNSPECIFIED is set;
//   - errIncompatibleIDReusePolicyRejectDuplicate when the reuse policy is
//     REJECT_DUPLICATE, the conflict policy is TERMINATE_EXISTING, and the
//     followReusePolicyAfterConflictPolicyTerminate setting evaluates to true
//     for namespaceName;
//   - nil otherwise.
func (wh *WorkflowHandler) validateWorkflowIdReusePolicy(
	namespaceName namespace.Name,
	reusePolicy enumspb.WorkflowIdReusePolicy,
	conflictPolicy enumspb.WorkflowIdConflictPolicy,
) error {
	if conflictPolicy != enumspb.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED &&
		reusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING {
		return errIncompatibleIDReusePolicyTerminateIfRunning
	}
	// The dynamic config lookup is kept last so it is only evaluated for
	// requests that actually use this policy combination.
	if conflictPolicy == enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING &&
		reusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE &&
		wh.followReusePolicyAfterConflictPolicyTerminate(namespaceName.String()) {
		return errIncompatibleIDReusePolicyRejectDuplicate
	}
	return nil
}
Expand Down
32 changes: 30 additions & 2 deletions service/frontend/workflow_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,35 @@ func (s *WorkflowHandlerSuite) TestStartWorkflowExecution_InvalidWorkflowIdReuse

s.Nil(resp)
s.Equal(err, serviceerror.NewInvalidArgument(
"Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy."))
"Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy"))
}

// TestStartWorkflowExecution_InvalidWorkflowIdReusePolicy_RejectDuplicate checks
// that a start request combining WorkflowIdReusePolicy REJECT_DUPLICATE with
// WorkflowIdConflictPolicy TERMINATE_EXISTING is rejected with an
// InvalidArgument error under the default config, and is accepted once
// FollowReusePolicyAfterConflictPolicyTerminate is overridden to false.
func (s *WorkflowHandlerSuite) TestStartWorkflowExecution_InvalidWorkflowIdReusePolicy_RejectDuplicate() {
	req := &workflowservice.StartWorkflowExecutionRequest{
		WorkflowId:               testWorkflowID,
		WorkflowType:             &commonpb.WorkflowType{Name: "WORKFLOW"},
		TaskQueue:                &taskqueuepb.TaskQueue{Name: "TASK_QUEUE", Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
		WorkflowIdReusePolicy:    enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
		WorkflowIdConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING,
	}

	// by default, disallow
	config := s.newConfig()
	wh := s.getWorkflowHandler(config)
	resp, err := wh.StartWorkflowExecution(context.Background(), req)
	s.Nil(resp)
	// Expected value goes first so a failure report labels the two sides correctly.
	s.Equal(
		serviceerror.NewInvalidArgument(
			"Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE cannot be used together with WorkflowIdConflictPolicy WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING"),
		err,
	)

	// allow if explicitly allowed
	s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(gomock.Any()).Return(nil, nil)
	s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(namespace.NewID(), nil)
	s.mockHistoryClient.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any()).Return(&historyservice.StartWorkflowExecutionResponse{Started: true}, nil)

	config.FollowReusePolicyAfterConflictPolicyTerminate = dc.GetBoolPropertyFnFilteredByNamespace(false)
	wh = s.getWorkflowHandler(config)
	_, err = wh.StartWorkflowExecution(context.Background(), req)
	s.NoError(err)
}

func (s *WorkflowHandlerSuite) TestStartWorkflowExecution_DefaultWorkflowIdDuplicationPolicies() {
Expand Down Expand Up @@ -829,7 +857,7 @@ func (s *WorkflowHandlerSuite) TestSignalWithStartWorkflowExecution_InvalidWorkf

s.Nil(resp)
s.Equal(err, serviceerror.NewInvalidArgument(
"Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy."))
"Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy"))
}

func (s *WorkflowHandlerSuite) TestSignalWithStartWorkflowExecution_DefaultWorkflowIdDuplicationPolicies() {
Expand Down

0 comments on commit 00071df

Please sign in to comment.