diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index c3e577a4dc..7fb3f857da 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -221,7 +221,7 @@ func SetupCDCFlowStatusQuery(t *testing.T, env WorkflowRun, config *protos.FlowC var status protos.FlowStatus if err := response.Get(&status); err != nil { t.Fatal(err) - } else if status == protos.FlowStatus_STATUS_RUNNING { + } else if status == protos.FlowStatus_STATUS_RUNNING || status == protos.FlowStatus_STATUS_COMPLETED { return } else if counter > 30 { env.Cancel() diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 72e37b01fd..bd1d5459dd 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -485,6 +485,8 @@ func CDCFlowWorkflow( // if initial_copy_only is opted for, we end the flow here. if cfg.InitialSnapshotOnly { + logger.Info("initial snapshot only, ending flow") + state.CurrentFlowStatus = protos.FlowStatus_STATUS_COMPLETED return state, nil } } diff --git a/protos/flow.proto b/protos/flow.proto index de7bf740d0..42170a5630 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -385,6 +385,7 @@ enum FlowStatus { STATUS_SNAPSHOT = 5; STATUS_TERMINATING = 6; STATUS_TERMINATED = 7; + STATUS_COMPLETED = 8; } message CDCFlowConfigUpdate {