Skip to content

Commit

Permalink
Added metrics to see the number of active and total counter of tier2 …
Browse files Browse the repository at this point in the history
…worker request for graphing purposes

Also fixed readiness label which was labelled as `firehose`.
  • Loading branch information
maoueh committed Mar 14, 2024
1 parent cb5094e commit 6f223f3
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 7 deletions.
2 changes: 1 addition & 1 deletion app/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (a *Tier1App) Run() error {
a.logger.Info("waiting until hub is real-time synced")
select {
case <-forkableHub.Ready:
metrics.AppReadiness.SetReady()
metrics.AppReadinessTier1.SetReady()
case <-a.Terminating():
return
}
Expand Down
20 changes: 16 additions & 4 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,23 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Unreleased

* Pick up docs from the README.md or README in the same directory as the manifest, when top-level package.doc is empty
* Tier2 service now supports a maximum concurrent requests limit. Default set to 0 (unlimited).
* Improved file listing performance for Google Storage backends by 25%
* Pick up docs from the README.md or README in the same directory as the manifest, when top-level `package.doc` field is empty.
* Tier2 service now supports a maximum concurrent requests limit. Default set to 0 (unlimited).

* Improved file listing performance for Google Storage backends by 25%.

* Tier2 will now read back mapper outputs (if they exist) to prevent running them again. Additionally, it will not read back the full blocks if its inputs can be satisfied from existing cached mapper outputs.
* Tier2 will skip processing completely if the output_module is a mapper that has already been processed (ex: when multiple requests are indexing the same data at the same time)

* Tier2 will skip processing completely if the `output_module` is a mapper that has already been processed (ex: when multiple requests are indexing the same data at the same time)

* [Operator] Readiness metric for Substreams tier1 app is now named `substreams_tier1` (was mistakenly called `firehose` before).

* [Operator] Added back readiness metric for Substreams tier2 app (named `substreams_tier2`).

* [Operator] Added metric `substreams_tier1_active_worker_request` which gives the number of active Substreams worker requests a tier1 app is currently doing against tier2 nodes.

* [Operator] Added metric `substreams_tier1_worker_request_counter` which gives the total number of Substreams worker requests a tier1 app made against tier2 nodes.

## v1.3.7

Expand Down
7 changes: 5 additions & 2 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

var MetricSet = dmetrics.NewSet()

var ActiveSubstreams = MetricSet.NewGauge("substreams_active_requests", "Number of active substreams requests")
var ActiveSubstreams = MetricSet.NewGauge("substreams_active_requests", "Number of active Substreams requests")
var SubstreamsCounter = MetricSet.NewCounter("substreams_counter", "Substreams requests count")

var BlockBeginProcess = MetricSet.NewCounter("substreams_block_process_start_counter", "Counter for total block processes started, used for rate")
Expand All @@ -19,7 +19,10 @@ var SquashesLaunched = MetricSet.NewCounter("substreams_total_squashes_launched"
var SquashersStarted = MetricSet.NewCounter("substreams_total_squash_processes_launched", "Counter for Total squash processes launched, used for rate")
var SquashersEnded = MetricSet.NewCounter("substreams_total_squash_processes_closed", "Counter for Total squash processes closed, used for active processes")

var AppReadiness = MetricSet.NewAppReadiness("firehose")
var ActiveWorkerRequest = MetricSet.NewGauge("substreams_active_worker_request", "Number of active Substreams worker requests a tier1 app is currently doing against tier2 nodes")
var WorkerRequestCounter = MetricSet.NewCounter("substreams_worker_request_counter", "Counter for total Substreams worker requests a tier1 app made against tier2 nodes")

var AppReadinessTier1 = MetricSet.NewAppReadiness("substreams_tier1")

var registerOnce sync.Once

Expand Down
9 changes: 9 additions & 0 deletions orchestrator/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"go.uber.org/zap"

"github.com/streamingfast/substreams/metrics"
"github.com/streamingfast/substreams/orchestrator/execout"
"github.com/streamingfast/substreams/orchestrator/loop"
"github.com/streamingfast/substreams/orchestrator/response"
Expand Down Expand Up @@ -80,6 +81,8 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {

switch msg := msg.(type) {
case work.MsgJobSucceeded:
metrics.ActiveWorkerRequest.Dec()

s.Stages.MarkSegmentPartialPresent(msg.Unit)
s.WorkerPool.Return(msg.Worker)

Expand Down Expand Up @@ -109,12 +112,18 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {

s.logger.Info("scheduling work", zap.Object("unit", workUnit))
modules := s.Stages.StageModules(workUnit.Stage)

metrics.ActiveWorkerRequest.Inc()
metrics.WorkerRequestCounter.Inc()

return loop.Batch(
worker.Work(s.ctx, workUnit, workRange, modules, s.stream),
work.CmdScheduleNextJob(),
)

case work.MsgJobFailed:
metrics.ActiveWorkerRequest.Dec()

cmds = append(cmds, loop.Quit(msg.Error))

case stage.MsgMergeFinished:
Expand Down

0 comments on commit 6f223f3

Please sign in to comment.