diff --git a/pkg/foundation/metrics/measure/measure.go b/pkg/foundation/metrics/measure/measure.go index f627d68ac..e670b2782 100644 --- a/pkg/foundation/metrics/measure/measure.go +++ b/pkg/foundation/metrics/measure/measure.go @@ -28,6 +28,16 @@ var ( PipelinesGauge = metrics.NewLabeledGauge("conduit_pipelines", "Number of pipelines by status.", []string{"status"}) + PipelineStatusGauge = metrics.NewLabeledGauge( + "conduit_pipeline_status", + "A pipeline's status (as specified in the gRPC API: https://buf.build/conduitio/conduit/docs/main:api.v1#api.v1.Pipeline.State).", + []string{"pipeline_name"}, + ) + PipelineRecoveringCount = metrics.NewLabeledCounter( + "pipeline_recovering_count", + "Number of times a pipeline have been recovering (by pipeline name)", + []string{"pipeline_name"}, + ) ConnectorsGauge = metrics.NewLabeledGauge("conduit_connectors", "Number of connectors by type.", []string{"type"}) diff --git a/pkg/foundation/metrics/metrics.go b/pkg/foundation/metrics/metrics.go index 22832d5b0..60cb0971c 100644 --- a/pkg/foundation/metrics/metrics.go +++ b/pkg/foundation/metrics/metrics.go @@ -68,6 +68,10 @@ type Gauge interface { // LabeledGauge describes a gauge that must have values populated before use. type LabeledGauge interface { + // WithValues returns the Gauge for the given slice of label + // values (same order as the label names used when creating this LabeledGauge). + // If that combination of label values is accessed for the first time, + // a new Gauge is created. WithValues(labels ...string) Gauge } diff --git a/pkg/foundation/metrics/prometheus/gauge.go b/pkg/foundation/metrics/prometheus/gauge.go index 7c4ff6305..72055d1e5 100644 --- a/pkg/foundation/metrics/prometheus/gauge.go +++ b/pkg/foundation/metrics/prometheus/gauge.go @@ -44,7 +44,6 @@ func (g *gauge) Inc(vs ...float64) { g.pg.Inc() return } - g.pg.Add(sumFloat64(vs...)) } diff --git a/pkg/pipeline/lifecycle.go b/pkg/pipeline/lifecycle.go index 9eb8b24a3..a9c35d98d 100644 --- a/pkg/pipeline/lifecycle.go +++ b/pkg/pipeline/lifecycle.go @@ -593,10 +593,12 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error { }) } - measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Dec() + s.updateOldStatusMetrics(pl) + pl.SetStatus(StatusRunning) pl.Error = "" - measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Inc() + + s.updateNewStatusMetrics(pl) err := s.store.Set(ctx, pl.ID, pl) if err != nil { @@ -613,7 +615,7 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error { nodesWg.Wait() err := pl.t.Err() - measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Dec() + s.updateOldStatusMetrics(pl) switch err { case tomb.ErrStillAlive: @@ -640,7 +642,7 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error { s.notify(pl.ID, err) // It's important to update the metrics before we handle the error from s.Store.Set() (if any), // since the source of the truth is the actual pipeline (stored in memory). - measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Inc() + s.updateNewStatusMetrics(pl) storeErr := s.store.Set(ctx, pl.ID, pl) if storeErr != nil { diff --git a/pkg/pipeline/service.go b/pkg/pipeline/service.go index 95f910765..ca8373335 100644 --- a/pkg/pipeline/service.go +++ b/pkg/pipeline/service.go @@ -85,7 +85,8 @@ func (s *Service) Init(ctx context.Context) error { // change status to "systemStopped" to mark which pipeline was running instance.SetStatus(StatusSystemStopped) } - measure.PipelinesGauge.WithValues(strings.ToLower(instance.GetStatus().String())).Inc() + + s.updateNewStatusMetrics(instance) } s.logger.Info(ctx).Int("count", len(s.instances)).Msg("pipelines initialized") @@ -143,7 +144,8 @@ func (s *Service) Create(ctx context.Context, id string, cfg Config, p Provision s.instances[pl.ID] = pl s.instanceNames[cfg.Name] = true - measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Inc() + + s.updateNewStatusMetrics(pl) return pl, nil } @@ -308,7 +310,8 @@ func (s *Service) Delete(ctx context.Context, pipelineID string) error { delete(s.instances, pl.ID) delete(s.instanceNames, pl.Config.Name) - measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Dec() + + s.updateOldStatusMetrics(pl) return nil } @@ -363,3 +366,14 @@ func (s *Service) validatePipeline(cfg Config, id string) error { return cerrors.Join(errs...) } + +func (s *Service) updateOldStatusMetrics(pl *Instance) { + status := strings.ToLower(pl.GetStatus().String()) + measure.PipelinesGauge.WithValues(status).Dec() +} + +func (s *Service) updateNewStatusMetrics(pl *Instance) { + status := strings.ToLower(pl.GetStatus().String()) + measure.PipelinesGauge.WithValues(status).Inc() + measure.PipelineStatusGauge.WithValues(pl.Config.Name).Set(float64(pl.GetStatus())) +}