Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Recovery] Metrics #1828

Merged
merged 12 commits into from
Sep 11, 2024
10 changes: 10 additions & 0 deletions pkg/foundation/metrics/measure/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ var (
// PipelinesGauge is the labeled gauge registered as "conduit_pipelines".
// Its single label is the pipeline status.
PipelinesGauge = metrics.NewLabeledGauge(
	"conduit_pipelines",
	"Number of pipelines by status.",
	[]string{"status"},
)
// PipelineStatusGauge is the labeled gauge registered as
// "conduit_pipeline_status". Each series is identified by a pipeline name and
// a status.
PipelineStatusGauge = metrics.NewLabeledGauge(
	"conduit_pipeline_status",
	"Pipeline statuses.",
	[]string{"pipeline_name", "status"},
)
// PipelineRecoveringCount is a labeled counter with one series per pipeline
// name. Its metric name carries the same "conduit_" prefix as the other
// metrics declared in this group.
PipelineRecoveringCount = metrics.NewLabeledCounter(
	"conduit_pipeline_recovering_count",
	"Number of times pipelines have been recovering (by pipeline name)",
	[]string{"pipeline_name"},
)
// ConnectorsGauge is the labeled gauge registered as "conduit_connectors".
// Its single label is the connector type.
ConnectorsGauge = metrics.NewLabeledGauge(
	"conduit_connectors",
	"Number of connectors by type.",
	[]string{"type"},
)
Expand Down
14 changes: 14 additions & 0 deletions pkg/foundation/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ type Gauge interface {

// LabeledGauge describes a gauge that must have values populated before use.
type LabeledGauge interface {
// WithValues returns the Gauge for the given slice of label
// values (same order as the label names used when creating this LabeledGauge).
// If that combination of label values is accessed for the first time,
// a new Gauge is created.
WithValues(labels ...string) Gauge
DeleteLabels(labels ...string) bool
hariso marked this conversation as resolved.
Show resolved Hide resolved
}

// Timer is a metric that allows collecting the duration of an action in
Expand Down Expand Up @@ -318,6 +323,15 @@ func (mt *labeledGauge) WithValues(vs ...string) Gauge {
return g
}

// DeleteLabels asks every underlying LabeledGauge to delete the series for
// the given label values. It returns true only if labels is non-empty and
// every underlying gauge reported a successful deletion.
func (mt *labeledGauge) DeleteLabels(labels ...string) bool {
	if len(labels) == 0 {
		// No label values were given: report false without touching the
		// underlying gauges.
		return false
	}

	deleted := true
	for _, m := range mt.metrics {
		// Evaluate the call before combining the results. Writing
		// `deleted && m.DeleteLabels(...)` would short-circuit after the
		// first failure and leave the series in the remaining gauges.
		ok := m.DeleteLabels(labels...)
		deleted = deleted && ok
	}

	return deleted
}

type timer struct {
spec
metrics []Timer
Expand Down
1 change: 1 addition & 0 deletions pkg/foundation/metrics/noop/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (Gauge) Set(float64) {}
// LabeledGauge is an empty struct; it carries no state.
type LabeledGauge struct{}

// WithValues ignores the label values and returns an empty Gauge.
func (LabeledGauge) WithValues(_ ...string) metrics.Gauge {
	return Gauge{}
}
// DeleteLabels ignores the label values and always reports true.
func (LabeledGauge) DeleteLabels(_ ...string) bool {
	return true
}

// Timer is an empty struct; it carries no state.
type Timer struct{}

Expand Down
5 changes: 4 additions & 1 deletion pkg/foundation/metrics/prometheus/gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func (lg *labeledGauge) WithValues(labels ...string) metrics.Gauge {
return &gauge{pg: lg.pg.WithLabelValues(labels...)}
}

func (lg *labeledGauge) DeleteLabels(labels ...string) bool {
return lg.pg.DeleteLabelValues(labels...)
raulb marked this conversation as resolved.
Show resolved Hide resolved
}

// Describe forwards the descriptor channel to lg.pg.Describe.
func (lg *labeledGauge) Describe(ch chan<- *prometheus.Desc) {
	lg.pg.Describe(ch)
}
Expand All @@ -44,7 +48,6 @@ func (g *gauge) Inc(vs ...float64) {
g.pg.Inc()
return
}

g.pg.Add(sumFloat64(vs...))
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/pipeline/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,10 +593,12 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error {
})
}

measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Dec()
s.updateOldStatusMetrics(pl)

pl.SetStatus(StatusRunning)
pl.Error = ""
measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Inc()

s.updateNewStatusMetrics(pl)

err := s.store.Set(ctx, pl.ID, pl)
if err != nil {
Expand All @@ -613,7 +615,7 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error {
nodesWg.Wait()
err := pl.t.Err()

measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Dec()
s.updateOldStatusMetrics(pl)

switch err {
case tomb.ErrStillAlive:
Expand All @@ -640,7 +642,7 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error {
s.notify(pl.ID, err)
// It's important to update the metrics before we handle the error from s.Store.Set() (if any),
// since the source of the truth is the actual pipeline (stored in memory).
measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Inc()
s.updateNewStatusMetrics(pl)

storeErr := s.store.Set(ctx, pl.ID, pl)
if storeErr != nil {
Expand Down
19 changes: 16 additions & 3 deletions pkg/pipeline/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ func (s *Service) Init(ctx context.Context) error {
// change status to "systemStopped" to mark which pipeline was running
instance.SetStatus(StatusSystemStopped)
}
measure.PipelinesGauge.WithValues(strings.ToLower(instance.GetStatus().String())).Inc()

s.updateNewStatusMetrics(instance)
}

s.logger.Info(ctx).Int("count", len(s.instances)).Msg("pipelines initialized")
Expand Down Expand Up @@ -143,7 +144,8 @@ func (s *Service) Create(ctx context.Context, id string, cfg Config, p Provision

s.instances[pl.ID] = pl
s.instanceNames[cfg.Name] = true
measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Inc()

s.updateNewStatusMetrics(pl)

return pl, nil
}
Expand Down Expand Up @@ -308,7 +310,8 @@ func (s *Service) Delete(ctx context.Context, pipelineID string) error {

delete(s.instances, pl.ID)
delete(s.instanceNames, pl.Config.Name)
measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Dec()

s.updateOldStatusMetrics(pl)

return nil
}
Expand Down Expand Up @@ -363,3 +366,13 @@ func (s *Service) validatePipeline(cfg Config, id string) error {

return cerrors.Join(errs...)
}

// updateOldStatusMetrics takes pl's current status out of the pipeline
// metrics. It decrements the PipelinesGauge series for that status and deletes
// the PipelineStatusGauge series for (pipeline name, status).
func (s *Service) updateOldStatusMetrics(pl *Instance) {
	// Read the status once so both metrics are updated for the same label.
	status := strings.ToLower(pl.GetStatus().String())

	measure.PipelinesGauge.WithValues(status).Dec()
	// The result of DeleteLabels is not checked; a missing series is not
	// treated as an error here.
	measure.PipelineStatusGauge.DeleteLabels(pl.Config.Name, status)
}

// updateNewStatusMetrics records pl's current status in the pipeline metrics.
// It increments the PipelinesGauge series for that status and sets the
// PipelineStatusGauge series for (pipeline name, status) to 1.
func (s *Service) updateNewStatusMetrics(pl *Instance) {
	// Read the status once so both metrics are updated for the same label.
	status := strings.ToLower(pl.GetStatus().String())

	measure.PipelinesGauge.WithValues(status).Inc()
	measure.PipelineStatusGauge.WithValues(pl.Config.Name, status).Set(1)
}