Skip to content

Commit

Permalink
Fix slot guage (#2283)
Browse files Browse the repository at this point in the history
There seems to be an issue where RecordSlotSize panics in this line with
nil pointer dereference:
```
slotMetricGauges.SlotLagGauge.Record(ctx, float64(slotInfo[0].LagInMb), metric.WithAttributeSet(attribute.NewSet(
```
As far as I can see the only thing that can be nil is
`slotMetricGauges.SlotLagGauge` , so this PR adds a guard for that and
the other guages here
  • Loading branch information
Amogh-Bharadwaj authored Nov 24, 2024
1 parent 3d81321 commit c0d1d8a
Showing 1 changed file with 42 additions and 25 deletions.
67 changes: 42 additions & 25 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,12 +1215,17 @@ func (c *PostgresConnector) HandleSlotInfo(
logger.Info(fmt.Sprintf("Checking %s lag for %s", alertKeys.SlotName, alertKeys.PeerName),
slog.Float64("LagInMB", float64(slotInfo[0].LagInMb)))
alerter.AlertIfSlotLag(ctx, alertKeys, slotInfo[0])
slotMetricGauges.SlotLagGauge.Record(ctx, float64(slotInfo[0].LagInMb), metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.SlotNameKey, alertKeys.SlotName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())),
))

if slotMetricGauges.SlotLagGauge != nil {
slotMetricGauges.SlotLagGauge.Record(ctx, float64(slotInfo[0].LagInMb), metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.SlotNameKey, alertKeys.SlotName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())),
))
} else {
logger.Warn("warning: slotMetricGauges.SlotLagGauge is nil")
}

// Also handles alerts for PeerDB user connections exceeding a given limit here
res, err := getOpenConnectionsForUser(ctx, c.conn, c.config.User)
Expand All @@ -1229,25 +1234,33 @@ func (c *PostgresConnector) HandleSlotInfo(
return err
}
alerter.AlertIfOpenConnections(ctx, alertKeys, res)
slotMetricGauges.OpenConnectionsGauge.Record(ctx, res.CurrentOpenConnections, metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()),
)))

if slotMetricGauges.OpenConnectionsGauge != nil {
slotMetricGauges.OpenConnectionsGauge.Record(ctx, res.CurrentOpenConnections, metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()),
)))
} else {
logger.Warn("warning: slotMetricGauges.OpenConnectionsGauge is nil")
}
replicationRes, err := getOpenReplicationConnectionsForUser(ctx, c.conn, c.config.User)
if err != nil {
logger.Warn("warning: failed to get current open replication connections", "error", err)
return err
}

slotMetricGauges.OpenReplicationConnectionsGauge.Record(ctx, replicationRes.CurrentOpenConnections,
metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()),
)),
)
if slotMetricGauges.OpenReplicationConnectionsGauge != nil {
slotMetricGauges.OpenReplicationConnectionsGauge.Record(ctx, replicationRes.CurrentOpenConnections,
metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()),
)),
)
} else {
logger.Warn("warning: slotMetricGauges.OpenReplicationConnectionsGauge is nil")
}

var intervalSinceLastNormalize *time.Duration
if err := alerter.CatalogPool.QueryRow(
Expand All @@ -1261,13 +1274,17 @@ func (c *PostgresConnector) HandleSlotInfo(
return nil
}
if intervalSinceLastNormalize != nil {
slotMetricGauges.IntervalSinceLastNormalizeGauge.Record(ctx, intervalSinceLastNormalize.Seconds(),
metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()),
)),
)
if slotMetricGauges.IntervalSinceLastNormalizeGauge != nil {
slotMetricGauges.IntervalSinceLastNormalizeGauge.Record(ctx, intervalSinceLastNormalize.Seconds(),
metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()),
)),
)
} else {
logger.Warn("warning: slotMetricGauges.IntervalSinceLastNormalizeGauge is nil")
}
alerter.AlertIfTooLongSinceLastNormalize(ctx, alertKeys, *intervalSinceLastNormalize)
}

Expand Down

0 comments on commit c0d1d8a

Please sign in to comment.