diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 3d8e5083f..df78cc3dc 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -621,22 +621,40 @@ func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropF ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName) srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, nil, a.CatalogPool, req.PeerName) if err != nil { - return fmt.Errorf("failed to get source connector: %w", err) + srcConnErr := fmt.Errorf("[DropFlowSource] failed to get source connector: %w", err) + a.Alerter.LogFlowError(ctx, req.FlowJobName, srcConnErr) + return srcConnErr } defer connectors.CloseConnector(ctx, srcConn) - return srcConn.PullFlowCleanup(ctx, req.FlowJobName) + err = srcConn.PullFlowCleanup(ctx, req.FlowJobName) + if err != nil { + pullCleanupErr := fmt.Errorf("[DropFlowSource] failed to clean up source: %w", err) + a.Alerter.LogFlowError(ctx, req.FlowJobName, pullCleanupErr) + return pullCleanupErr + } + + return nil } func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos.DropFlowActivityInput) error { ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName) dstConn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, nil, a.CatalogPool, req.PeerName) if err != nil { - return fmt.Errorf("failed to get destination connector: %w", err) + dstConnErr := fmt.Errorf("[DropFlowDestination] failed to get destination connector: %w", err) + a.Alerter.LogFlowError(ctx, req.FlowJobName, dstConnErr) + return dstConnErr } defer connectors.CloseConnector(ctx, dstConn) - return dstConn.SyncFlowCleanup(ctx, req.FlowJobName) + err = dstConn.SyncFlowCleanup(ctx, req.FlowJobName) + if err != nil { + syncFlowCleanupErr := fmt.Errorf("[DropFlowDestination] failed to clean up destination: %w", err) + a.Alerter.LogFlowError(ctx, req.FlowJobName, syncFlowCleanupErr) + return syncFlowCleanupErr + } + + return nil } func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {