From 8dc6c981dc293bdc5fbc0ff7708241203b9ba2e0 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj <65964360+Amogh-Bharadwaj@users.noreply.github.com> Date: Thu, 24 Oct 2024 15:42:24 +0530 Subject: [PATCH] Alerts for drop mirror (#2172) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add alerting for drop mirror's 2 activities - `DropFlowSource` and `DropFlowDestination` --------- Co-authored-by: Philip Dubé --- flow/activities/flowable.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 3d8e5083f..df78cc3dc 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -621,22 +621,40 @@ func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropF ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName) srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, nil, a.CatalogPool, req.PeerName) if err != nil { - return fmt.Errorf("failed to get source connector: %w", err) + srcConnErr := fmt.Errorf("[DropFlowSource] failed to get source connector: %w", err) + a.Alerter.LogFlowError(ctx, req.FlowJobName, srcConnErr) + return srcConnErr } defer connectors.CloseConnector(ctx, srcConn) - return srcConn.PullFlowCleanup(ctx, req.FlowJobName) + err = srcConn.PullFlowCleanup(ctx, req.FlowJobName) + if err != nil { + pullCleanupErr := fmt.Errorf("[DropFlowSource] failed to clean up source: %w", err) + a.Alerter.LogFlowError(ctx, req.FlowJobName, pullCleanupErr) + return pullCleanupErr + } + + return nil } func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos.DropFlowActivityInput) error { ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName) dstConn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, nil, a.CatalogPool, req.PeerName) if err != nil { - return fmt.Errorf("failed to get destination connector: %w", err) + dstConnErr := fmt.Errorf("[DropFlowDestination] failed to get destination connector: %w", err) + a.Alerter.LogFlowError(ctx, req.FlowJobName, dstConnErr) + return dstConnErr } defer connectors.CloseConnector(ctx, dstConn) - return dstConn.SyncFlowCleanup(ctx, req.FlowJobName) + err = dstConn.SyncFlowCleanup(ctx, req.FlowJobName) + if err != nil { + syncFlowCleanupErr := fmt.Errorf("[DropFlowDestination] failed to clean up destination: %w", err) + a.Alerter.LogFlowError(ctx, req.FlowJobName, syncFlowCleanupErr) + return syncFlowCleanupErr + } + + return nil } func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {