Skip to content

Commit

Permalink
[Recovery] Metrics (#1828)
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso authored Sep 11, 2024
1 parent ef35c07 commit c284938
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 8 deletions.
10 changes: 10 additions & 0 deletions pkg/foundation/metrics/measure/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ var (
PipelinesGauge = metrics.NewLabeledGauge("conduit_pipelines",
"Number of pipelines by status.",
[]string{"status"})
PipelineStatusGauge = metrics.NewLabeledGauge(
"conduit_pipeline_status",
"A pipeline's status (as specified in the gRPC API: https://buf.build/conduitio/conduit/docs/main:api.v1#api.v1.Pipeline.State).",
[]string{"pipeline_name"},
)
PipelineRecoveringCount = metrics.NewLabeledCounter(
"pipeline_recovering_count",
"Number of times a pipeline have been recovering (by pipeline name)",
[]string{"pipeline_name"},
)
ConnectorsGauge = metrics.NewLabeledGauge("conduit_connectors",
"Number of connectors by type.",
[]string{"type"})
Expand Down
4 changes: 4 additions & 0 deletions pkg/foundation/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ type Gauge interface {

// LabeledGauge describes a gauge that must have values populated before use.
type LabeledGauge interface {
// WithValues returns the Gauge for the given slice of label
// values (same order as the label names used when creating this LabeledGauge).
// If that combination of label values is accessed for the first time,
// a new Gauge is created.
WithValues(labels ...string) Gauge
}

Expand Down
1 change: 0 additions & 1 deletion pkg/foundation/metrics/prometheus/gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func (g *gauge) Inc(vs ...float64) {
g.pg.Inc()
return
}

g.pg.Add(sumFloat64(vs...))
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/pipeline/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,10 +593,12 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error {
})
}

measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Dec()
s.updateOldStatusMetrics(pl)

pl.SetStatus(StatusRunning)
pl.Error = ""
measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Inc()

s.updateNewStatusMetrics(pl)

err := s.store.Set(ctx, pl.ID, pl)
if err != nil {
Expand All @@ -613,7 +615,7 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error {
nodesWg.Wait()
err := pl.t.Err()

measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Dec()
s.updateOldStatusMetrics(pl)

switch err {
case tomb.ErrStillAlive:
Expand All @@ -640,7 +642,7 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error {
s.notify(pl.ID, err)
// It's important to update the metrics before we handle the error from s.Store.Set() (if any),
// since the source of the truth is the actual pipeline (stored in memory).
measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Inc()
s.updateNewStatusMetrics(pl)

storeErr := s.store.Set(ctx, pl.ID, pl)
if storeErr != nil {
Expand Down
20 changes: 17 additions & 3 deletions pkg/pipeline/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ func (s *Service) Init(ctx context.Context) error {
// change status to "systemStopped" to mark which pipeline was running
instance.SetStatus(StatusSystemStopped)
}
measure.PipelinesGauge.WithValues(strings.ToLower(instance.GetStatus().String())).Inc()

s.updateNewStatusMetrics(instance)
}

s.logger.Info(ctx).Int("count", len(s.instances)).Msg("pipelines initialized")
Expand Down Expand Up @@ -143,7 +144,8 @@ func (s *Service) Create(ctx context.Context, id string, cfg Config, p Provision

s.instances[pl.ID] = pl
s.instanceNames[cfg.Name] = true
measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Inc()

s.updateNewStatusMetrics(pl)

return pl, nil
}
Expand Down Expand Up @@ -308,7 +310,8 @@ func (s *Service) Delete(ctx context.Context, pipelineID string) error {

delete(s.instances, pl.ID)
delete(s.instanceNames, pl.Config.Name)
measure.PipelinesGauge.WithValues(strings.ToLower(pl.GetStatus().String())).Dec()

s.updateOldStatusMetrics(pl)

return nil
}
Expand Down Expand Up @@ -363,3 +366,14 @@ func (s *Service) validatePipeline(cfg Config, id string) error {

return cerrors.Join(errs...)
}

func (s *Service) updateOldStatusMetrics(pl *Instance) {
status := strings.ToLower(pl.GetStatus().String())
measure.PipelinesGauge.WithValues(status).Dec()
}

func (s *Service) updateNewStatusMetrics(pl *Instance) {
status := strings.ToLower(pl.GetStatus().String())
measure.PipelinesGauge.WithValues(status).Inc()
measure.PipelineStatusGauge.WithValues(pl.Config.Name).Set(float64(pl.GetStatus()))
}

0 comments on commit c284938

Please sign in to comment.