Skip to content

Commit

Permalink
Alerts for drop mirror (#2172)
Browse files Browse the repository at this point in the history
Add alerting for drop mirror's 2 activities - `DropFlowSource` and
`DropFlowDestination`

---------

Co-authored-by: Philip Dubé <[email protected]>
  • Loading branch information
Amogh-Bharadwaj and serprex authored Oct 24, 2024
1 parent 8d0da88 commit 8dc6c98
Showing 1 changed file with 22 additions and 4 deletions.
26 changes: 22 additions & 4 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,22 +621,40 @@ func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropF
ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName)
srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, nil, a.CatalogPool, req.PeerName)
if err != nil {
return fmt.Errorf("failed to get source connector: %w", err)
srcConnErr := fmt.Errorf("[DropFlowSource] failed to get source connector: %w", err)
a.Alerter.LogFlowError(ctx, req.FlowJobName, srcConnErr)
return srcConnErr
}
defer connectors.CloseConnector(ctx, srcConn)

return srcConn.PullFlowCleanup(ctx, req.FlowJobName)
err = srcConn.PullFlowCleanup(ctx, req.FlowJobName)
if err != nil {
pullCleanupErr := fmt.Errorf("[DropFlowSource] failed to clean up source: %w", err)
a.Alerter.LogFlowError(ctx, req.FlowJobName, pullCleanupErr)
return pullCleanupErr
}

return nil
}

func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos.DropFlowActivityInput) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName)
dstConn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, nil, a.CatalogPool, req.PeerName)
if err != nil {
return fmt.Errorf("failed to get destination connector: %w", err)
dstConnErr := fmt.Errorf("[DropFlowDestination] failed to get destination connector: %w", err)
a.Alerter.LogFlowError(ctx, req.FlowJobName, dstConnErr)
return dstConnErr
}
defer connectors.CloseConnector(ctx, dstConn)

return dstConn.SyncFlowCleanup(ctx, req.FlowJobName)
err = dstConn.SyncFlowCleanup(ctx, req.FlowJobName)
if err != nil {
syncFlowCleanupErr := fmt.Errorf("[DropFlowDestination] failed to clean up destination: %w", err)
a.Alerter.LogFlowError(ctx, req.FlowJobName, syncFlowCleanupErr)
return syncFlowCleanupErr
}

return nil
}

func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
Expand Down

0 comments on commit 8dc6c98

Please sign in to comment.