From 18fc86d31d48c76a54275b30c1e6e57c33b2ec8f Mon Sep 17 00:00:00 2001 From: Charles Billette Date: Thu, 15 Aug 2024 10:44:24 -0400 Subject: [PATCH] Refactor `Stats` to include `fetchCursor` and better metrics --- cmd/substreams-sink-noop/main.go | 6 +++-- cmd/substreams-sink-noop/stats.go | 37 +++++++++++++------------------ 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/cmd/substreams-sink-noop/main.go b/cmd/substreams-sink-noop/main.go index 9d67516..be954b1 100644 --- a/cmd/substreams-sink-noop/main.go +++ b/cmd/substreams-sink-noop/main.go @@ -132,7 +132,9 @@ func run(cmd *cobra.Command, args []string) error { stopBlock = *blockRange.EndBlock() } - stats := NewStats(stopBlock, headFetcher) + stats := NewStats(stopBlock, headFetcher, func() *sink.Cursor { + return sinker.activeCursor + }) app.OnTerminating(func(_ error) { stats.Close() }) stats.OnTerminated(func(err error) { app.Shutdown(err) }) @@ -225,7 +227,7 @@ func (s *Sinker) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrp } block := bstream.NewBlockRef(data.Clock.Id, data.Clock.Number) - + ProcessedBlockCount.Inc() s.activeCursor = cursor s.backprocessingCompleted = true diff --git a/cmd/substreams-sink-noop/stats.go b/cmd/substreams-sink-noop/stats.go index 367730d..a6d3a90 100644 --- a/cmd/substreams-sink-noop/stats.go +++ b/cmd/substreams-sink-noop/stats.go @@ -1,34 +1,39 @@ package main import ( - "strconv" "time" - "github.com/streamingfast/bstream" + sink "github.com/streamingfast/substreams-sink" + "github.com/streamingfast/dmetrics" "github.com/streamingfast/shutter" "go.uber.org/zap" ) +var ProcessedBlockCount = metrics.NewGauge("processed_block_count", "The number of processed block") + type Stats struct { *shutter.Shutter headFetcher *HeadTracker - lastBlock bstream.BlockRef stopBlock uint64 + processedBlockRate *dmetrics.AvgRatePromGauge + backprocessingCompletion *dmetrics.ValueFromMetric headBlockReached *dmetrics.ValueFromMetric + fetchCursor func() *sink.Cursor } -func NewStats(stopBlock uint64, headFetcher *HeadTracker) *Stats { +func NewStats(stopBlock uint64, headFetcher *HeadTracker, fetchCursor func() *sink.Cursor) *Stats { return &Stats{ - Shutter: shutter.New(), - stopBlock: stopBlock, - + Shutter: shutter.New(), + stopBlock: stopBlock, + fetchCursor: fetchCursor, headFetcher: headFetcher, backprocessingCompletion: dmetrics.NewValueFromMetric(BackprocessingCompletion, "completion"), headBlockReached: dmetrics.NewValueFromMetric(HeadBlockReached, "reached"), + processedBlockRate: dmetrics.MustNewAvgRateFromPromGauge(ProcessedBlockCount, 1*time.Second, 30*time.Second, "processed_block_sec"), } } @@ -60,27 +65,17 @@ func (s *Stats) LogNow() { fields := []zap.Field{} headBlock, headBlockFound := s.headFetcher.Current() - if s.lastBlock != nil { - toBlockNum := s.stopBlock - if s.stopBlock == 0 && headBlockFound { - toBlockNum = headBlock.Num() - } - - var blockDiff = "" - if toBlockNum > s.lastBlock.Num() { - blockDiff = strconv.FormatUint(toBlockNum-s.lastBlock.Num(), 10) - } - - fields = append(fields, zap.String("missing_block", blockDiff)) - } + cursor := s.fetchCursor() if headBlockFound { fields = append(fields, zap.Stringer("head_block", headBlock)) } fields = append(fields, - zap.Bool("backprocessing_completed", s.backprocessingCompletion.ValueUint() > 0), + zap.String("last_cursor", cursor.Block().String()), + zap.Bool("back_processing_completed", s.backprocessingCompletion.ValueUint() > 0), zap.Bool("head_block_reached", s.headBlockReached.ValueUint() > 0), + zap.Float64("avg_blocks_sec", s.processedBlockRate.Rate()), ) zlog.Info("substreams sink noop stats", fields...)