Skip to content

Commit

Permalink
Merge branch 'main' into stable
Browse files — browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Oct 24, 2024
2 parents 4f33f76 + 22addcb commit d27ebf6
Show file tree
Hide file tree
Showing 61 changed files with 1,417 additions and 697 deletions.
43 changes: 37 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ func (a *FlowableActivity) MaintainPull(
config *protos.FlowConnectionConfigs,
sessionID string,
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, config.Env, a.CatalogPool, config.SourceName)
if err != nil {
return err
Expand Down Expand Up @@ -312,6 +313,7 @@ func (a *FlowableActivity) MaintainPull(
a.CdcCacheRw.Lock()
delete(a.CdcCache, sessionID)
a.CdcCacheRw.Unlock()
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", err)
}
case <-done:
Expand Down Expand Up @@ -619,22 +621,40 @@ func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropF
ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName)
srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, nil, a.CatalogPool, req.PeerName)
if err != nil {
return fmt.Errorf("failed to get source connector: %w", err)
srcConnErr := fmt.Errorf("[DropFlowSource] failed to get source connector: %w", err)
a.Alerter.LogFlowError(ctx, req.FlowJobName, srcConnErr)
return srcConnErr
}
defer connectors.CloseConnector(ctx, srcConn)

return srcConn.PullFlowCleanup(ctx, req.FlowJobName)
err = srcConn.PullFlowCleanup(ctx, req.FlowJobName)
if err != nil {
pullCleanupErr := fmt.Errorf("[DropFlowSource] failed to clean up source: %w", err)
a.Alerter.LogFlowError(ctx, req.FlowJobName, pullCleanupErr)
return pullCleanupErr
}

return nil
}

// DropFlowDestination tears down the CDC sync side of a dropped flow on the
// destination peer. Any failure is logged through the alerter before being
// returned to the caller.
func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos.DropFlowActivityInput) error {
	// Tag the context so downstream logging/alerting is attributed to this flow.
	ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName)
	dstConn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, nil, a.CatalogPool, req.PeerName)
	if err != nil {
		dstConnErr := fmt.Errorf("[DropFlowDestination] failed to get destination connector: %w", err)
		a.Alerter.LogFlowError(ctx, req.FlowJobName, dstConnErr)
		return dstConnErr
	}
	defer connectors.CloseConnector(ctx, dstConn)

	err = dstConn.SyncFlowCleanup(ctx, req.FlowJobName)
	if err != nil {
		syncFlowCleanupErr := fmt.Errorf("[DropFlowDestination] failed to clean up destination: %w", err)
		a.Alerter.LogFlowError(ctx, req.FlowJobName, syncFlowCleanupErr)
		return syncFlowCleanupErr
	}

	return nil
}

func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
Expand Down Expand Up @@ -669,7 +689,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {

func() {
pgConfig := pgPeer.GetPostgresConfig()
pgConn, peerErr := connpostgres.NewPostgresConnector(ctx, pgConfig)
pgConn, peerErr := connpostgres.NewPostgresConnector(ctx, nil, pgConfig)
if peerErr != nil {
logger.Error(fmt.Sprintf("error creating connector for postgres peer %s with host %s: %v",
pgPeer.Name, pgConfig.Host, peerErr))
Expand Down Expand Up @@ -767,6 +787,17 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
return
}
slotMetricGauges.OpenReplicationConnectionsGauge = openReplicationConnectionsGauge

intervalSinceLastNormalizeGauge, err := otel_metrics.GetOrInitFloat64SyncGauge(a.OtelManager.Meter,
a.OtelManager.Float64GaugesCache,
peerdb_gauges.BuildGaugeName(peerdb_gauges.IntervalSinceLastNormalizeGaugeName),
metric.WithUnit("s"),
metric.WithDescription("Interval since last normalize"))
if err != nil {
logger.Error("Failed to get interval since last normalize gauge", slog.Any("error", err))
return
}
slotMetricGauges.IntervalSinceLastNormalizeGauge = intervalSinceLastNormalizeGauge
}

if err := srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, &alerting.AlertKeys{
Expand Down Expand Up @@ -993,7 +1024,7 @@ func (a *FlowableActivity) RemoveTablesFromRawTable(
// we can ignore the error
return nil
}
return fmt.Errorf("[RemoveTablesFromRawTable]:failed to get destination connector: %w", err)
return fmt.Errorf("[RemoveTablesFromRawTable] failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

Expand Down
3 changes: 2 additions & 1 deletion flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
return dstConn.GetLastOffset(ctx, config.FlowJobName)
}()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}

Expand Down Expand Up @@ -352,7 +353,7 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto
return nil, err
}

