From 593b518f0581bcdeedca49874a348a125a2aaff9 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 4 Sep 2024 12:51:53 +0200 Subject: [PATCH 1/6] Pipeline status metrics --- pkg/foundation/metrics/measure/measure.go | 3 +++ pkg/foundation/metrics/metrics.go | 14 ++++++++++++++ pkg/foundation/metrics/noop/metrics.go | 1 + pkg/foundation/metrics/prometheus/gauge.go | 5 ++++- pkg/pipeline/lifecycle.go | 10 ++++++---- pkg/pipeline/service.go | 19 ++++++++++++++++--- 6 files changed, 44 insertions(+), 8 deletions(-) diff --git a/pkg/foundation/metrics/measure/measure.go b/pkg/foundation/metrics/measure/measure.go index f627d68ac..c31d81d73 100644 --- a/pkg/foundation/metrics/measure/measure.go +++ b/pkg/foundation/metrics/measure/measure.go @@ -28,6 +28,9 @@ var ( PipelinesGauge = metrics.NewLabeledGauge("conduit_pipelines", "Number of pipelines by status.", []string{"status"}) + PipelineStatusGauge = metrics.NewLabeledGauge("conduit_pipeline_status", + "Pipeline statuses.", + []string{"pipeline_name", "status"}) ConnectorsGauge = metrics.NewLabeledGauge("conduit_connectors", "Number of connectors by type.", []string{"type"}) diff --git a/pkg/foundation/metrics/metrics.go b/pkg/foundation/metrics/metrics.go index 22832d5b0..f79edb0f9 100644 --- a/pkg/foundation/metrics/metrics.go +++ b/pkg/foundation/metrics/metrics.go @@ -68,7 +68,12 @@ type Gauge interface { // LabeledGauge describes a gauge that must have values populated before use. type LabeledGauge interface { + // WithValues returns the Gauge for the given slice of label + // values (same order as the label names used when creating this LabeledGauge). + // If that combination of label values is accessed for the first time, + // a new Gauge is created. WithValues(labels ...string) Gauge + DeleteLabels(labels ...string) bool } // Timer is a metric that allows collecting the duration of an action in @@ -318,6 +323,15 @@ func (mt *labeledGauge) WithValues(vs ...string) Gauge { return g } +func (mt *labeledGauge) DeleteLabels(labels ...string) bool { + deleted := len(labels) > 0 + for _, m := range mt.metrics { + deleted = deleted && m.DeleteLabels(labels...) + } + + return deleted +} + type timer struct { spec metrics []Timer diff --git a/pkg/foundation/metrics/noop/metrics.go b/pkg/foundation/metrics/noop/metrics.go index c1edcf65e..84590b9cb 100644 --- a/pkg/foundation/metrics/noop/metrics.go +++ b/pkg/foundation/metrics/noop/metrics.go @@ -40,6 +40,7 @@ func (Gauge) Set(float64) {} type LabeledGauge struct{} func (LabeledGauge) WithValues(...string) metrics.Gauge { return Gauge{} } +func (LabeledGauge) DeleteLabels(...string) bool { return true } type Timer struct{} diff --git a/pkg/foundation/metrics/prometheus/gauge.go b/pkg/foundation/metrics/prometheus/gauge.go index 7c4ff6305..6a136ffc6 100644 --- a/pkg/foundation/metrics/prometheus/gauge.go +++ b/pkg/foundation/metrics/prometheus/gauge.go @@ -27,6 +27,10 @@ func (lg *labeledGauge) WithValues(labels ...string) metrics.Gauge { return &gauge{pg: lg.pg.WithLabelValues(labels...)} } +func (lg *labeledGauge) DeleteLabels(labels ...string) bool { + return lg.pg.DeleteLabelValues(labels...) +} + func (lg *labeledGauge) Describe(c chan<- *prometheus.Desc) { lg.pg.Describe(c) } @@ -44,7 +48,6 @@ func (g *gauge) Inc(vs ...float64) { g.pg.Inc() return } - g.pg.Add(sumFloat64(vs...)) } diff --git a/pkg/pipeline/lifecycle.go b/pkg/pipeline/lifecycle.go index 9eb8b24a3..a9c35d98d 100644 --- a/pkg/pipeline/lifecycle.go +++ b/pkg/pipeline/lifecycle.go @@ -593,10 +593,12 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error { }) } - measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Dec() + s.updateOldStatusMetrics(pl) + pl.SetStatus(StatusRunning) pl.Error = "" - measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Inc() + + s.updateNewStatusMetrics(pl) err := s.store.Set(ctx, pl.ID, pl) if err != nil { @@ -613,7 +615,7 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error { nodesWg.Wait() err := pl.t.Err() - measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Dec() + s.updateOldStatusMetrics(pl) switch err { case tomb.ErrStillAlive: @@ -640,7 +642,7 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error { s.notify(pl.ID, err) // It's important to update the metrics before we handle the error from s.Store.Set() (if any), // since the source of the truth is the actual pipeline (stored in memory). - measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Inc() + s.updateNewStatusMetrics(pl) storeErr := s.store.Set(ctx, pl.ID, pl) if storeErr != nil { diff --git a/pkg/pipeline/service.go b/pkg/pipeline/service.go index 95f910765..aef7a5a49 100644 --- a/pkg/pipeline/service.go +++ b/pkg/pipeline/service.go @@ -85,7 +85,8 @@ func (s *Service) Init(ctx context.Context) error { // change status to "systemStopped" to mark which pipeline was running instance.SetStatus(StatusSystemStopped) } - measure.PipelinesGauge.WithValues(strings.ToLower(instance.GetStatus().String())).Inc() + + s.updateNewStatusMetrics(instance) } s.logger.Info(ctx).Int("count", len(s.instances)).Msg("pipelines initialized") @@ -143,7 +144,8 @@ func (s *Service) Create(ctx context.Context, id string, cfg Config, p Provision s.instances[pl.ID] = pl s.instanceNames[cfg.Name] = true - measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Inc() + + s.updateNewStatusMetrics(pl) return pl, nil } @@ -308,7 +310,8 @@ func (s *Service) Delete(ctx context.Context, pipelineID string) error { delete(s.instances, pl.ID) delete(s.instanceNames, pl.Config.Name) - measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Dec() + + s.updateOldStatusMetrics(pl) return nil } @@ -363,3 +366,13 @@ func (s *Service) validatePipeline(cfg Config, id string) error { return cerrors.Join(errs...) } + +func (s *Service) updateOldStatusMetrics(pl *Instance) { + measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Dec() + measure.PipelineStatusGauge.DeleteLabels(pl.Config.Name, strings.ToLower(pl.GetStatus().String())) +} + +func (s *Service) updateNewStatusMetrics(pl *Instance) { + measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Inc() + measure.PipelineStatusGauge.WithValues(pl.Config.Name, strings.ToLower(pl.GetStatus().String())).Set(1) +} From e5b1a02a42486b44d9584a2faf8b3054a258b7bb Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 4 Sep 2024 13:01:30 +0200 Subject: [PATCH 2/6] Add pipeline_recovering_count --- pkg/foundation/metrics/measure/measure.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/foundation/metrics/measure/measure.go b/pkg/foundation/metrics/measure/measure.go index c31d81d73..bff880255 100644 --- a/pkg/foundation/metrics/measure/measure.go +++ b/pkg/foundation/metrics/measure/measure.go @@ -28,9 +28,16 @@ var ( PipelinesGauge = metrics.NewLabeledGauge("conduit_pipelines", "Number of pipelines by status.", []string{"status"}) - PipelineStatusGauge = metrics.NewLabeledGauge("conduit_pipeline_status", + PipelineStatusGauge = metrics.NewLabeledGauge( + "conduit_pipeline_status", "Pipeline statuses.", - []string{"pipeline_name", "status"}) + []string{"pipeline_name", "status"}, + ) + PipelineRecoveringCount = metrics.NewLabeledCounter( + "pipeline_recovering_count", + "Number of times pipelines have been recovering (by by pipeline name)", + []string{"pipeline_name"}, + ) ConnectorsGauge = metrics.NewLabeledGauge("conduit_connectors", "Number of connectors by type.", []string{"type"}) From e9b4581a09e3062ebc1fcd1d6fc70d64b8316bf5 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 4 Sep 2024 14:08:54 +0200 Subject: [PATCH 3/6] minor refactoring --- pkg/pipeline/service.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/pipeline/service.go b/pkg/pipeline/service.go index aef7a5a49..6311333d7 100644 --- a/pkg/pipeline/service.go +++ b/pkg/pipeline/service.go @@ -368,11 +368,13 @@ func (s *Service) validatePipeline(cfg Config, id string) error { } func (s *Service) updateOldStatusMetrics(pl *Instance) { - measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Dec() - measure.PipelineStatusGauge.DeleteLabels(pl.Config.Name, strings.ToLower(pl.GetStatus().String())) + status := strings.ToLower(pl.GetStatus().String()) + measure.PipelinesGauge.WithValues(status).Dec() + measure.PipelineStatusGauge.DeleteLabels(pl.Config.Name, status) } func (s *Service) updateNewStatusMetrics(pl *Instance) { - measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Inc() - measure.PipelineStatusGauge.WithValues(pl.Config.Name, strings.ToLower(pl.GetStatus().String())).Set(1) + status := strings.ToLower(pl.GetStatus().String()) + measure.PipelinesGauge.WithValues(status).Inc() + measure.PipelineStatusGauge.WithValues(pl.Config.Name, status).Set(1) } From 4175b9fff1fbd372f2eded8b7f493665510f737e Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 5 Sep 2024 16:07:23 +0200 Subject: [PATCH 4/6] comment --- pkg/foundation/metrics/metrics.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/foundation/metrics/metrics.go b/pkg/foundation/metrics/metrics.go index f79edb0f9..b332eb48f 100644 --- a/pkg/foundation/metrics/metrics.go +++ b/pkg/foundation/metrics/metrics.go @@ -73,6 +73,9 @@ type LabeledGauge interface { // If that combination of label values is accessed for the first time, // a new Gauge is created. WithValues(labels ...string) Gauge + // DeleteLabels removes the metric where the variable labels are the same + // as those passed in as labels (same order as the label names used when creating this LabeledGauge). + // It returns true if a metric was deleted. DeleteLabels(labels ...string) bool } From 517b9a9080912d51e17454174799e0c8a523c55f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Haris=20Osmanagi=C4=87?= Date: Thu, 5 Sep 2024 16:24:02 +0200 Subject: [PATCH 5/6] Update pkg/foundation/metrics/measure/measure.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Raúl Barroso --- pkg/foundation/metrics/measure/measure.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/foundation/metrics/measure/measure.go b/pkg/foundation/metrics/measure/measure.go index bff880255..00d05fd30 100644 --- a/pkg/foundation/metrics/measure/measure.go +++ b/pkg/foundation/metrics/measure/measure.go @@ -35,7 +35,7 @@ var ( ) PipelineRecoveringCount = metrics.NewLabeledCounter( "pipeline_recovering_count", - "Number of times pipelines have been recovering (by by pipeline name)", + "Number of times a pipeline have been recovering (by pipeline name)", []string{"pipeline_name"}, ) ConnectorsGauge = metrics.NewLabeledGauge("conduit_connectors", From 43305423e0f717530ad62c5dd686767140e414fe Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 5 Sep 2024 18:30:27 +0200 Subject: [PATCH 6/6] update --- pkg/foundation/metrics/measure/measure.go | 4 ++-- pkg/foundation/metrics/metrics.go | 13 ------------- pkg/foundation/metrics/noop/metrics.go | 1 - pkg/foundation/metrics/prometheus/gauge.go | 4 ---- pkg/pipeline/service.go | 3 +-- 5 files changed, 3 insertions(+), 22 deletions(-) diff --git a/pkg/foundation/metrics/measure/measure.go b/pkg/foundation/metrics/measure/measure.go index 00d05fd30..e670b2782 100644 --- a/pkg/foundation/metrics/measure/measure.go +++ b/pkg/foundation/metrics/measure/measure.go @@ -30,8 +30,8 @@ var ( []string{"status"}) PipelineStatusGauge = metrics.NewLabeledGauge( "conduit_pipeline_status", - "Pipeline statuses.", - []string{"pipeline_name", "status"}, + "A pipeline's status (as specified in the gRPC API: https://buf.build/conduitio/conduit/docs/main:api.v1#api.v1.Pipeline.State).", + []string{"pipeline_name"}, ) PipelineRecoveringCount = metrics.NewLabeledCounter( "pipeline_recovering_count", diff --git a/pkg/foundation/metrics/metrics.go b/pkg/foundation/metrics/metrics.go index b332eb48f..60cb0971c 100644 --- a/pkg/foundation/metrics/metrics.go +++ b/pkg/foundation/metrics/metrics.go @@ -73,10 +73,6 @@ type LabeledGauge interface { // If that combination of label values is accessed for the first time, // a new Gauge is created. WithValues(labels ...string) Gauge - // DeleteLabels removes the metric where the variable labels are the same - // as those passed in as labels (same order as the label names used when creating this LabeledGauge). - // It returns true if a metric was deleted. - DeleteLabels(labels ...string) bool } // Timer is a metric that allows collecting the duration of an action in @@ -326,15 +322,6 @@ func (mt *labeledGauge) WithValues(vs ...string) Gauge { return g } -func (mt *labeledGauge) DeleteLabels(labels ...string) bool { - deleted := len(labels) > 0 - for _, m := range mt.metrics { - deleted = deleted && m.DeleteLabels(labels...) - } - - return deleted -} - type timer struct { spec metrics []Timer diff --git a/pkg/foundation/metrics/noop/metrics.go b/pkg/foundation/metrics/noop/metrics.go index 84590b9cb..c1edcf65e 100644 --- a/pkg/foundation/metrics/noop/metrics.go +++ b/pkg/foundation/metrics/noop/metrics.go @@ -40,7 +40,6 @@ func (Gauge) Set(float64) {} type LabeledGauge struct{} func (LabeledGauge) WithValues(...string) metrics.Gauge { return Gauge{} } -func (LabeledGauge) DeleteLabels(...string) bool { return true } type Timer struct{} diff --git a/pkg/foundation/metrics/prometheus/gauge.go b/pkg/foundation/metrics/prometheus/gauge.go index 6a136ffc6..72055d1e5 100644 --- a/pkg/foundation/metrics/prometheus/gauge.go +++ b/pkg/foundation/metrics/prometheus/gauge.go @@ -27,10 +27,6 @@ func (lg *labeledGauge) WithValues(labels ...string) metrics.Gauge { return &gauge{pg: lg.pg.WithLabelValues(labels...)} } -func (lg *labeledGauge) DeleteLabels(labels ...string) bool { - return lg.pg.DeleteLabelValues(labels...) -} - func (lg *labeledGauge) Describe(c chan<- *prometheus.Desc) { lg.pg.Describe(c) } diff --git a/pkg/pipeline/service.go b/pkg/pipeline/service.go index 6311333d7..ca8373335 100644 --- a/pkg/pipeline/service.go +++ b/pkg/pipeline/service.go @@ -370,11 +370,10 @@ func (s *Service) validatePipeline(cfg Config, id string) error { func (s *Service) updateOldStatusMetrics(pl *Instance) { status := strings.ToLower(pl.GetStatus().String()) measure.PipelinesGauge.WithValues(status).Dec() - measure.PipelineStatusGauge.DeleteLabels(pl.Config.Name, status) } func (s *Service) updateNewStatusMetrics(pl *Instance) { status := strings.ToLower(pl.GetStatus().String()) measure.PipelinesGauge.WithValues(status).Inc() - measure.PipelineStatusGauge.WithValues(pl.Config.Name, status).Set(1) + measure.PipelineStatusGauge.WithValues(pl.Config.Name).Set(float64(pl.GetStatus())) }