From eab51a3e67b2292d06be8d5b5e0ae45848316ecb Mon Sep 17 00:00:00 2001 From: Vyom Yadav Date: Sun, 27 Oct 2024 23:13:24 +0530 Subject: [PATCH] Record Metrics for Reminder Signed-off-by: Vyom Yadav --- internal/reminder/metrics/metrics.go | 84 ++++++++++++++++ internal/reminder/metrics/provider.go | 132 ++++++++++++++++++++++++++ internal/reminder/reminder.go | 42 ++++++-- pkg/config/reminder/config.go | 1 + pkg/config/reminder/metrics.go | 7 ++ 5 files changed, 260 insertions(+), 6 deletions(-) create mode 100644 internal/reminder/metrics/metrics.go create mode 100644 internal/reminder/metrics/provider.go create mode 100644 pkg/config/reminder/metrics.go diff --git a/internal/reminder/metrics/metrics.go b/internal/reminder/metrics/metrics.go new file mode 100644 index 0000000000..a9b4f077b2 --- /dev/null +++ b/internal/reminder/metrics/metrics.go @@ -0,0 +1,84 @@ +package metrics + +import ( + "context" + "go.opentelemetry.io/otel/metric" +) + +// Default bucket boundaries in seconds for the send delay histogram +var sendDelayBuckets = []float64{ + 0, // immediate + 10, // 10 seconds + 20, // 20 seconds + 40, // 40 seconds + 80, // 1m 20s + 160, // 2m 40s + 320, // 5m 20s + 640, // 10m 40s + 1280, // 21m 20s +} + +type Metrics struct { + // Time between when a reminder became eligible and when it was sent + SendDelay metric.Float64Histogram + + // Current number of reminders in the batch + BatchSize metric.Int64Gauge + + // Average batch size (updated on each batch) + AvgBatchSize metric.Float64Gauge + + // For tracking average calculation + // TODO: consider persisting this to avoid reset on restart (maybe) + totalBatches int64 + totalReminders int64 +} + +func NewMetrics(meter metric.Meter) (*Metrics, error) { + sendDelay, err := meter.Float64Histogram( + "reminder_send_delay", + metric.WithDescription("Time between reminder becoming eligible and actual send (seconds)"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(sendDelayBuckets...), + ) + if err != nil { + return nil, err + } + + batchSize, err := meter.Int64Gauge( + "reminder_batch_size", + metric.WithDescription("Current number of reminders in the batch"), + ) + if err != nil { + return nil, err + } + + avgBatchSize, err := meter.Float64Gauge( + "reminder_avg_batch_size", + metric.WithDescription("Average number of reminders per batch"), + ) + if err != nil { + return nil, err + } + + return &Metrics{ + SendDelay: sendDelay, + BatchSize: batchSize, + AvgBatchSize: avgBatchSize, + }, nil +} + +func (m *Metrics) RecordBatch(ctx context.Context, size int64) { + // Update current batch size + m.BatchSize.Record(ctx, size) + + // Update running average + m.totalBatches++ + m.totalReminders += size + avgSize := float64(m.totalReminders) / float64(m.totalBatches) + m.AvgBatchSize.Record(ctx, avgSize) +} + +func (m *Metrics) RecordSendDelay(ctx context.Context, delaySeconds float64) { + m.SendDelay.Record(ctx, delaySeconds) +} diff --git a/internal/reminder/metrics/provider.go b/internal/reminder/metrics/provider.go new file mode 100644 index 0000000000..bc259c38d6 --- /dev/null +++ b/internal/reminder/metrics/provider.go @@ -0,0 +1,132 @@ +package metrics + +import ( + "context" + "errors" + "fmt" + "github.com/mindersec/minder/pkg/config/reminder" + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/rs/zerolog/log" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/prometheus" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" +) + +const ( + metricsPath = "/metrics" + readHeaderTimeout = 2 * time.Second +) + +// Provider manages the metrics server and OpenTelemetry setup +type Provider struct { + server *http.Server + mp *sdkmetric.MeterProvider + metrics *Metrics +} + +// NewProvider creates a new metrics provider +func NewProvider(cfg *reminder.MetricsConfig) (*Provider, error) { + if cfg == nil { + return nil, errors.New("metrics config is nil") + } + + if !cfg.Enabled { + return &Provider{}, nil + } + + // Create Prometheus exporter + prometheusExporter, err := prometheus.New( + prometheus.WithNamespace("reminder_service"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create Prometheus exporter: %w", err) + } + + // Create resource with service information + res := resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName("reminder-service"), + semconv.ServiceVersion("v0.1.0"), + ) + + // Create meter provider + mp := sdkmetric.NewMeterProvider( + sdkmetric.WithReader(prometheusExporter), + sdkmetric.WithResource(res), + ) + + // Set global meter provider + otel.SetMeterProvider(mp) + + // Create metrics + meter := mp.Meter("reminder-service") + metrics, err := NewMetrics(meter) + if err != nil { + return nil, fmt.Errorf("failed to create metrics: %w", err) + } + + // Create HTTP server + mux := http.NewServeMux() + mux.Handle(metricsPath, promhttp.Handler()) + + server := &http.Server{ + Addr: fmt.Sprintf("%s:%d", cfg.Host, cfg.Port), + Handler: mux, + ReadHeaderTimeout: readHeaderTimeout, + } + + return &Provider{ + server: server, + mp: mp, + metrics: metrics, + }, nil +} + +// Start starts the metrics server if enabled +func (p *Provider) Start(ctx context.Context) error { + if p.server == nil { + return nil // Metrics disabled + } + + errCh := make(chan error) + go func() { + log.Info().Str("address", p.server.Addr).Msg("Starting metrics server") + if err := p.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + errCh <- fmt.Errorf("metrics server error: %w", err) + } + }() + + select { + case err := <-errCh: + return err + case <-ctx.Done(): + return p.Shutdown(ctx) + } +} + +// Shutdown gracefully shuts down the metrics server +func (p *Provider) Shutdown(ctx context.Context) error { + if p.server == nil { + return nil + } + + shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + log.Info().Msg("Shutting down metrics server") + if err := p.mp.Shutdown(shutdownCtx); err != nil { + log.Error().Err(err).Msg("Error shutting down meter provider") + } + + return p.server.Shutdown(shutdownCtx) +} + +// Metrics returns the metrics instance +func (p *Provider) Metrics() *Metrics { + return p.metrics +} diff --git a/internal/reminder/reminder.go b/internal/reminder/reminder.go index 7ec9a61974..6fdea0699c 100644 --- a/internal/reminder/reminder.go +++ b/internal/reminder/reminder.go @@ -8,6 +8,7 @@ import ( "context" "errors" "fmt" + "github.com/mindersec/minder/internal/reminder/metrics" "sync" "time" @@ -44,6 +45,8 @@ type reminder struct { eventPublisher message.Publisher eventDBCloser common.DriverCloser + + metricsProvider *metrics.Provider } // NewReminder creates a new reminder instance @@ -66,6 +69,13 @@ func NewReminder(ctx context.Context, store db.Store, config *reminderconfig.Con r.eventPublisher = pub r.eventDBCloser = cl + + metricsProvider, err := metrics.NewProvider(&config.MetricsConfig) + if err != nil { + return nil, fmt.Errorf("error creating metrics provider: %w", err) + } + + r.metricsProvider = metricsProvider return r, nil } @@ -78,6 +88,11 @@ func (r *reminder) Start(ctx context.Context) error { default: } + err := r.metricsProvider.Start(ctx) + if err != nil { + return fmt.Errorf("error starting metrics provider: %w", err) + } + interval := r.cfg.RecurrenceConfig.Interval if interval <= 0 { return fmt.Errorf("invalid interval: %s", r.cfg.RecurrenceConfig.Interval) @@ -119,6 +134,11 @@ func (r *reminder) Stop() { r.stopOnce.Do(func() { close(r.stop) r.eventDBCloser() + + err := r.metricsProvider.Shutdown(context.Background()) + if err != nil { + zerolog.Ctx(context.Background()).Error().Err(err).Msg("error shutting down metrics provider") + } }) } @@ -143,6 +163,11 @@ func (r *reminder) sendReminders(ctx context.Context) error { return fmt.Errorf("error creating reminder messages: %w", err) } + remMetrics := r.metricsProvider.Metrics() + if remMetrics != nil { + remMetrics.RecordBatch(ctx, int64(len(repos))) + } + err = r.eventPublisher.Publish(constants.TopicQueueRepoReminder, messages...) if err != nil { return fmt.Errorf("error publishing messages: %w", err) @@ -151,14 +176,19 @@ func (r *reminder) sendReminders(ctx context.Context) error { repoIds := make([]uuid.UUID, len(repos)) for _, repo := range repos { repoIds = append(repoIds, repo.ID) + if remMetrics != nil { + // sendDelay = Now() - ReminderLastSent - MinElapsed + reminderLastSent := repo.ReminderLastSent + if reminderLastSent.Valid { + remMetrics.SendDelay.Record(ctx, (time.Now().Sub(reminderLastSent.Time) - r.cfg.RecurrenceConfig.MinElapsed).Seconds()) + } else { + // TODO: Should the send delay be zero if the reminder has never been sent? + remMetrics.SendDelay.Record(ctx, 0) + //remMetrics.SendDelay.Record(ctx, r.cfg.RecurrenceConfig.MinElapsed.Seconds()) + } + } } - // TODO: Collect Metrics - // Potential metrics: - // - Gauge: Number of reminders in the current batch - // - UpDownCounter: Average reminders sent per batch - // - Histogram: reminder_last_sent time distribution - err = r.store.UpdateReminderLastSentForRepositories(ctx, repoIds) if err != nil { return fmt.Errorf("reminders published but error updating last sent time: %w", err) diff --git a/pkg/config/reminder/config.go b/pkg/config/reminder/config.go index c53b5c1129..2e4f398545 100644 --- a/pkg/config/reminder/config.go +++ b/pkg/config/reminder/config.go @@ -19,6 +19,7 @@ type Config struct { RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"` EventConfig EventConfig `mapstructure:"events"` LoggingConfig LoggingConfig `mapstructure:"logging"` + MetricsConfig MetricsConfig `mapstructure:"metrics"` } // Validate validates the configuration diff --git a/pkg/config/reminder/metrics.go b/pkg/config/reminder/metrics.go new file mode 100644 index 0000000000..1596f5b3cd --- /dev/null +++ b/pkg/config/reminder/metrics.go @@ -0,0 +1,7 @@ +package reminder + +type MetricsConfig struct { + Enabled bool `mapstructure:"enabled" default:"true"` + Host string `mapstructure:"host" default:"127.0.0.1"` + Port int `mapstructure:"port" default:"8080"` +}