peerOptions, err := peerdbenv.Decrypt(encKeyID, encPeerOptions)
peerOptions, err := peerdbenv.Decrypt(ctx, encKeyID, encPeerOptions)
if err != nil {
return nil, err
}
Expand Down
138 changes: 111 additions & 27 deletions flow/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (

// Alerter is the alerting service, no cool name :(
// It writes alerts and flow errors to the catalog database and forwards
// telemetry to the optional SNS and incident.io senders; either sender may be
// nil when the corresponding integration is not configured.
type Alerter struct {
	CatalogPool               *pgxpool.Pool
	snsTelemetrySender        telemetry.Sender
	incidentIoTelemetrySender telemetry.Sender
}

type AlertSenderConfig struct {
Expand All @@ -38,7 +39,7 @@ type AlertKeys struct {
}

func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSenderConfig, error) {
rows, err := a.catalogPool.Query(ctx,
rows, err := a.CatalogPool.Query(ctx,
`SELECT
id,
service_type,
Expand All @@ -50,7 +51,7 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSenderCon
return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err)
}

keys := peerdbenv.PeerDBEncKeys()
keys := peerdbenv.PeerDBEncKeys(ctx)
return pgx.CollectRows(rows, func(row pgx.CollectableRow) (AlertSenderConfig, error) {
var alertSenderConfig AlertSenderConfig
var serviceType ServiceType
Expand Down Expand Up @@ -126,14 +127,31 @@ func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter {
snsMessageSender, err = telemetry.NewSNSMessageSenderWithNewClient(ctx, &telemetry.SNSMessageSenderConfig{
Topic: snsTopic,
})
logger.LoggerFromCtx(ctx).Info("Successfully registered telemetry sender")
logger.LoggerFromCtx(ctx).Info("Successfully registered sns telemetry sender")
if err != nil {
panic(fmt.Sprintf("unable to setup telemetry is nil for Alerter %+v", err))
}
}

incidentIoURL := peerdbenv.PeerDBGetIncidentIoUrl()
incidentIoAuth := peerdbenv.PeerDBGetIncidentIoToken()
var incidentIoTelemetrySender telemetry.Sender
if incidentIoURL != "" && incidentIoAuth != "" {
var err error
incidentIoTelemetrySender, err = telemetry.NewIncidentIoMessageSender(ctx, telemetry.IncidentIoMessageSenderConfig{
URL: incidentIoURL,
Token: incidentIoAuth,
})
logger.LoggerFromCtx(ctx).Info("Successfully registered incident.io telemetry sender")
if err != nil {
panic(fmt.Sprintf("unable to setup incident.io telemetry is nil for Alerter %+v", err))
}
}

return &Alerter{
catalogPool: catalogPool,
telemetrySender: snsMessageSender,
CatalogPool: catalogPool,
snsTelemetrySender: snsMessageSender,
incidentIoTelemetrySender: incidentIoTelemetrySender,
}
}

Expand Down Expand Up @@ -172,17 +190,29 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, alertKeys *AlertKeys, slot
`currently at %.2fMB!`, deploymentUIDPrefix, slotInfo.SlotName, alertKeys.PeerName, slotInfo.LagInMb)

badWalStatusAlertKey := fmt.Sprintf("%s Bad WAL Status for Peer %s", deploymentUIDPrefix, alertKeys.PeerName)
badWalStatusAlertMessageTemplate := fmt.Sprintf("%sSlot `%s` on peer `%s` has bad WAL status: `%s`",
badWalStatusAlertMessage := fmt.Sprintf("%sSlot `%s` on peer `%s` has bad WAL status: `%s`",
deploymentUIDPrefix, slotInfo.SlotName, alertKeys.PeerName, slotInfo.WalStatus)

for _, alertSenderConfig := range alertSendersForMirrors {
if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) {
a.alertToProvider(ctx, alertSenderConfig, thresholdAlertKey,
fmt.Sprintf(thresholdAlertMessageTemplate, defaultSlotLagMBAlertThreshold))
if a.checkAndAddAlertToCatalog(ctx,
alertSenderConfig.Id, thresholdAlertKey,
fmt.Sprintf(thresholdAlertMessageTemplate, lowestSlotLagMBAlertThreshold)) {
if alertSenderConfig.Sender.getSlotLagMBAlertThreshold() > 0 {
if slotInfo.LagInMb > float32(alertSenderConfig.Sender.getSlotLagMBAlertThreshold()) {
a.alertToProvider(ctx, alertSenderConfig, thresholdAlertKey,
fmt.Sprintf(thresholdAlertMessageTemplate, alertSenderConfig.Sender.getSlotLagMBAlertThreshold()))
}
} else {
if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) {
a.alertToProvider(ctx, alertSenderConfig, thresholdAlertKey,
fmt.Sprintf(thresholdAlertMessageTemplate, defaultSlotLagMBAlertThreshold))
}
}
}

if slotInfo.WalStatus == "lost" || slotInfo.WalStatus == "unreserved" {
a.alertToProvider(ctx, alertSenderConfig, badWalStatusAlertKey, badWalStatusAlertMessageTemplate)
if (slotInfo.WalStatus == "lost" || slotInfo.WalStatus == "unreserved") &&
a.checkAndAddAlertToCatalog(ctx, alertSenderConfig.Id, badWalStatusAlertKey, badWalStatusAlertMessage) {
a.alertToProvider(ctx, alertSenderConfig, badWalStatusAlertKey, badWalStatusAlertMessage)
}
}
}
Expand All @@ -192,7 +222,7 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, alertKeys *AlertKe
) {
alertSenderConfigs, err := a.registerSendersFromPool(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err))
logger.LoggerFromCtx(ctx).Warn("failed to set alert senders", slog.Any("error", err))
return
}

Expand Down Expand Up @@ -244,6 +274,49 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, alertKeys *AlertKe
}
}

// AlertIfTooLongSinceLastNormalize raises an alert when the time elapsed since
// the mirror's last normalize exceeds the configured threshold (in minutes).
// A threshold of zero disables this alert entirely.
func (a *Alerter) AlertIfTooLongSinceLastNormalize(ctx context.Context, alertKeys *AlertKeys,
	intervalSinceLastNormalize time.Duration,
) {
	log := logger.LoggerFromCtx(ctx)

	thresholdMinutes, err := peerdbenv.PeerDBIntervalSinceLastNormalizeThresholdMinutes(ctx, nil)
	if err != nil {
		log.Warn("failed to get interval since last normalize threshold from catalog", slog.Any("error", err))
	}

	if thresholdMinutes == 0 {
		log.Info("Alerting disabled via environment variable, returning")
		return
	}

	senderConfigs, err := a.registerSendersFromPool(ctx)
	if err != nil {
		log.Warn("failed to set alert senders", slog.Any("error", err))
		return
	}

	deploymentUIDPrefix := ""
	if uid := peerdbenv.PeerDBDeploymentUID(); uid != "" {
		deploymentUIDPrefix = fmt.Sprintf("[%s] - ", uid)
	}

	// Within threshold: nothing to report.
	if intervalSinceLastNormalize <= time.Duration(thresholdMinutes)*time.Minute {
		return
	}

	alertKey := fmt.Sprintf("%s Too long since last data normalize for PeerDB mirror %s",
		deploymentUIDPrefix, alertKeys.FlowName)
	alertMessage := fmt.Sprintf("%sData hasn't been synced to the target for mirror `%s` since the last `%s`."+
		` This could indicate an issue with the pipeline — please check the UI and logs to confirm.`+
		` Alternatively, it might be that the source database is idle and not receiving new updates.`, deploymentUIDPrefix,
		alertKeys.FlowName, intervalSinceLastNormalize)

	for _, senderConfig := range senderConfigs {
		// A sender with no mirror filter receives alerts for every mirror.
		mirrorMatches := len(senderConfig.AlertForMirrors) == 0 ||
			slices.Contains(senderConfig.AlertForMirrors, alertKeys.FlowName)
		if mirrorMatches && a.checkAndAddAlertToCatalog(ctx, senderConfig.Id, alertKey, alertMessage) {
			a.alertToProvider(ctx, senderConfig, alertKey, alertMessage)
		}
	}
}

func (a *Alerter) alertToProvider(ctx context.Context, alertSenderConfig AlertSenderConfig, alertKey string, alertMessage string) {
err := alertSenderConfig.Sender.sendAlert(ctx, alertKey, alertMessage)
if err != nil {
Expand All @@ -266,7 +339,7 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId i
return false
}

row := a.catalogPool.QueryRow(ctx,
row := a.CatalogPool.QueryRow(ctx,
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 AND alert_config_id=$2
ORDER BY created_timestamp DESC LIMIT 1`,
alertKey, alertConfigId)
Expand All @@ -278,7 +351,7 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId i
}

if time.Since(createdTimestamp) >= dur {
_, err = a.catalogPool.Exec(ctx,
_, err = a.CatalogPool.Exec(ctx,
"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message,alert_config_id) VALUES($1,$2,$3)",
alertKey, alertMessage, alertConfigId)
if err != nil {
Expand All @@ -295,18 +368,29 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId i
}

func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, more string, level telemetry.Level) {
if a.telemetrySender != nil {
details := fmt.Sprintf("[%s] %s", flowName, more)
_, err := a.telemetrySender.SendMessage(ctx, details, details, telemetry.Attributes{
Level: level,
DeploymentUID: peerdbenv.PeerDBDeploymentUID(),
Tags: []string{flowName, peerdbenv.PeerDBDeploymentUID()},
Type: flowName,
})
details := fmt.Sprintf("[%s] %s", flowName, more)
attributes := telemetry.Attributes{
Level: level,
DeploymentUID: peerdbenv.PeerDBDeploymentUID(),
Tags: []string{flowName, peerdbenv.PeerDBDeploymentUID()},
Type: flowName,
}

if a.snsTelemetrySender != nil {
_, err := a.snsTelemetrySender.SendMessage(ctx, details, details, attributes)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to send message to snsTelemetrySender", slog.Any("error", err))
return
}
}

if a.incidentIoTelemetrySender != nil {
status, err := a.incidentIoTelemetrySender.SendMessage(ctx, details, details, attributes)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to send message to telemetrySender", slog.Any("error", err))
logger.LoggerFromCtx(ctx).Warn("failed to send message to incidentIoTelemetrySender", slog.Any("error", err))
return
}
logger.LoggerFromCtx(ctx).Info("received status from incident.io", slog.String("status", *status))
}
}

Expand Down Expand Up @@ -335,7 +419,7 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error)
logger := logger.LoggerFromCtx(ctx)
errorWithStack := fmt.Sprintf("%+v", err)
logger.Error(err.Error(), slog.Any("stack", errorWithStack))
_, err = a.catalogPool.Exec(ctx,
_, err = a.CatalogPool.Exec(ctx,
"INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)",
flowName, errorWithStack, "error")
if err != nil {
Expand All @@ -353,7 +437,7 @@ func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string
func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) {
logger := logger.LoggerFromCtx(ctx)
logger.Info(info)
_, err := a.catalogPool.Exec(ctx,
_, err := a.CatalogPool.Exec(ctx,
"INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)",
flowName, info, "info")
if err != nil {
Expand Down
17 changes: 14 additions & 3 deletions flow/auth/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,35 @@ func AuthGrpcMiddleware(unauthenticatedMethods []string) ([]grpc.ServerOption, e
for _, method := range unauthenticatedMethods {
unauthenticatedMethodsMap[method] = struct{}{}
}

return []grpc.ServerOption{
grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
slog.Info("Received gRPC request", slog.String("method", info.FullMethod))

if _, unauthorized := unauthenticatedMethodsMap[info.FullMethod]; !unauthorized {
var authHeader string
authHeaders := metadata.ValueFromIncomingContext(ctx, "Authorization")
if len(authHeaders) == 1 {
authHeader = authHeaders[0]
} else if len(authHeaders) > 1 {
slog.Warn("Multiple Authorization headers supplied, request rejected", slog.String("method", info.FullMethod))
return nil, status.Errorf(codes.Unauthenticated, "multiple Authorization headers supplied, request rejected")
}
_, err := validateRequestToken(authHeader, cfg.OauthJwtCustomClaims, ip...)
if err != nil {
slog.Debug("failed to validate request token", slog.Any("error", err))
slog.Debug("Failed to validate request token", slog.String("method", info.FullMethod), slog.Any("error", err))
return nil, status.Errorf(codes.Unauthenticated, "%s", err.Error())
}
}
return handler(ctx, req)

resp, err := handler(ctx, req)

if err != nil {
slog.Error("gRPC request failed", slog.String("method", info.FullMethod), slog.Any("error", err))
} else {
slog.Info("gRPC request completed successfully", slog.String("method", info.FullMethod))
}

return resp, err
}),
}, nil
}
Expand Down
Loading

0 comments on commit d27ebf6

Please sign in to comment.