Skip to content

Commit

Permalink
temporal: replace deprecated SearchAttributes with TypedSearchAttribu…
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Oct 21, 2024
1 parent f9513a0 commit 895c643
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 39 deletions.
24 changes: 9 additions & 15 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,9 @@ func (h *FlowRequestHandler) CreateCDCFlow(

workflowID := fmt.Sprintf("%s-peerflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
SearchAttributes: map[string]interface{}{
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
},
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
TypedSearchAttributes: shared.NewSearchAttributes(cfg.FlowJobName),
}

err := h.createCdcJobEntry(ctx, req, workflowID)
Expand Down Expand Up @@ -208,11 +206,9 @@ func (h *FlowRequestHandler) CreateQRepFlow(
cfg := req.QrepConfig
workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
SearchAttributes: map[string]interface{}{
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
},
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
TypedSearchAttributes: shared.NewSearchAttributes(cfg.FlowJobName),
}
if req.CreateCatalogEntry {
if err := h.createQRepJobEntry(ctx, req, workflowID); err != nil {
Expand Down Expand Up @@ -308,11 +304,9 @@ func (h *FlowRequestHandler) shutdownFlow(
}
workflowID := fmt.Sprintf("%s-dropflow-%s", flowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
SearchAttributes: map[string]interface{}{
shared.MirrorNameSearchAttribute: flowJobName,
},
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
TypedSearchAttributes: shared.NewSearchAttributes(flowJobName),
}

dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions,
Expand Down
10 changes: 9 additions & 1 deletion flow/shared/constants.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package shared

import (
"go.temporal.io/sdk/temporal"
)

type (
ContextKey string
TaskQueueID string
Expand All @@ -16,7 +20,11 @@ const (
FlowStatusQuery = "q-flow-status"
)

const MirrorNameSearchAttribute = "MirrorName"
var MirrorNameSearchAttribute = temporal.NewSearchAttributeKeyString("MirrorName")

func NewSearchAttributes(mirrorName string) temporal.SearchAttributes {
return temporal.NewSearchAttributes(MirrorNameSearchAttribute.ValueSet(mirrorName))
}

const (
FlowNameKey ContextKey = "flowName"
Expand Down
30 changes: 14 additions & 16 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func processCDCFlowConfigUpdate(
logger log.Logger,
cfg *protos.FlowConnectionConfigs,
state *CDCFlowWorkflowState,
mirrorNameSearch map[string]interface{},
mirrorNameSearch temporal.SearchAttributes,
) error {
flowConfigUpdate := state.FlowConfigUpdate

Expand Down Expand Up @@ -139,7 +139,7 @@ func processTableAdditions(
logger log.Logger,
cfg *protos.FlowConnectionConfigs,
state *CDCFlowWorkflowState,
mirrorNameSearch map[string]interface{},
mirrorNameSearch temporal.SearchAttributes,
) error {
flowConfigUpdate := state.FlowConfigUpdate
if len(flowConfigUpdate.AdditionalTables) == 0 {
Expand Down Expand Up @@ -181,8 +181,8 @@ func processTableAdditions(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: mirrorNameSearch,
WaitForCancellation: true,
TypedSearchAttributes: mirrorNameSearch,
WaitForCancellation: true,
}
childAdditionalTablesCDCFlowCtx := workflow.WithChildOptions(ctx, childAdditionalTablesCDCFlowOpts)
childAdditionalTablesCDCFlowFuture := workflow.ExecuteChildWorkflow(
Expand Down Expand Up @@ -329,9 +329,7 @@ func CDCFlowWorkflow(
return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err)
}

mirrorNameSearch := map[string]interface{}{
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
}
mirrorNameSearch := shared.NewSearchAttributes(cfg.FlowJobName)

var syncCountLimit int
if state.ActiveSignal == model.PauseSignal {
Expand Down Expand Up @@ -416,8 +414,8 @@ func CDCFlowWorkflow(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: mirrorNameSearch,
WaitForCancellation: true,
TypedSearchAttributes: mirrorNameSearch,
WaitForCancellation: true,
}
setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts)
setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg)
Expand All @@ -438,9 +436,9 @@ func CDCFlowWorkflow(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
TaskQueue: taskQueue,
SearchAttributes: mirrorNameSearch,
WaitForCancellation: true,
TaskQueue: taskQueue,
TypedSearchAttributes: mirrorNameSearch,
WaitForCancellation: true,
}
snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts)
snapshotFlowFuture := workflow.ExecuteChildWorkflow(
Expand Down Expand Up @@ -502,8 +500,8 @@ func CDCFlowWorkflow(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: mirrorNameSearch,
WaitForCancellation: true,
TypedSearchAttributes: mirrorNameSearch,
WaitForCancellation: true,
}
syncCtx := workflow.WithChildOptions(ctx, syncFlowOpts)

Expand All @@ -513,8 +511,8 @@ func CDCFlowWorkflow(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: mirrorNameSearch,
WaitForCancellation: true,
TypedSearchAttributes: mirrorNameSearch,
WaitForCancellation: true,
}
normCtx := workflow.WithChildOptions(ctx, normalizeFlowOpts)

Expand Down
10 changes: 3 additions & 7 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,7 @@ func (q *QRepFlowExecution) startChildWorkflow(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: map[string]interface{}{
shared.MirrorNameSearchAttribute: q.config.FlowJobName,
},
TypedSearchAttributes: shared.NewSearchAttributes(q.config.FlowJobName),
})

return workflow.ExecuteChildWorkflow(partFlowCtx, QRepPartitionWorkflow, q.config, partitions, q.runUUID)
Expand Down Expand Up @@ -326,10 +324,8 @@ func (q *QRepFlowExecution) waitForNewRows(
lastPartition *protos.QRepPartition,
) error {
ctx = workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
SearchAttributes: map[string]interface{}{
shared.MirrorNameSearchAttribute: q.config.FlowJobName,
},
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
TypedSearchAttributes: shared.NewSearchAttributes(q.config.FlowJobName),
})
future := workflow.ExecuteChildWorkflow(ctx, QRepWaitForNewRowsWorkflow, q.config, lastPartition)

Expand Down

0 comments on commit 895c643

Please sign in to comment.