diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index cacc0ba881..cc09bae0d7 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -1065,3 +1065,34 @@ func (a *FlowableActivity) RemoveTablesFromCatalog( return err } + +func (a *FlowableActivity) RemoveFlowEntryFromCatalog(ctx context.Context, flowName string) error { + logger := log.With(activity.GetLogger(ctx), + slog.String(string(shared.FlowNameKey), flowName)) + tx, err := a.CatalogPool.Begin(ctx) + if err != nil { + return fmt.Errorf("failed to begin transaction to remove flow entries from catalog: %w", err) + } + defer shared.RollbackTx(tx, slog.Default()) + + if _, err := tx.Exec(ctx, "DELETE FROM table_schema_mapping WHERE flow_name=$1", flowName); err != nil { + return fmt.Errorf("unable to clear table_schema_mapping in catalog: %w", err) + } + + ct, err := tx.Exec(ctx, "DELETE FROM flows WHERE name=$1", flowName) + if err != nil { + return fmt.Errorf("unable to remove flow entry in catalog: %w", err) + } + if ct.RowsAffected() == 0 { + logger.Warn("flow entry not found in catalog, 0 records deleted") + } else { + logger.Info("flow entries removed from catalog", + slog.Int("rowsAffected", int(ct.RowsAffected()))) + } + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction to remove flow entries from catalog: %w", err) + } + + return nil +} diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index d9f5c27d9c..8b30331ae8 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -175,31 +175,6 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog( return shared.UpdateCDCConfigInCatalog(ctx, h.pool, slog.Default(), cfg) } -func (h *FlowRequestHandler) removeFlowEntryInCatalog( - ctx context.Context, - flowName string, -) error { - tx, err := h.pool.Begin(ctx) - if err != nil { - return fmt.Errorf("unable to begin tx to remove flow entry in catalog: %w", err) - } - defer shared.RollbackTx(tx, slog.Default()) - - if _, err := tx.Exec(ctx, "DELETE FROM table_schema_mapping WHERE flow_name=$1", flowName); err != nil { - return fmt.Errorf("unable to clear table_schema_mapping to remove flow entry in catalog: %w", err) - } - - if _, err := tx.Exec(ctx, "DELETE FROM flows WHERE name=$1", flowName); err != nil { - return fmt.Errorf("unable to remove flow entry in catalog: %w", err) - } - - if err := tx.Commit(ctx); err != nil { - return fmt.Errorf("unable to commit remove flow entry in catalog: %w", err) - } - - return nil -} - func (h *FlowRequestHandler) CreateQRepFlow( ctx context.Context, req *protos.CreateQRepFlowRequest, ) (*protos.CreateQRepFlowResponse, error) { @@ -295,56 +270,52 @@ func (h *FlowRequestHandler) shutdownFlow( if err != nil { slog.Error("unable to check if workflow is cdc", logs, slog.Any("error", err)) return fmt.Errorf("unable to determine if workflow is cdc: %w", err) - } else if isCdc { - cdcConfig, err := h.getFlowConfigFromCatalog(ctx, flowJobName) + } + var cdcConfig *protos.FlowConnectionConfigs + if isCdc { + cdcConfig, err = h.getFlowConfigFromCatalog(ctx, flowJobName) if err != nil { slog.Error("unable to get cdc config from catalog", logs, slog.Any("error", err)) return fmt.Errorf("unable to get cdc config from catalog: %w", err) } - workflowID := fmt.Sprintf("%s-dropflow-%s", flowJobName, uuid.New()) - workflowOptions := client.StartWorkflowOptions{ - ID: workflowID, - TaskQueue: h.peerflowTaskQueueID, - TypedSearchAttributes: shared.NewSearchAttributes(flowJobName), - } + } + dropFlowWorkflowID := fmt.Sprintf("%s-dropflow-%s", flowJobName, uuid.New()) + workflowOptions := client.StartWorkflowOptions{ + ID: dropFlowWorkflowID, + TaskQueue: h.peerflowTaskQueueID, + TypedSearchAttributes: shared.NewSearchAttributes(flowJobName), + } - dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, - peerflow.DropFlowWorkflow, &protos.DropFlowInput{ - FlowJobName: flowJobName, - SourcePeerName: cdcConfig.SourceName, - DestinationPeerName: cdcConfig.DestinationName, - DropFlowStats: deleteStats, - }) - if err != nil { - slog.Error("unable to start DropFlow workflow", logs, slog.Any("error", err)) - return fmt.Errorf("unable to start DropFlow workflow: %w", err) - } + dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, + peerflow.DropFlowWorkflow, &protos.DropFlowInput{ + FlowJobName: flowJobName, + DropFlowStats: deleteStats, + FlowConnectionConfigs: cdcConfig, + }) + if err != nil { + slog.Error("unable to start DropFlow workflow", logs, slog.Any("error", err)) + return fmt.Errorf("unable to start DropFlow workflow: %w", err) + } - cancelCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) - defer cancel() + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() - errChan := make(chan error, 1) - go func() { - errChan <- dropFlowHandle.Get(cancelCtx, nil) - }() + errChan := make(chan error, 1) + go func() { + errChan <- dropFlowHandle.Get(cancelCtx, nil) + }() - select { - case err := <-errChan: - if err != nil { - slog.Error("DropFlow workflow did not execute successfully", logs, slog.Any("error", err)) - return fmt.Errorf("DropFlow workflow did not execute successfully: %w", err) - } - case <-time.After(5 * time.Minute): - if err := h.handleCancelWorkflow(ctx, workflowID, ""); err != nil { - slog.Error("unable to wait for DropFlow workflow to close", logs, slog.Any("error", err)) - return fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err) - } + select { + case err := <-errChan: + if err != nil { + slog.Error("DropFlow workflow did not execute successfully", logs, slog.Any("error", err)) + return fmt.Errorf("DropFlow workflow did not execute successfully: %w", err) + } + case <-time.After(5 * time.Minute): + if err := h.handleCancelWorkflow(ctx, workflowID, ""); err != nil { + slog.Error("unable to wait for DropFlow workflow to close", logs, slog.Any("error", err)) + return fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err) } - } - - if err := h.removeFlowEntryInCatalog(ctx, flowJobName); err != nil { - slog.Error("unable to remove flow job entry", logs, slog.Any("error", err)) - return err } return nil diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go index fe822dd66f..51bf0091a1 100644 --- a/flow/workflows/drop_flow.go +++ b/flow/workflows/drop_flow.go @@ -12,35 +12,27 @@ import ( "github.com/PeerDB-io/peer-flow/shared" ) -func DropFlowWorkflow(ctx workflow.Context, config *protos.DropFlowInput) error { - workflow.GetLogger(ctx).Info("performing cleanup for flow", slog.String(string(shared.FlowNameKey), config.FlowJobName)) - +func executeCDCDropActivities(ctx workflow.Context, input *protos.DropFlowInput) error { ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, }) - ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) ctx = workflow.WithDataConverter(ctx, converter.NewCompositeDataConverter(converter.NewJSONPayloadConverter())) - dropStatsCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 5 * time.Minute, - HeartbeatTimeout: 1 * time.Minute, - }) - var sourceError, destinationError error var sourceOk, destinationOk, canceled bool - selector := workflow.NewNamedSelector(ctx, config.FlowJobName+"-drop") + selector := workflow.NewNamedSelector(ctx, input.FlowJobName+"-drop") selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { canceled = true }) - var dropSource, dropDestination, dropStats func(f workflow.Future) + var dropSource, dropDestination func(f workflow.Future) dropSource = func(f workflow.Future) { sourceError = f.Get(ctx, nil) sourceOk = sourceError == nil if !sourceOk { dropSourceFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowSource, &protos.DropFlowActivityInput{ - FlowJobName: config.FlowJobName, - PeerName: config.SourcePeerName, + FlowJobName: input.FlowJobName, + PeerName: input.FlowConnectionConfigs.SourceName, }) selector.AddFuture(dropSourceFuture, dropSource) _ = workflow.Sleep(ctx, time.Second) @@ -51,34 +43,25 @@ func DropFlowWorkflow(ctx workflow.Context, config *protos.DropFlowInput) error destinationOk = destinationError == nil if !destinationOk { dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, &protos.DropFlowActivityInput{ - FlowJobName: config.FlowJobName, - PeerName: config.DestinationPeerName, + FlowJobName: input.FlowJobName, + PeerName: input.FlowConnectionConfigs.DestinationName, }) selector.AddFuture(dropDestinationFuture, dropDestination) _ = workflow.Sleep(ctx, time.Second) } } - dropStats = func(f workflow.Future) { - statsError := f.Get(dropStatsCtx, nil) - if statsError != nil { - // not fatal - workflow.GetLogger(ctx).Warn("failed to delete mirror stats", slog.Any("error", statsError)) - } - } + dropSourceFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowSource, &protos.DropFlowActivityInput{ - FlowJobName: config.FlowJobName, - PeerName: config.SourcePeerName, + FlowJobName: input.FlowJobName, + PeerName: input.FlowConnectionConfigs.SourceName, }) selector.AddFuture(dropSourceFuture, dropSource) dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, &protos.DropFlowActivityInput{ - FlowJobName: config.FlowJobName, - PeerName: config.DestinationPeerName, + FlowJobName: input.FlowJobName, + PeerName: input.FlowConnectionConfigs.DestinationName, }) + selector.AddFuture(dropDestinationFuture, dropDestination) - if config.DropFlowStats { - dropStatsFuture := workflow.ExecuteActivity(dropStatsCtx, flowable.DeleteMirrorStats, config.FlowJobName) - selector.AddFuture(dropStatsFuture, dropStats) - } for { selector.Select(ctx) @@ -89,3 +72,45 @@ func DropFlowWorkflow(ctx workflow.Context, config *protos.DropFlowInput) error } } } + +func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { + ctx = workflow.WithValue(ctx, shared.FlowNameKey, input.FlowJobName) + workflow.GetLogger(ctx).Info("performing cleanup for flow", + slog.String(string(shared.FlowNameKey), input.FlowJobName)) + + if input.FlowConnectionConfigs != nil && input.DropFlowStats { + dropStatsCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 1 * time.Minute, + HeartbeatTimeout: 1 * time.Minute, + }) + dropStatsFuture := workflow.ExecuteActivity(dropStatsCtx, + flowable.DeleteMirrorStats, input.FlowJobName) + err := dropStatsFuture.Get(dropStatsCtx, nil) + if err != nil { + workflow.GetLogger(ctx).Error("failed to delete mirror stats", slog.Any("error", err)) + return err + } + } + + removeFlowEntriesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 1 * time.Minute, + }) + removeFromCatalogFuture := workflow.ExecuteActivity(removeFlowEntriesCtx, + flowable.RemoveFlowEntryFromCatalog, input.FlowJobName) + err := removeFromCatalogFuture.Get(ctx, nil) + if err != nil { + workflow.GetLogger(ctx).Error("failed to remove flow entries from catalog", slog.Any("error", err)) + return err + } + + if input.FlowConnectionConfigs != nil { + err := executeCDCDropActivities(ctx, input) + if err != nil { + workflow.GetLogger(ctx).Error("failed to drop CDC flow", slog.Any("error", err)) + return err + } + workflow.GetLogger(ctx).Info("CDC flow dropped successfully") + } + + return nil +} diff --git a/protos/flow.proto b/protos/flow.proto index 7e24cfc528..d1681fd8d5 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -333,10 +333,10 @@ message QRepParitionResult { } message DropFlowInput { + reserved 2,3; string flow_job_name = 1; - string source_peer_name = 2; - string destination_peer_name = 3; bool drop_flow_stats = 4; + FlowConnectionConfigs flow_connection_configs = 5; } message TableSchemaDelta {