From 895c6438ce8c020644a658075acec5fdadac4fac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 21 Oct 2024 20:08:05 +0000 Subject: [PATCH] temporal: replace deprecated SearchAttributes with TypedSearchAttributes (#2167) https://github.com/temporalio/sdk-go/pull/1368 --- flow/cmd/handler.go | 24 +++++++++--------------- flow/shared/constants.go | 10 +++++++++- flow/workflows/cdc_flow.go | 30 ++++++++++++++---------------- flow/workflows/qrep_flow.go | 10 +++------- 4 files changed, 35 insertions(+), 39 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 2b48a29260..1accee0344 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -140,11 +140,9 @@ func (h *FlowRequestHandler) CreateCDCFlow( workflowID := fmt.Sprintf("%s-peerflow-%s", cfg.FlowJobName, uuid.New()) workflowOptions := client.StartWorkflowOptions{ - ID: workflowID, - TaskQueue: h.peerflowTaskQueueID, - SearchAttributes: map[string]interface{}{ - shared.MirrorNameSearchAttribute: cfg.FlowJobName, - }, + ID: workflowID, + TaskQueue: h.peerflowTaskQueueID, + TypedSearchAttributes: shared.NewSearchAttributes(cfg.FlowJobName), } err := h.createCdcJobEntry(ctx, req, workflowID) @@ -208,11 +206,9 @@ func (h *FlowRequestHandler) CreateQRepFlow( cfg := req.QrepConfig workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New()) workflowOptions := client.StartWorkflowOptions{ - ID: workflowID, - TaskQueue: h.peerflowTaskQueueID, - SearchAttributes: map[string]interface{}{ - shared.MirrorNameSearchAttribute: cfg.FlowJobName, - }, + ID: workflowID, + TaskQueue: h.peerflowTaskQueueID, + TypedSearchAttributes: shared.NewSearchAttributes(cfg.FlowJobName), } if req.CreateCatalogEntry { if err := h.createQRepJobEntry(ctx, req, workflowID); err != nil { @@ -308,11 +304,9 @@ func (h *FlowRequestHandler) shutdownFlow( } workflowID := fmt.Sprintf("%s-dropflow-%s", flowJobName, uuid.New()) workflowOptions := client.StartWorkflowOptions{ - ID: workflowID, - TaskQueue: h.peerflowTaskQueueID, - SearchAttributes: map[string]interface{}{ - shared.MirrorNameSearchAttribute: flowJobName, - }, + ID: workflowID, + TaskQueue: h.peerflowTaskQueueID, + TypedSearchAttributes: shared.NewSearchAttributes(flowJobName), } dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, diff --git a/flow/shared/constants.go b/flow/shared/constants.go index 860c541c25..2dc5a8a64e 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -1,5 +1,9 @@ package shared +import ( + "go.temporal.io/sdk/temporal" +) + type ( ContextKey string TaskQueueID string @@ -16,7 +20,11 @@ const ( FlowStatusQuery = "q-flow-status" ) -const MirrorNameSearchAttribute = "MirrorName" +var MirrorNameSearchAttribute = temporal.NewSearchAttributeKeyString("MirrorName") + +func NewSearchAttributes(mirrorName string) temporal.SearchAttributes { + return temporal.NewSearchAttributes(MirrorNameSearchAttribute.ValueSet(mirrorName)) +} const ( FlowNameKey ContextKey = "flowName" diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index a45ec28793..72e37b01fd 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -87,7 +87,7 @@ func processCDCFlowConfigUpdate( logger log.Logger, cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState, - mirrorNameSearch map[string]interface{}, + mirrorNameSearch temporal.SearchAttributes, ) error { flowConfigUpdate := state.FlowConfigUpdate @@ -139,7 +139,7 @@ func processTableAdditions( logger log.Logger, cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState, - mirrorNameSearch map[string]interface{}, + mirrorNameSearch temporal.SearchAttributes, ) error { flowConfigUpdate := state.FlowConfigUpdate if len(flowConfigUpdate.AdditionalTables) == 0 { @@ -181,8 +181,8 @@ func processTableAdditions( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - SearchAttributes: mirrorNameSearch, - WaitForCancellation: true, + TypedSearchAttributes: mirrorNameSearch, + WaitForCancellation: true, } childAdditionalTablesCDCFlowCtx := workflow.WithChildOptions(ctx, childAdditionalTablesCDCFlowOpts) childAdditionalTablesCDCFlowFuture := workflow.ExecuteChildWorkflow( @@ -329,9 +329,7 @@ func CDCFlowWorkflow( return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err) } - mirrorNameSearch := map[string]interface{}{ - shared.MirrorNameSearchAttribute: cfg.FlowJobName, - } + mirrorNameSearch := shared.NewSearchAttributes(cfg.FlowJobName) var syncCountLimit int if state.ActiveSignal == model.PauseSignal { @@ -416,8 +414,8 @@ func CDCFlowWorkflow( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - SearchAttributes: mirrorNameSearch, - WaitForCancellation: true, + TypedSearchAttributes: mirrorNameSearch, + WaitForCancellation: true, } setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts) setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg) @@ -438,9 +436,9 @@ func CDCFlowWorkflow( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - TaskQueue: taskQueue, - SearchAttributes: mirrorNameSearch, - WaitForCancellation: true, + TaskQueue: taskQueue, + TypedSearchAttributes: mirrorNameSearch, + WaitForCancellation: true, } snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts) snapshotFlowFuture := workflow.ExecuteChildWorkflow( @@ -502,8 +500,8 @@ func CDCFlowWorkflow( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - SearchAttributes: mirrorNameSearch, - WaitForCancellation: true, + TypedSearchAttributes: mirrorNameSearch, + WaitForCancellation: true, } syncCtx := workflow.WithChildOptions(ctx, syncFlowOpts) @@ -513,8 +511,8 @@ func CDCFlowWorkflow( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - SearchAttributes: mirrorNameSearch, - WaitForCancellation: true, + TypedSearchAttributes: mirrorNameSearch, + WaitForCancellation: true, } normCtx := workflow.WithChildOptions(ctx, normalizeFlowOpts) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 9363b30b69..c7348eefa9 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -239,9 +239,7 @@ func (q *QRepFlowExecution) startChildWorkflow( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - SearchAttributes: map[string]interface{}{ - shared.MirrorNameSearchAttribute: q.config.FlowJobName, - }, + TypedSearchAttributes: shared.NewSearchAttributes(q.config.FlowJobName), }) return workflow.ExecuteChildWorkflow(partFlowCtx, QRepPartitionWorkflow, q.config, partitions, q.runUUID) @@ -326,10 +324,8 @@ func (q *QRepFlowExecution) waitForNewRows( lastPartition *protos.QRepPartition, ) error { ctx = workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{ - ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, - SearchAttributes: map[string]interface{}{ - shared.MirrorNameSearchAttribute: q.config.FlowJobName, - }, + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, + TypedSearchAttributes: shared.NewSearchAttributes(q.config.FlowJobName), }) future := workflow.ExecuteChildWorkflow(ctx, QRepWaitForNewRowsWorkflow, q.config, lastPartition)