Skip to content

Commit

Permalink
log StepNew and StepNewIrreversible stats, fix wasm input size metering
Browse files Browse the repository at this point in the history
  • Loading branch information
colindickson committed Feb 27, 2024
1 parent 96559d8 commit e8479cb
Showing 7 changed files with 31 additions and 11 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ require (
github.com/schollz/closestmatch v2.1.0+incompatible
github.com/shopspring/decimal v1.3.1
github.com/streamingfast/dhttp v0.0.2-0.20220314180036-95936809c4b8
github.com/streamingfast/dmetering v0.0.0-20240226195549-85c4cb6f8a46
github.com/streamingfast/dmetering v0.0.0-20240227170539-29b479694f8f
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545
github.com/streamingfast/eth-go v0.0.0-20230410173454-433bd8803da1
github.com/streamingfast/sf-tracing v0.0.0-20240209202324-9daa52c71a52
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -563,8 +563,8 @@ github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca h1:/k5H6MUo5Vi
github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca/go.mod h1:NuKCwOHjbT0nRji0O+7+c70AiBfLHEKNoovs/gFfMPY=
github.com/streamingfast/dhttp v0.0.2-0.20220314180036-95936809c4b8 h1:NSXRm1UBMQ9LoKMh5gBY410v6Gc1ZCvKtdoarsAIzb4=
github.com/streamingfast/dhttp v0.0.2-0.20220314180036-95936809c4b8/go.mod h1:lBxqLQNmDm+1rQwNP/kAgucXGKDg+e/FkSnQhWyLT8E=
github.com/streamingfast/dmetering v0.0.0-20240226195549-85c4cb6f8a46 h1:YedWshNbK4kcRNdkCN9bTv9gGOgBc8lz5i9HMkSBWew=
github.com/streamingfast/dmetering v0.0.0-20240226195549-85c4cb6f8a46/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetering v0.0.0-20240227170539-29b479694f8f h1:GEi9o8xu++upU2AWAR2WG/0Digk3KlTT/ZKzDko0vpk=
github.com/streamingfast/dmetering v0.0.0-20240227170539-29b479694f8f/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw=
github.com/streamingfast/dstore v0.1.1-0.20240215171730-493ad5a0f537 h1:HWqY7nS+kmQXPjcASsMr6n4rGYQvMc1iJEv4/+EL1O0=
4 changes: 3 additions & 1 deletion pipeline/pipeline.go
Original file line number Diff line number Diff line change
@@ -75,7 +75,8 @@ type Pipeline struct {
// (for chains with potential block skips)
lastFinalClock *pbsubstreams.Clock

traceID string
traceID string
blockStepMap map[bstream.StepType]uint64
}

func New(
@@ -102,6 +103,7 @@ func New(
execoutStorage: execoutStorage,
forkHandler: NewForkHandler(),
traceID: traceID,
blockStepMap: make(map[bstream.StepType]uint64),
startTime: time.Now(),
}
for _, opt := range opts {
21 changes: 18 additions & 3 deletions pipeline/process_block.go
Original file line number Diff line number Diff line change
@@ -87,18 +87,20 @@ func (p *Pipeline) processBlock(
reorgJunctionBlock bstream.BlockRef,
) (err error) {
var eof bool

switch step {
case bstream.StepUndo:
p.blockStepMap[bstream.StepUndo]++
if err = p.handleStepUndo(ctx, clock, cursor, reorgJunctionBlock); err != nil {
return fmt.Errorf("step undo: %w", err)
}

case bstream.StepStalled:
p.blockStepMap[bstream.StepStalled]++
if err := p.handleStepStalled(clock); err != nil {
return fmt.Errorf("step stalled: %w", err)
}

case bstream.StepNew:
p.blockStepMap[bstream.StepNew]++
// metering of live blocks
payload := block.Payload.Value
dmetering.GetBytesMeter(ctx).AddBytesRead(len(payload))
@@ -111,6 +113,7 @@ func (p *Pipeline) processBlock(
eof = true
}
case bstream.StepNewIrreversible:
p.blockStepMap[bstream.StepNewIrreversible]++
err := p.handleStepNew(ctx, block, clock, cursor)
if err != nil && err != io.EOF {
return fmt.Errorf("step new irr: handler step new: %w", err)
@@ -122,14 +125,26 @@ func (p *Pipeline) processBlock(
if err != nil {
return fmt.Errorf("handling step irreversible: %w", err)
}

case bstream.StepIrreversible:
p.blockStepMap[bstream.StepIrreversible]++
err = p.handleStepFinal(clock)
if err != nil {
return fmt.Errorf("handling step irreversible: %w", err)
}
}

if block.Number%500 == 0 {
logger := reqctx.Logger(ctx)
// log the total number of StepNew and StepNewIrreversible blocks, and the ratio of the two
logger.Debug("block stats",
zap.String("trace_id", p.traceID),
zap.Uint64("block_num", block.Number),
zap.Uint64("step_new", p.blockStepMap[bstream.StepNew]),
zap.Uint64("step_new_irreversible", p.blockStepMap[bstream.StepNewIrreversible]),
zap.Float64("ratio", float64(p.blockStepMap[bstream.StepNewIrreversible])/float64(p.blockStepMap[bstream.StepNew])),
)
}

if eof {
return io.EOF
}
7 changes: 5 additions & 2 deletions service/metering.go
Original file line number Diff line number Diff line change
@@ -4,13 +4,16 @@ import (
"context"
"time"

"go.uber.org/zap"

"github.com/streamingfast/dmetering"
"google.golang.org/protobuf/proto"
)

func sendMetering(meter dmetering.Meter, userID, apiKeyID, ip, userMeta, endpoint string, resp proto.Message) {
func sendMetering(meter dmetering.Meter, userID, apiKeyID, ip, userMeta, endpoint string, resp proto.Message, logger *zap.Logger) {
bytesRead := meter.BytesReadDelta()
bytesWritten := meter.BytesWrittenDelta()
egressBytes := proto.Size(resp)

inputBytes := meter.GetCount("wasm_input_bytes")
meter.ResetCount("wasm_input_bytes")
@@ -23,7 +26,7 @@ func sendMetering(meter dmetering.Meter, userID, apiKeyID, ip, userMeta, endpoin

Endpoint: endpoint,
Metrics: map[string]float64{
"egress_bytes": float64(proto.Size(resp)),
"egress_bytes": float64(egressBytes),
"written_bytes": float64(bytesWritten),
"read_bytes": float64(bytesRead),
"wasm_input_bytes": float64(inputBytes),
2 changes: 1 addition & 1 deletion service/tier1.go
Original file line number Diff line number Diff line change
@@ -538,7 +538,7 @@ func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logg
return connect.NewError(connect.CodeUnavailable, err)
}

sendMetering(meter, userID, apiKeyID, ip, userMeta, "sf.substreams.rpc.v2/Blocks", resp)
sendMetering(meter, userID, apiKeyID, ip, userMeta, "sf.substreams.rpc.v2/Blocks", resp, logger)
return nil
}
}
2 changes: 1 addition & 1 deletion service/tier2.go
Original file line number Diff line number Diff line change
@@ -334,7 +334,7 @@ func tier2ResponseHandler(ctx context.Context, logger *zap.Logger, streamSrv pbs
return connect.NewError(connect.CodeUnavailable, err)
}

sendMetering(meter, userID, apiKeyID, ip, userMeta, "sf.substreams.internal.v2/ProcessRange", resp)
sendMetering(meter, userID, apiKeyID, ip, userMeta, "sf.substreams.internal.v2/ProcessRange", resp, logger)
return nil
}
}

0 comments on commit e8479cb

Please sign in to comment.