diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 366fe1b5709..f793c037d18 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -2082,6 +2082,13 @@ the user has not specified an explicit RetryPolicy`, retrypolicy.DefaultDefaultRetrySettings, `DefaultWorkflowRetryPolicy represents the out-of-box retry policy for unset fields where the user has set an explicit RetryPolicy, but not specified all the fields`, + ) + FollowReusePolicyAfterConflictPolicyTerminate = NewNamespaceTypedSetting( + "history.followReusePolicyAfterConflictPolicyTerminate", + true, + `Follows WorkflowIdReusePolicy RejectDuplicate and AllowDuplicateFailedOnly after WorkflowIdReusePolicy TerminateExisting was applied. +If true (the default), RejectDuplicate is disallowed and AllowDuplicateFailedOnly will be honored after TerminateExisting is applied. +This configuration will be become the default behavior in the next release and removed subsequently.`, ) AllowResetWithPendingChildren = NewNamespaceBoolSetting( "history.allowResetWithPendingChildren", diff --git a/service/frontend/errors.go b/service/frontend/errors.go index bd450ab548c..aec0833cd38 100644 --- a/service/frontend/errors.go +++ b/service/frontend/errors.go @@ -85,7 +85,8 @@ var ( errUseVersioningWithoutBuildId = serviceerror.NewInvalidArgument("WorkerVersionStamp must be present if UseVersioning is true.") errUseVersioningWithoutNormalName = serviceerror.NewInvalidArgument("NormalName must be set on sticky queue if UseVersioning is true.") errBuildIdTooLong = serviceerror.NewInvalidArgument("Build ID exceeds configured limit.workerBuildIdSize, use a shorter build ID.") - errIncompatibleIDReusePolicy = serviceerror.NewInvalidArgument("Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.") + errIncompatibleIDReusePolicyTerminateIfRunning = serviceerror.NewInvalidArgument("Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy") + errIncompatibleIDReusePolicyRejectDuplicate = serviceerror.NewInvalidArgument("Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE cannot be used together with WorkflowIdConflictPolicy WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING") errUseEnhancedDescribeOnStickyQueue = serviceerror.NewInvalidArgument("Enhanced DescribeTaskQueue is not valid for a sticky queue, use api_mode=UNSPECIFIED or a normal queue.") errUseEnhancedDescribeOnNonRootQueue = serviceerror.NewInvalidArgument("Enhanced DescribeTaskQueue is not valid for non-root queue partitions, use api_mode=UNSPECIFIED or a normal queue root name.") errTaskQueuePartitionInvalid = serviceerror.NewInvalidArgument("Task Queue Partition invalid, use a different Task Queue or Task Queue Type") diff --git a/service/frontend/service.go b/service/frontend/service.go index d6d9f01355d..da9e4424009 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -129,6 +129,8 @@ type Config struct { // specified RetryPolicy DefaultWorkflowRetryPolicy dynamicconfig.TypedPropertyFnWithNamespaceFilter[retrypolicy.DefaultRetrySettings] + FollowReusePolicyAfterConflictPolicyTerminate dynamicconfig.TypedPropertyFnWithNamespaceFilter[bool] + // VisibilityArchival system protection VisibilityArchivalQueryMaxPageSize dynamicconfig.IntPropertyFn @@ -276,38 +278,39 @@ func NewConfig( InternalFEGlobalNamespaceVisibilityRPS: dynamicconfig.InternalFrontendGlobalNamespaceVisibilityRPS.Get(dc), // Overshoot since these low rate limits don't work well in an uncoordinated global limiter. GlobalNamespaceNamespaceReplicationInducingAPIsRPS: dynamicconfig.FrontendGlobalNamespaceNamespaceReplicationInducingAPIsRPS.Get(dc), - MaxIDLengthLimit: dynamicconfig.MaxIDLengthLimit.Get(dc), - WorkerBuildIdSizeLimit: dynamicconfig.WorkerBuildIdSizeLimit.Get(dc), - ReachabilityTaskQueueScanLimit: dynamicconfig.ReachabilityTaskQueueScanLimit.Get(dc), - ReachabilityQueryBuildIdLimit: dynamicconfig.ReachabilityQueryBuildIdLimit.Get(dc), - ReachabilityCacheOpenWFsTTL: dynamicconfig.ReachabilityCacheOpenWFsTTL.Get(dc), - ReachabilityCacheClosedWFsTTL: dynamicconfig.ReachabilityCacheClosedWFsTTL.Get(dc), - ReachabilityQuerySetDurationSinceDefault: dynamicconfig.ReachabilityQuerySetDurationSinceDefault.Get(dc), - MaxBadBinaries: dynamicconfig.FrontendMaxBadBinaries.Get(dc), - DisableListVisibilityByFilter: dynamicconfig.DisableListVisibilityByFilter.Get(dc), - BlobSizeLimitError: dynamicconfig.BlobSizeLimitError.Get(dc), - BlobSizeLimitWarn: dynamicconfig.BlobSizeLimitWarn.Get(dc), - ThrottledLogRPS: dynamicconfig.FrontendThrottledLogRPS.Get(dc), - ShutdownDrainDuration: dynamicconfig.FrontendShutdownDrainDuration.Get(dc), - ShutdownFailHealthCheckDuration: dynamicconfig.FrontendShutdownFailHealthCheckDuration.Get(dc), - EnableNamespaceNotActiveAutoForwarding: dynamicconfig.EnableNamespaceNotActiveAutoForwarding.Get(dc), - SearchAttributesNumberOfKeysLimit: dynamicconfig.SearchAttributesNumberOfKeysLimit.Get(dc), - SearchAttributesSizeOfValueLimit: dynamicconfig.SearchAttributesSizeOfValueLimit.Get(dc), - SearchAttributesTotalSizeLimit: dynamicconfig.SearchAttributesTotalSizeLimit.Get(dc), - VisibilityArchivalQueryMaxPageSize: dynamicconfig.VisibilityArchivalQueryMaxPageSize.Get(dc), - DisallowQuery: dynamicconfig.DisallowQuery.Get(dc), - SendRawWorkflowHistory: dynamicconfig.SendRawWorkflowHistory.Get(dc), - DefaultWorkflowRetryPolicy: dynamicconfig.DefaultWorkflowRetryPolicy.Get(dc), - DefaultWorkflowTaskTimeout: dynamicconfig.DefaultWorkflowTaskTimeout.Get(dc), - EnableServerVersionCheck: dynamicconfig.EnableServerVersionCheck.Get(dc), - EnableTokenNamespaceEnforcement: dynamicconfig.EnableTokenNamespaceEnforcement.Get(dc), - KeepAliveMinTime: dynamicconfig.KeepAliveMinTime.Get(dc), - KeepAlivePermitWithoutStream: dynamicconfig.KeepAlivePermitWithoutStream.Get(dc), - KeepAliveMaxConnectionIdle: dynamicconfig.KeepAliveMaxConnectionIdle.Get(dc), - KeepAliveMaxConnectionAge: dynamicconfig.KeepAliveMaxConnectionAge.Get(dc), - KeepAliveMaxConnectionAgeGrace: dynamicconfig.KeepAliveMaxConnectionAgeGrace.Get(dc), - KeepAliveTime: dynamicconfig.KeepAliveTime.Get(dc), - KeepAliveTimeout: dynamicconfig.KeepAliveTimeout.Get(dc), + MaxIDLengthLimit: dynamicconfig.MaxIDLengthLimit.Get(dc), + WorkerBuildIdSizeLimit: dynamicconfig.WorkerBuildIdSizeLimit.Get(dc), + ReachabilityTaskQueueScanLimit: dynamicconfig.ReachabilityTaskQueueScanLimit.Get(dc), + ReachabilityQueryBuildIdLimit: dynamicconfig.ReachabilityQueryBuildIdLimit.Get(dc), + ReachabilityCacheOpenWFsTTL: dynamicconfig.ReachabilityCacheOpenWFsTTL.Get(dc), + ReachabilityCacheClosedWFsTTL: dynamicconfig.ReachabilityCacheClosedWFsTTL.Get(dc), + ReachabilityQuerySetDurationSinceDefault: dynamicconfig.ReachabilityQuerySetDurationSinceDefault.Get(dc), + MaxBadBinaries: dynamicconfig.FrontendMaxBadBinaries.Get(dc), + DisableListVisibilityByFilter: dynamicconfig.DisableListVisibilityByFilter.Get(dc), + BlobSizeLimitError: dynamicconfig.BlobSizeLimitError.Get(dc), + BlobSizeLimitWarn: dynamicconfig.BlobSizeLimitWarn.Get(dc), + ThrottledLogRPS: dynamicconfig.FrontendThrottledLogRPS.Get(dc), + ShutdownDrainDuration: dynamicconfig.FrontendShutdownDrainDuration.Get(dc), + ShutdownFailHealthCheckDuration: dynamicconfig.FrontendShutdownFailHealthCheckDuration.Get(dc), + EnableNamespaceNotActiveAutoForwarding: dynamicconfig.EnableNamespaceNotActiveAutoForwarding.Get(dc), + SearchAttributesNumberOfKeysLimit: dynamicconfig.SearchAttributesNumberOfKeysLimit.Get(dc), + SearchAttributesSizeOfValueLimit: dynamicconfig.SearchAttributesSizeOfValueLimit.Get(dc), + SearchAttributesTotalSizeLimit: dynamicconfig.SearchAttributesTotalSizeLimit.Get(dc), + VisibilityArchivalQueryMaxPageSize: dynamicconfig.VisibilityArchivalQueryMaxPageSize.Get(dc), + DisallowQuery: dynamicconfig.DisallowQuery.Get(dc), + SendRawWorkflowHistory: dynamicconfig.SendRawWorkflowHistory.Get(dc), + DefaultWorkflowRetryPolicy: dynamicconfig.DefaultWorkflowRetryPolicy.Get(dc), + FollowReusePolicyAfterConflictPolicyTerminate: dynamicconfig.FollowReusePolicyAfterConflictPolicyTerminate.Get(dc), + DefaultWorkflowTaskTimeout: dynamicconfig.DefaultWorkflowTaskTimeout.Get(dc), + EnableServerVersionCheck: dynamicconfig.EnableServerVersionCheck.Get(dc), + EnableTokenNamespaceEnforcement: dynamicconfig.EnableTokenNamespaceEnforcement.Get(dc), + KeepAliveMinTime: dynamicconfig.KeepAliveMinTime.Get(dc), + KeepAlivePermitWithoutStream: dynamicconfig.KeepAlivePermitWithoutStream.Get(dc), + KeepAliveMaxConnectionIdle: dynamicconfig.KeepAliveMaxConnectionIdle.Get(dc), + KeepAliveMaxConnectionAge: dynamicconfig.KeepAliveMaxConnectionAge.Get(dc), + KeepAliveMaxConnectionAgeGrace: dynamicconfig.KeepAliveMaxConnectionAgeGrace.Get(dc), + KeepAliveTime: dynamicconfig.KeepAliveTime.Get(dc), + KeepAliveTimeout: dynamicconfig.KeepAliveTimeout.Get(dc), DeleteNamespaceDeleteActivityRPS: dynamicconfig.DeleteNamespaceDeleteActivityRPS.Get(dc), DeleteNamespacePageSize: dynamicconfig.DeleteNamespacePageSize.Get(dc), diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 8c185df2764..56cd93f50fb 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -119,34 +119,35 @@ type ( workflowservice.UnsafeWorkflowServiceServer status int32 - tokenSerializer common.TaskTokenSerializer - config *Config - versionChecker headers.VersionChecker - namespaceHandler *namespaceHandler - getDefaultWorkflowRetrySettings dynamicconfig.TypedPropertyFnWithNamespaceFilter[retrypolicy.DefaultRetrySettings] - visibilityMgr manager.VisibilityManager - logger log.Logger - throttledLogger log.Logger - persistenceExecutionName string - clusterMetadataManager persistence.ClusterMetadataManager - clusterMetadata cluster.Metadata - historyClient historyservice.HistoryServiceClient - matchingClient matchingservice.MatchingServiceClient - deploymentStoreClient deployment.DeploymentStoreClient - archiverProvider provider.ArchiverProvider - payloadSerializer serialization.Serializer - namespaceRegistry namespace.Registry - saMapperProvider searchattribute.MapperProvider - saProvider searchattribute.Provider - saValidator *searchattribute.Validator - archivalMetadata archiver.ArchivalMetadata - healthServer *health.Server - overrides *Overrides - membershipMonitor membership.Monitor - healthInterceptor *interceptor.HealthInterceptor - scheduleSpecBuilder *scheduler.SpecBuilder - outstandingPollers collection.SyncMap[string, collection.SyncMap[string, context.CancelFunc]] - httpEnabled bool + tokenSerializer common.TaskTokenSerializer + config *Config + versionChecker headers.VersionChecker + namespaceHandler *namespaceHandler + getDefaultWorkflowRetrySettings dynamicconfig.TypedPropertyFnWithNamespaceFilter[retrypolicy.DefaultRetrySettings] + followReusePolicyAfterConflictPolicyTerminate dynamicconfig.TypedPropertyFnWithNamespaceFilter[bool] + visibilityMgr manager.VisibilityManager + logger log.Logger + throttledLogger log.Logger + persistenceExecutionName string + clusterMetadataManager persistence.ClusterMetadataManager + clusterMetadata cluster.Metadata + historyClient historyservice.HistoryServiceClient + matchingClient matchingservice.MatchingServiceClient + deploymentStoreClient deployment.DeploymentStoreClient + archiverProvider provider.ArchiverProvider + payloadSerializer serialization.Serializer + namespaceRegistry namespace.Registry + saMapperProvider searchattribute.MapperProvider + saProvider searchattribute.Provider + saValidator *searchattribute.Validator + archivalMetadata archiver.ArchivalMetadata + healthServer *health.Server + overrides *Overrides + membershipMonitor membership.Monitor + healthInterceptor *interceptor.HealthInterceptor + scheduleSpecBuilder *scheduler.SpecBuilder + outstandingPollers collection.SyncMap[string, collection.SyncMap[string, context.CancelFunc]] + httpEnabled bool } ) @@ -177,7 +178,6 @@ func NewWorkflowHandler( scheduleSpecBuilder *scheduler.SpecBuilder, httpEnabled bool, ) *WorkflowHandler { - handler := &WorkflowHandler{ status: common.DaemonStatusInitialized, config: config, @@ -193,21 +193,22 @@ func NewWorkflowHandler( timeSource, config, ), - getDefaultWorkflowRetrySettings: config.DefaultWorkflowRetryPolicy, - visibilityMgr: visibilityMgr, - logger: logger, - throttledLogger: throttledLogger, - persistenceExecutionName: persistenceExecutionName, - clusterMetadataManager: clusterMetadataManager, - clusterMetadata: clusterMetadata, - historyClient: historyClient, - matchingClient: matchingClient, - deploymentStoreClient: deploymentStoreClient, - archiverProvider: archiverProvider, - payloadSerializer: payloadSerializer, - namespaceRegistry: namespaceRegistry, - saProvider: saProvider, - saMapperProvider: saMapperProvider, + getDefaultWorkflowRetrySettings: config.DefaultWorkflowRetryPolicy, + followReusePolicyAfterConflictPolicyTerminate: config.FollowReusePolicyAfterConflictPolicyTerminate, + visibilityMgr: visibilityMgr, + logger: logger, + throttledLogger: throttledLogger, + persistenceExecutionName: persistenceExecutionName, + clusterMetadataManager: clusterMetadataManager, + clusterMetadata: clusterMetadata, + historyClient: historyClient, + matchingClient: matchingClient, + deploymentStoreClient: deploymentStoreClient, + archiverProvider: archiverProvider, + payloadSerializer: payloadSerializer, + namespaceRegistry: namespaceRegistry, + saProvider: saProvider, + saMapperProvider: saMapperProvider, saValidator: searchattribute.NewValidator( saProvider, saMapperProvider, @@ -468,7 +469,10 @@ func (wh *WorkflowHandler) prepareStartWorkflowRequest( return nil, err } - if err := wh.validateWorkflowIdReusePolicy(request.WorkflowIdReusePolicy, request.WorkflowIdConflictPolicy); err != nil { + if err := wh.validateWorkflowIdReusePolicy( + namespaceName, + request.WorkflowIdReusePolicy, + request.WorkflowIdConflictPolicy); err != nil { return nil, err } @@ -1982,6 +1986,7 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context, } if err := wh.validateWorkflowIdReusePolicy( + namespaceName, request.WorkflowIdReusePolicy, request.WorkflowIdConflictPolicy, ); err != nil { @@ -4783,12 +4788,18 @@ func (wh *WorkflowHandler) validateVersionRuleBuildId(request *workflowservice.U } func (wh *WorkflowHandler) validateWorkflowIdReusePolicy( + namespaceName namespace.Name, reusePolicy enumspb.WorkflowIdReusePolicy, conflictPolicy enumspb.WorkflowIdConflictPolicy, ) error { if conflictPolicy != enumspb.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED && reusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING { - return errIncompatibleIDReusePolicy + return errIncompatibleIDReusePolicyTerminateIfRunning + } + if conflictPolicy == enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING && + reusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE && + wh.followReusePolicyAfterConflictPolicyTerminate(namespaceName.String()) { + return errIncompatibleIDReusePolicyRejectDuplicate } return nil } diff --git a/service/frontend/workflow_handler_test.go b/service/frontend/workflow_handler_test.go index d6420a64c81..19849bad2f7 100644 --- a/service/frontend/workflow_handler_test.go +++ b/service/frontend/workflow_handler_test.go @@ -642,7 +642,35 @@ func (s *WorkflowHandlerSuite) TestStartWorkflowExecution_InvalidWorkflowIdReuse s.Nil(resp) s.Equal(err, serviceerror.NewInvalidArgument( - "Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.")) + "Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy")) +} + +func (s *WorkflowHandlerSuite) TestStartWorkflowExecution_InvalidWorkflowIdReusePolicy_RejectDuplicate() { + req := &workflowservice.StartWorkflowExecutionRequest{ + WorkflowId: testWorkflowID, + WorkflowType: &commonpb.WorkflowType{Name: "WORKFLOW"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: "TASK_QUEUE", Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, + WorkflowIdConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING, + } + + // by default, disallow + config := s.newConfig() + wh := s.getWorkflowHandler(config) + resp, err := wh.StartWorkflowExecution(context.Background(), req) + s.Nil(resp) + s.Equal(err, serviceerror.NewInvalidArgument( + "Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE cannot be used together with WorkflowIdConflictPolicy WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING")) + + // allow if explicitly allowed + s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(gomock.Any()).Return(nil, nil) + s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(namespace.NewID(), nil) + s.mockHistoryClient.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any()).Return(&historyservice.StartWorkflowExecutionResponse{Started: true}, nil) + + config.FollowReusePolicyAfterConflictPolicyTerminate = dc.GetBoolPropertyFnFilteredByNamespace(false) + wh = s.getWorkflowHandler(config) + _, err = wh.StartWorkflowExecution(context.Background(), req) + s.NoError(err) } func (s *WorkflowHandlerSuite) TestStartWorkflowExecution_DefaultWorkflowIdDuplicationPolicies() { @@ -829,7 +857,7 @@ func (s *WorkflowHandlerSuite) TestSignalWithStartWorkflowExecution_InvalidWorkf s.Nil(resp) s.Equal(err, serviceerror.NewInvalidArgument( - "Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.")) + "Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy")) } func (s *WorkflowHandlerSuite) TestSignalWithStartWorkflowExecution_DefaultWorkflowIdDuplicationPolicies() {