Skip to content

Commit

Permalink
drop_flow: fix hanging bug (#1215)
Browse files Browse the repository at this point in the history
Somehow between testing & merging I screwed up adding initial activities to selector,
causing drop flow to indefinitely hang

Also need explicit listen on ctx.Done in selector since
`selector.Select(ctx)` doesn't return if ctx is canceled,
so if the activities hung in selector that'd cause our workflow to ignore cancelation
  • Loading branch information
serprex authored Feb 7, 2024
1 parent 6309d62 commit 151bb10
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions flow/workflows/drop_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,20 @@ func DropFlowWorkflow(ctx workflow.Context, req *protos.ShutdownRequest) error {
})

ctx = workflow.WithValue(ctx, shared.FlowNameKey, req.FlowJobName)
dropSourceFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowSource, req)
dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, req)

var sourceError, destinationError error
var sourceOk, destinationOk bool
var sourceOk, destinationOk, canceled bool
selector := workflow.NewNamedSelector(ctx, fmt.Sprintf("%s-drop", req.FlowJobName))
selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
canceled = true
})

var dropSource, dropDestination func(f workflow.Future)
dropSource = func(f workflow.Future) {
sourceError = f.Get(ctx, nil)
sourceOk = sourceError == nil
if !sourceOk {
dropSourceFuture = workflow.ExecuteActivity(ctx, flowable.DropFlowSource, req)
dropSourceFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowSource, req)
selector.AddFuture(dropSourceFuture, dropSource)
_ = workflow.Sleep(ctx, time.Second)
}
Expand All @@ -41,16 +42,20 @@ func DropFlowWorkflow(ctx workflow.Context, req *protos.ShutdownRequest) error {
destinationError = f.Get(ctx, nil)
destinationOk = destinationError == nil
if !destinationOk {
dropDestinationFuture = workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, req)
dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, req)
selector.AddFuture(dropDestinationFuture, dropDestination)
_ = workflow.Sleep(ctx, time.Second)
}
}
dropSourceFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowSource, req)
selector.AddFuture(dropSourceFuture, dropSource)
dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, req)
selector.AddFuture(dropDestinationFuture, dropDestination)

for {
selector.Select(ctx)
if ctx.Err() != nil {
return errors.Join(sourceError, destinationError)
if canceled {
return errors.Join(ctx.Err(), sourceError, destinationError)
} else if sourceOk && destinationOk {
return nil
}
Expand Down

0 comments on commit 151bb10

Please sign in to comment.