From eadb503a86e0a9e673fd28245459d7f013b5d203 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Sat, 7 Dec 2024 00:54:30 +0530 Subject: [PATCH] [temporal] adjust threshold for workflows to ContinueAsNew --- flow/shared/workflow.go | 7 +++++++ flow/workflows/cdc_flow.go | 3 ++- flow/workflows/sync_flow.go | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/flow/shared/workflow.go b/flow/shared/workflow.go index c9cafc37e2..4dfb192511 100644 --- a/flow/shared/workflow.go +++ b/flow/shared/workflow.go @@ -6,6 +6,7 @@ import ( "log/slog" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peer-flow/generated/protos" ) @@ -25,3 +26,9 @@ func GetWorkflowStatus(ctx context.Context, temporalClient client.Client, workfl } return state, nil } + +func ShouldWorkflowContinueAsNew(ctx workflow.Context) bool { + info := workflow.GetInfo(ctx) + return info.GetContinueAsNewSuggested() && + (info.GetCurrentHistoryLength() > 40960 || info.GetCurrentHistorySize() > 40*1024*1024) +} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 9533ecddec..a10e544e2a 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -552,7 +552,7 @@ func CDCFlowWorkflow( return state, err } - if workflow.GetInfo(ctx).GetContinueAsNewSuggested() { + if shared.ShouldWorkflowContinueAsNew(ctx) { restart = true if syncFlowFuture != nil { if err := model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil); err != nil { @@ -561,6 +561,7 @@ func CDCFlowWorkflow( } } } + workflow.GetInfo(ctx).GetCurrentHistorySize() if restart || finished { for ctx.Err() == nil && (!finished || mainLoopSelector.HasPending()) { diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index f6d75d4ee5..e3337de2dc 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -108,7 +108,7 @@ func SyncFlowWorkflow( } if (options.NumberOfSyncs > 0 && currentSyncFlowNum >= options.NumberOfSyncs) || - syncErr || ctx.Err() != nil || workflow.GetInfo(ctx).GetContinueAsNewSuggested() { + syncErr || ctx.Err() != nil || shared.ShouldWorkflowContinueAsNew(ctx) { break } }