Skip to content

Commit

Permalink
Renamed tier1 worker request metrics and added tier2 readiness metrics
Browse files Browse the repository at this point in the history
One small caveat: for now, the pending shutdown signal does not trigger a change in the app readiness metrics. The health check will change, but the metrics will not yet. This is an improvement that will be needed in the future.
  • Loading branch information
maoueh committed Mar 14, 2024
1 parent 6f223f3 commit 5d498c6
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 6 deletions.
2 changes: 2 additions & 0 deletions app/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ func (a *Tier1App) Run() error {
}

a.OnTerminating(func(err error) {
metrics.AppReadinessTier1.SetNotReady()

svc.Shutdown(err)
time.Sleep(2 * time.Second) // enough time to send termination grpc responses
})
Expand Down
3 changes: 3 additions & 0 deletions app/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,12 @@ func (a *Tier2App) Run() error {
return fmt.Errorf("failed to setup trust authenticator: %w", err)
}

a.OnTerminating(func(_ error) { metrics.AppReadinessTier2.SetNotReady() })

go func() {
a.logger.Info("launching gRPC server")
a.isReady.CompareAndSwap(false, true)
metrics.AppReadinessTier2.SetReady()

err := service.ListenTier2(a.config.GRPCListenAddr, a.config.ServiceDiscoveryURL, svc, trustAuth, a.logger, a.HealthCheck)
a.Shutdown(err)
Expand Down
5 changes: 3 additions & 2 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ var SquashesLaunched = MetricSet.NewCounter("substreams_total_squashes_launched"
var SquashersStarted = MetricSet.NewCounter("substreams_total_squash_processes_launched", "Counter for Total squash processes launched, used for rate")
var SquashersEnded = MetricSet.NewCounter("substreams_total_squash_processes_closed", "Counter for Total squash processes closed, used for active processes")

var ActiveWorkerRequest = MetricSet.NewGauge("substreams_active_worker_request", "Number of active Substreams worker requests a tier1 app is currently doing against tier2 nodes")
var WorkerRequestCounter = MetricSet.NewCounter("substreams_worker_request_counter", "Counter for total Substreams worker requests a tier1 app made against tier2 nodes")
var Tier1ActiveWorkerRequest = MetricSet.NewGauge("substreams_tier1_active_worker_request", "Number of active Substreams worker requests a tier1 app is currently doing against tier2 nodes")
var Tier1WorkerRequestCounter = MetricSet.NewCounter("substreams_tier1_request_counter", "Counter for total Substreams worker requests a tier1 app made against tier2 nodes")

var AppReadinessTier1 = MetricSet.NewAppReadiness("substreams_tier1")
var AppReadinessTier2 = MetricSet.NewAppReadiness("substreams_tier2")

var registerOnce sync.Once

Expand Down
8 changes: 4 additions & 4 deletions orchestrator/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {

switch msg := msg.(type) {
case work.MsgJobSucceeded:
metrics.ActiveWorkerRequest.Dec()
metrics.Tier1ActiveWorkerRequest.Dec()

s.Stages.MarkSegmentPartialPresent(msg.Unit)
s.WorkerPool.Return(msg.Worker)
Expand Down Expand Up @@ -113,16 +113,16 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {
s.logger.Info("scheduling work", zap.Object("unit", workUnit))
modules := s.Stages.StageModules(workUnit.Stage)

metrics.ActiveWorkerRequest.Inc()
metrics.WorkerRequestCounter.Inc()
metrics.Tier1ActiveWorkerRequest.Inc()
metrics.Tier1WorkerRequestCounter.Inc()

return loop.Batch(
worker.Work(s.ctx, workUnit, workRange, modules, s.stream),
work.CmdScheduleNextJob(),
)

case work.MsgJobFailed:
metrics.ActiveWorkerRequest.Dec()
metrics.Tier1ActiveWorkerRequest.Dec()

cmds = append(cmds, loop.Quit(msg.Error))

Expand Down

0 comments on commit 5d498c6

Please sign in to comment.