From 5d498c622ac99d5f18015ff250bb8e46024a7253 Mon Sep 17 00:00:00 2001 From: Matthieu Vachon Date: Thu, 14 Mar 2024 10:44:27 -0400 Subject: [PATCH] Renamed + added tier2 readiness metrics Small caveat: the pending shutdown signal does not trigger a change in the app readiness metrics for now. The health check will change, but the metrics will not yet. This is an improvement that will be needed in the future. --- app/tier1.go | 2 ++ app/tier2.go | 3 +++ metrics/metrics.go | 5 +++-- orchestrator/scheduler/scheduler.go | 8 ++++---- 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/app/tier1.go b/app/tier1.go index ca0038d68..3776d9ec1 100644 --- a/app/tier1.go +++ b/app/tier1.go @@ -177,6 +177,8 @@ func (a *Tier1App) Run() error { } a.OnTerminating(func(err error) { + metrics.AppReadinessTier1.SetNotReady() + svc.Shutdown(err) time.Sleep(2 * time.Second) // enough time to send termination grpc responses }) diff --git a/app/tier2.go b/app/tier2.go index 77cbdcdd8..1d8e2f50b 100644 --- a/app/tier2.go +++ b/app/tier2.go @@ -111,9 +111,12 @@ func (a *Tier2App) Run() error { return fmt.Errorf("failed to setup trust authenticator: %w", err) } + a.OnTerminating(func(_ error) { metrics.AppReadinessTier2.SetNotReady() }) + go func() { a.logger.Info("launching gRPC server") a.isReady.CompareAndSwap(false, true) + metrics.AppReadinessTier2.SetReady() err := service.ListenTier2(a.config.GRPCListenAddr, a.config.ServiceDiscoveryURL, svc, trustAuth, a.logger, a.HealthCheck) a.Shutdown(err) diff --git a/metrics/metrics.go b/metrics/metrics.go index 8025e9a79..374a5a77a 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -19,10 +19,11 @@ var SquashesLaunched = MetricSet.NewCounter("substreams_total_squashes_launched" var SquashersStarted = MetricSet.NewCounter("substreams_total_squash_processes_launched", "Counter for Total squash processes launched, used for rate") var SquashersEnded = MetricSet.NewCounter("substreams_total_squash_processes_closed", "Counter for 
Total squash processes closed, used for active processes") -var ActiveWorkerRequest = MetricSet.NewGauge("substreams_active_worker_request", "Number of active Substreams worker requests a tier1 app is currently doing against tier2 nodes") -var WorkerRequestCounter = MetricSet.NewCounter("substreams_worker_request_counter", "Counter for total Substreams worker requests a tier1 app made against tier2 nodes") +var Tier1ActiveWorkerRequest = MetricSet.NewGauge("substreams_tier1_active_worker_request", "Number of active Substreams worker requests a tier1 app is currently doing against tier2 nodes") +var Tier1WorkerRequestCounter = MetricSet.NewCounter("substreams_tier1_request_counter", "Counter for total Substreams worker requests a tier1 app made against tier2 nodes") var AppReadinessTier1 = MetricSet.NewAppReadiness("substreams_tier1") +var AppReadinessTier2 = MetricSet.NewAppReadiness("substreams_tier2") var registerOnce sync.Once diff --git a/orchestrator/scheduler/scheduler.go b/orchestrator/scheduler/scheduler.go index 621add053..a87418395 100644 --- a/orchestrator/scheduler/scheduler.go +++ b/orchestrator/scheduler/scheduler.go @@ -81,7 +81,7 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd { switch msg := msg.(type) { case work.MsgJobSucceeded: - metrics.ActiveWorkerRequest.Dec() + metrics.Tier1ActiveWorkerRequest.Dec() s.Stages.MarkSegmentPartialPresent(msg.Unit) s.WorkerPool.Return(msg.Worker) @@ -113,8 +113,8 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd { s.logger.Info("scheduling work", zap.Object("unit", workUnit)) modules := s.Stages.StageModules(workUnit.Stage) - metrics.ActiveWorkerRequest.Inc() - metrics.WorkerRequestCounter.Inc() + metrics.Tier1ActiveWorkerRequest.Inc() + metrics.Tier1WorkerRequestCounter.Inc() return loop.Batch( worker.Work(s.ctx, workUnit, workRange, modules, s.stream), @@ -122,7 +122,7 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd { ) case work.MsgJobFailed: - metrics.ActiveWorkerRequest.Dec() + 
metrics.Tier1ActiveWorkerRequest.Dec() cmds = append(cmds, loop.Quit(msg.Error))