Skip to content

Commit

Permalink
Refactor Stats to include fetchCursor and better metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Aug 15, 2024
1 parent 84fa84b commit 18fc86d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 23 deletions.
6 changes: 4 additions & 2 deletions cmd/substreams-sink-noop/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ func run(cmd *cobra.Command, args []string) error {
stopBlock = *blockRange.EndBlock()
}

stats := NewStats(stopBlock, headFetcher)
stats := NewStats(stopBlock, headFetcher, func() *sink.Cursor {
return sinker.activeCursor
})
app.OnTerminating(func(_ error) { stats.Close() })
stats.OnTerminated(func(err error) { app.Shutdown(err) })

Expand Down Expand Up @@ -225,7 +227,7 @@ func (s *Sinker) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrp
}

block := bstream.NewBlockRef(data.Clock.Id, data.Clock.Number)

ProcessedBlockCount.Inc()
s.activeCursor = cursor
s.backprocessingCompleted = true

Expand Down
37 changes: 16 additions & 21 deletions cmd/substreams-sink-noop/stats.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,39 @@
package main

import (
"strconv"
"time"

"github.com/streamingfast/bstream"
sink "github.com/streamingfast/substreams-sink"

"github.com/streamingfast/dmetrics"
"github.com/streamingfast/shutter"
"go.uber.org/zap"
)

var ProcessedBlockCount = metrics.NewGauge("processed_block_count", "The number of processed block")

// Stats periodically reports sink progress: the last processed block, the
// distance to the chain head or stop block, completion flags and the
// processed-block rate.
type Stats struct {
	*shutter.Shutter

	headFetcher *HeadTracker     // provides the current chain head block (see LogNow)
	lastBlock   bstream.BlockRef // last block seen; nil until the first block is processed
	stopBlock   uint64           // target stop block; 0 means "follow head" — TODO confirm against caller

	processedBlockRate *dmetrics.AvgRatePromGauge // avg blocks/sec derived from ProcessedBlockCount

	backprocessingCompletion *dmetrics.ValueFromMetric // non-zero once backprocessing completed
	headBlockReached         *dmetrics.ValueFromMetric // non-zero once the head block was reached
	fetchCursor              func() *sink.Cursor       // returns the sinker's current active cursor for logging
}

func NewStats(stopBlock uint64, headFetcher *HeadTracker) *Stats {
func NewStats(stopBlock uint64, headFetcher *HeadTracker, fetchCursor func() *sink.Cursor) *Stats {
return &Stats{
Shutter: shutter.New(),
stopBlock: stopBlock,

Shutter: shutter.New(),
stopBlock: stopBlock,
fetchCursor: fetchCursor,
headFetcher: headFetcher,
backprocessingCompletion: dmetrics.NewValueFromMetric(BackprocessingCompletion, "completion"),
headBlockReached: dmetrics.NewValueFromMetric(HeadBlockReached, "reached"),
processedBlockRate: dmetrics.MustNewAvgRateFromPromGauge(ProcessedBlockCount, 1*time.Second, 30*time.Second, "processed_block_sec"),
}
}

Expand Down Expand Up @@ -60,27 +65,17 @@ func (s *Stats) LogNow() {
fields := []zap.Field{}
headBlock, headBlockFound := s.headFetcher.Current()

if s.lastBlock != nil {
toBlockNum := s.stopBlock
if s.stopBlock == 0 && headBlockFound {
toBlockNum = headBlock.Num()
}

var blockDiff = "<N/A>"
if toBlockNum > s.lastBlock.Num() {
blockDiff = strconv.FormatUint(toBlockNum-s.lastBlock.Num(), 10)
}

fields = append(fields, zap.String("missing_block", blockDiff))
}
cursor := s.fetchCursor()

if headBlockFound {
fields = append(fields, zap.Stringer("head_block", headBlock))
}

fields = append(fields,
zap.Bool("backprocessing_completed", s.backprocessingCompletion.ValueUint() > 0),
zap.String("last_cursor", cursor.Block().String()),
zap.Bool("back_processing_completed", s.backprocessingCompletion.ValueUint() > 0),
zap.Bool("head_block_reached", s.headBlockReached.ValueUint() > 0),
zap.Float64("avg_blocks_sec", s.processedBlockRate.Rate()),
)

zlog.Info("substreams sink noop stats", fields...)
Expand Down

0 comments on commit 18fc86d

Please sign in to comment.