diff --git a/app/tier1.go b/app/tier1.go index 38be7596e..ca0038d68 100644 --- a/app/tier1.go +++ b/app/tier1.go @@ -186,7 +186,7 @@ func (a *Tier1App) Run() error { a.logger.Info("waiting until hub is real-time synced") select { case <-forkableHub.Ready: - metrics.AppReadiness.SetReady() + metrics.AppReadinessTier1.SetReady() case <-a.Terminating(): return } diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index 7dc055239..beaf96e20 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -11,11 +11,23 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## Unreleased -* Pick up docs from the README.md or README in the same directory as the manifest, when top-level package.doc is empty -* Tier2 service now supports a maximum concurrent requests limit. Default set to 0 (unlimited). -* Improved file listing performance for Google Storage backends by 25% +* Pick up docs from the README.md or README in the same directory as the manifest, when top-level `package.doc` field is empty. +* +Tier2 service now supports a maximum concurrent requests limit. Default set to 0 (unlimited). + +* Improved file listing performance for Google Storage backends by 25%. + * Tier2 will now read back mapper outputs (if they exist) to prevent running them again. Additionally, it will not read back the full blocks if its inputs can be satisfied from existing cached mapper outputs. -* Tier2 will skip processing completely if the output_module is a mapper that has already been processed (ex: when multiple requests are indexing the same data at the same time) + +* Tier2 will skip processing completely if the `output_module` is a mapper that has already been processed (ex: when multiple requests are indexing the same data at the same time) + +* [Operator] Readiness metric for Substreams tier1 app is now named `substreams_tier1` (was mistakenly called `firehose` before). + +* [Operator] Added back deadiness metric for Substreams tiere app (named `substreams_tier2`). + +* [Operator] Added metric `substreams_tier1_active_worker_request` which gives the number of active Substreams worker requests a tier1 app is currently doing against tier2 nodes. + +* [Operator] Added metric `substreams_tier_1worker_request_counter` which gives the total Substreams worker requests a tier1 app made against tier2 nodes. ## v1.3.7 diff --git a/metrics/metrics.go b/metrics/metrics.go index d94a18ce5..8025e9a79 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -9,7 +9,7 @@ import ( var MetricSet = dmetrics.NewSet() -var ActiveSubstreams = MetricSet.NewGauge("substreams_active_requests", "Number of active substreams requests") +var ActiveSubstreams = MetricSet.NewGauge("substreams_active_requests", "Number of active Substreams requests") var SubstreamsCounter = MetricSet.NewCounter("substreams_counter", "Substreams requests count") var BlockBeginProcess = MetricSet.NewCounter("substreams_block_process_start_counter", "Counter for total block processes started, used for rate") @@ -19,7 +19,10 @@ var SquashesLaunched = MetricSet.NewCounter("substreams_total_squashes_launched" var SquashersStarted = MetricSet.NewCounter("substreams_total_squash_processes_launched", "Counter for Total squash processes launched, used for rate") var SquashersEnded = MetricSet.NewCounter("substreams_total_squash_processes_closed", "Counter for Total squash processes closed, used for active processes") -var AppReadiness = MetricSet.NewAppReadiness("firehose") +var ActiveWorkerRequest = MetricSet.NewGauge("substreams_active_worker_request", "Number of active Substreams worker requests a tier1 app is currently doing against tier2 nodes") +var WorkerRequestCounter = MetricSet.NewCounter("substreams_worker_request_counter", "Counter for total Substreams worker requests a tier1 app made against tier2 nodes") + +var AppReadinessTier1 = MetricSet.NewAppReadiness("substreams_tier1") var registerOnce sync.Once diff --git a/orchestrator/scheduler/scheduler.go b/orchestrator/scheduler/scheduler.go index fdcc55b3d..621add053 100644 --- a/orchestrator/scheduler/scheduler.go +++ b/orchestrator/scheduler/scheduler.go @@ -8,6 +8,7 @@ import ( "go.uber.org/zap" + "github.com/streamingfast/substreams/metrics" "github.com/streamingfast/substreams/orchestrator/execout" "github.com/streamingfast/substreams/orchestrator/loop" "github.com/streamingfast/substreams/orchestrator/response" @@ -80,6 +81,8 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd { switch msg := msg.(type) { case work.MsgJobSucceeded: + metrics.ActiveWorkerRequest.Dec() + s.Stages.MarkSegmentPartialPresent(msg.Unit) s.WorkerPool.Return(msg.Worker) @@ -109,12 +112,18 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd { s.logger.Info("scheduling work", zap.Object("unit", workUnit)) modules := s.Stages.StageModules(workUnit.Stage) + + metrics.ActiveWorkerRequest.Inc() + metrics.WorkerRequestCounter.Inc() + return loop.Batch( worker.Work(s.ctx, workUnit, workRange, modules, s.stream), work.CmdScheduleNextJob(), ) case work.MsgJobFailed: + metrics.ActiveWorkerRequest.Dec() + cmds = append(cmds, loop.Quit(msg.Error)) case stage.MsgMergeFinished: