diff --git a/e2e/service.go b/e2e/service.go index 544707f..a711c46 100644 --- a/e2e/service.go +++ b/e2e/service.go @@ -8,7 +8,6 @@ import ( "github.com/cloudhut/kminion/v2/kafka" "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/twmb/franz-go/pkg/kgo" "go.uber.org/zap" ) @@ -44,7 +43,7 @@ type Service struct { } // NewService creates a new instance of the e2e moinitoring service (wow) -func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricNamespace string) (*Service, error) { +func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, promRegisterer prometheus.Registerer) (*Service, error) { minionID := uuid.NewString() groupID := fmt.Sprintf("%v-%v", cfg.Consumer.GroupIdPrefix, minionID) @@ -94,29 +93,32 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k svc.messageTracker = newMessageTracker(svc) makeCounterVec := func(name string, labelNames []string, help string) *prometheus.CounterVec { - return promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: metricNamespace, + cv := prometheus.NewCounterVec(prometheus.CounterOpts{ Subsystem: "end_to_end", Name: name, Help: help, }, labelNames) + promRegisterer.MustRegister(cv) + return cv } makeGaugeVec := func(name string, labelNames []string, help string) *prometheus.GaugeVec { - return promauto.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: metricNamespace, + gv := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: "end_to_end", Name: name, Help: help, }, labelNames) + promRegisterer.MustRegister(gv) + return gv } makeHistogramVec := func(name string, maxLatency time.Duration, labelNames []string, help string) *prometheus.HistogramVec { - return promauto.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: metricNamespace, + hv := prometheus.NewHistogramVec(prometheus.HistogramOpts{ Subsystem: "end_to_end", Name: name, Help: help, Buckets: createHistogramBuckets(maxLatency), }, labelNames) + promRegisterer.MustRegister(hv) + return hv } // Low-level info @@ -178,7 +180,6 @@ func (s *Service) Start(ctx context.Context) error { case <-initCh: isInitialized = true s.logger.Info("consumer has been successfully initialized") - break case <-ctx.Done(): return nil } diff --git a/main.go b/main.go index bc4b8b9..caaf973 100644 --- a/main.go +++ b/main.go @@ -54,6 +54,8 @@ func main() { } }() + wrappedRegisterer := promclient.WrapRegistererWithPrefix(cfg.Exporter.Namespace+"_", promclient.DefaultRegisterer) + // Create kafka service kafkaSvc := kafka.NewService(cfg.Kafka, logger) @@ -77,7 +79,7 @@ func main() { cfg.Minion.EndToEnd, logger, kafkaSvc, - cfg.Exporter.Namespace, + wrappedRegisterer, ) if err != nil { logger.Fatal("failed to create end-to-end monitoring service: %w", zap.Error(err))