Added metrics exposing the number of active and the total count of tier2 worker requests, for graphing purposes (#430)

* Added metrics exposing the number of active and the total count of tier2 worker requests, for graphing purposes

Also fixed the readiness label, which was wrongly labelled `firehose`.

* Renamed + added tier2 readiness metrics

Small caveat: the pending shutdown signal does not yet trigger a change in the app readiness metrics. The health check will change, but the metrics will not. This is an improvement that will be needed in the future.

* Fixed some typos and nitpicks
maoueh authored Mar 14, 2024
1 parent 834b2e2 commit db4e901
Showing 5 changed files with 34 additions and 4 deletions.
4 changes: 3 additions & 1 deletion app/tier1.go
@@ -177,6 +177,8 @@ func (a *Tier1App) Run() error {
 	}

 	a.OnTerminating(func(err error) {
+		metrics.AppReadinessTier1.SetNotReady()
+
 		svc.Shutdown(err)
 		time.Sleep(2 * time.Second) // enough time to send termination grpc responses
 	})
@@ -186,7 +188,7 @@ func (a *Tier1App) Run() error {
 	a.logger.Info("waiting until hub is real-time synced")
 	select {
 	case <-forkableHub.Ready:
-		metrics.AppReadiness.SetReady()
+		metrics.AppReadinessTier1.SetReady()
 	case <-a.Terminating():
 		return
 	}
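For clarity, the tier1 gating above only marks the app ready once the forkable hub reports it is live-synced, and gives up cleanly on termination. A reduced sketch of that select (function and parameter names here are illustrative, not from the codebase):

package sketch

// waitThenSetReady mirrors the tier1 select above: readiness flips only
// once the hub signals real-time sync, and the wait aborts if the app is
// already terminating (in which case readiness is never set).
func waitThenSetReady(hubReady, terminating <-chan struct{}, setReady func()) {
	select {
	case <-hubReady:
		setReady()
	case <-terminating:
	}
}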
3 changes: 3 additions & 0 deletions app/tier2.go
@@ -111,9 +111,12 @@ func (a *Tier2App) Run() error {
 		return fmt.Errorf("failed to setup trust authenticator: %w", err)
 	}

+	a.OnTerminating(func(_ error) { metrics.AppReadinessTier2.SetNotReady() })
+
 	go func() {
 		a.logger.Info("launching gRPC server")
 		a.isReady.CompareAndSwap(false, true)
+		metrics.AppReadinessTier2.SetReady()

 		err := service.ListenTier2(a.config.GRPCListenAddr, a.config.ServiceDiscoveryURL, svc, trustAuth, a.logger, a.HealthCheck)
 		a.Shutdown(err)
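Both tiers now follow the same readiness pattern: mark ready right before serving, mark not-ready as soon as termination begins. A minimal sketch, assuming the metric set API comes from `github.com/streamingfast/dmetrics` as used in metrics/metrics.go below (the `runTier2` helper is hypothetical):

package sketch

import "github.com/streamingfast/dmetrics"

var metricSet = dmetrics.NewSet()

// Same call as metrics/metrics.go below; "substreams_tier2" is the
// readiness metric name this commit introduces.
var appReadinessTier2 = metricSet.NewAppReadiness("substreams_tier2")

// runTier2 is a hypothetical reduction of Tier2App.Run: hook the not-ready
// flip into the termination callback first, then flip to ready just before
// the server starts listening.
func runTier2(onTerminating func(func(error)), listen func() error) error {
	onTerminating(func(_ error) { appReadinessTier2.SetNotReady() })

	appReadinessTier2.SetReady()
	return listen()
}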
14 changes: 13 additions & 1 deletion docs/release-notes/change-log.md
@@ -15,10 +15,22 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 Check a usage of this new feature on the [substreams-db-graph-converter](https://github.com/streamingfast/substreams-db-graph-converter/) repository.

 * Pick up docs from the README.md or README in the same directory as the manifest, when top-level package.doc is empty
+
 * Tier2 service now supports a maximum concurrent requests limit. Default set to 0 (unlimited).
+
 * Improved file listing performance for Google Storage backends by 25%
+
 * Tier2 will now read back mapper outputs (if they exist) to prevent running them again. Additionally, it will not read back the full blocks if its inputs can be satisfied from existing cached mapper outputs.
-* Tier2 will skip processing completely if the output_module is a mapper that has already been processed (ex: when multiple requests are indexing the same data at the same time)
+
+* Tier2 will skip processing completely if the `output_module` is a mapper that has already been processed (ex: when multiple requests are indexing the same data at the same time)
+
+* [Operator] Readiness metric for Substreams tier1 app is now named `substreams_tier1` (was mistakenly called `firehose` before).
+
+* [Operator] Added back readiness metric for Substreams tier2 app (named `substreams_tier2`).
+
+* [Operator] Added metric `substreams_tier1_active_worker_requests` which gives the number of active Substreams worker requests a tier1 app is currently doing against tier2 nodes.
+
+* [Operator] Added metric `substreams_tier1_worker_request_counter` which gives the total Substreams worker requests a tier1 app made against tier2 nodes.

 ## v1.3.7

8 changes: 6 additions & 2 deletions metrics/metrics.go
@@ -9,7 +9,7 @@ import (

 var MetricSet = dmetrics.NewSet()

-var ActiveSubstreams = MetricSet.NewGauge("substreams_active_requests", "Number of active substreams requests")
+var ActiveSubstreams = MetricSet.NewGauge("substreams_active_requests", "Number of active Substreams requests")
 var SubstreamsCounter = MetricSet.NewCounter("substreams_counter", "Substreams requests count")

 var BlockBeginProcess = MetricSet.NewCounter("substreams_block_process_start_counter", "Counter for total block processes started, used for rate")
@@ -19,7 +19,7 @@ var SquashesLaunched = MetricSet.NewCounter("substreams_total_squashes_launched"
 var SquashersStarted = MetricSet.NewCounter("substreams_total_squash_processes_launched", "Counter for Total squash processes launched, used for rate")
 var SquashersEnded = MetricSet.NewCounter("substreams_total_squash_processes_closed", "Counter for Total squash processes closed, used for active processes")

-var AppReadiness = MetricSet.NewAppReadiness("firehose")
+var Tier1ActiveWorkerRequest = MetricSet.NewGauge("substreams_tier1_active_worker_requests", "Number of active Substreams worker requests a tier1 app is currently doing against tier2 nodes")
+var Tier1WorkerRequestCounter = MetricSet.NewCounter("substreams_tier1_worker_request_counter", "Counter for total Substreams worker requests a tier1 app made against tier2 nodes")
+
+var AppReadinessTier1 = MetricSet.NewAppReadiness("substreams_tier1")
+var AppReadinessTier2 = MetricSet.NewAppReadiness("substreams_tier2")

 var registerOnce sync.Once

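The new gauge/counter pair follows the usual active-versus-total convention: the gauge moves both ways and reflects in-flight work, while the counter is monotonic so dashboards can take a rate over it (e.g. a per-second worker-request rate via Prometheus' `rate()`). A sketch of the pairing, using only the `dmetrics` calls shown above (the `instrument` helper is illustrative; the real scheduler below cannot use `defer` like this, since its increment and its decrements happen in separate `Update` passes):

package sketch

import "github.com/streamingfast/dmetrics"

var set = dmetrics.NewSet()

// Same definitions as metrics/metrics.go above (help strings shortened).
var active = set.NewGauge("substreams_tier1_active_worker_requests", "Active tier1 worker requests")
var total = set.NewCounter("substreams_tier1_worker_request_counter", "Total tier1 worker requests")

// instrument wraps one worker request: both metrics move on start, only
// the gauge moves back when the request finishes (successfully or not),
// keeping the counter monotonic and the gauge equal to in-flight requests.
func instrument(do func() error) error {
	active.Inc()
	total.Inc()
	defer active.Dec()

	return do()
}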
9 changes: 9 additions & 0 deletions orchestrator/scheduler/scheduler.go
@@ -8,6 +8,7 @@ import (

"go.uber.org/zap"

"github.com/streamingfast/substreams/metrics"
"github.com/streamingfast/substreams/orchestrator/execout"
"github.com/streamingfast/substreams/orchestrator/loop"
"github.com/streamingfast/substreams/orchestrator/response"
@@ -80,6 +81,8 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {

 	switch msg := msg.(type) {
 	case work.MsgJobSucceeded:
+		metrics.Tier1ActiveWorkerRequest.Dec()
+
 		s.Stages.MarkSegmentPartialPresent(msg.Unit)
 		s.WorkerPool.Return(msg.Worker)

@@ -109,12 +112,18 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {

 		s.logger.Info("scheduling work", zap.Object("unit", workUnit))
 		modules := s.Stages.StageModules(workUnit.Stage)
+
+		metrics.Tier1ActiveWorkerRequest.Inc()
+		metrics.Tier1WorkerRequestCounter.Inc()
+
 		return loop.Batch(
 			worker.Work(s.ctx, workUnit, workRange, modules, s.stream),
 			work.CmdScheduleNextJob(),
 		)

 	case work.MsgJobFailed:
+		metrics.Tier1ActiveWorkerRequest.Dec()
+
 		cmds = append(cmds, loop.Quit(msg.Error))

 	case stage.MsgMergeFinished:
