Skip to content

Commit

Permalink
bump dmetering, add measurement of wasm input bytes to metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
colindickson committed Feb 26, 2024
1 parent e938947 commit b978e3d
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 7 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
github.com/schollz/closestmatch v2.1.0+incompatible
github.com/shopspring/decimal v1.3.1
github.com/streamingfast/dhttp v0.0.2-0.20220314180036-95936809c4b8
github.com/streamingfast/dmetering v0.0.0-20240215171500-4f0413a948bb
github.com/streamingfast/dmetering v0.0.0-20240226195549-85c4cb6f8a46
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545
github.com/streamingfast/eth-go v0.0.0-20230410173454-433bd8803da1
github.com/streamingfast/sf-tracing v0.0.0-20240209202324-9daa52c71a52
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -563,8 +563,8 @@ github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca h1:/k5H6MUo5Vi
github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca/go.mod h1:NuKCwOHjbT0nRji0O+7+c70AiBfLHEKNoovs/gFfMPY=
github.com/streamingfast/dhttp v0.0.2-0.20220314180036-95936809c4b8 h1:NSXRm1UBMQ9LoKMh5gBY410v6Gc1ZCvKtdoarsAIzb4=
github.com/streamingfast/dhttp v0.0.2-0.20220314180036-95936809c4b8/go.mod h1:lBxqLQNmDm+1rQwNP/kAgucXGKDg+e/FkSnQhWyLT8E=
github.com/streamingfast/dmetering v0.0.0-20240215171500-4f0413a948bb h1:3zVpm+OkXNyremxpRpoW+HO2OpvxTTLsYMDNUg1zsFQ=
github.com/streamingfast/dmetering v0.0.0-20240215171500-4f0413a948bb/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetering v0.0.0-20240226195549-85c4cb6f8a46 h1:YedWshNbK4kcRNdkCN9bTv9gGOgBc8lz5i9HMkSBWew=
github.com/streamingfast/dmetering v0.0.0-20240226195549-85c4cb6f8a46/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw=
github.com/streamingfast/dstore v0.1.1-0.20240215171730-493ad5a0f537 h1:HWqY7nS+kmQXPjcASsMr6n4rGYQvMc1iJEv4/+EL1O0=
Expand Down
2 changes: 2 additions & 0 deletions pipeline/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ func (p *Pipeline) handleStepNew(ctx context.Context, block *pbbstream.Block, cl
return fmt.Errorf("pre block hook: %w", err)
}

dmetering.GetBytesMeter(ctx).CountInc("wasm_input_bytes", len(block.Payload.Value))

if err := p.executeModules(ctx, execOutput); err != nil {
return fmt.Errorf("execute modules: %w", err)
}
Expand Down
12 changes: 8 additions & 4 deletions service/metering.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ func sendMetering(meter dmetering.Meter, userID, apiKeyID, ip, userMeta, endpoin
bytesRead := meter.BytesReadDelta()
bytesWritten := meter.BytesWrittenDelta()

inputBytes := meter.GetCount("wasm_input_bytes")
meter.ResetCount("wasm_input_bytes")

event := dmetering.Event{
UserID: userID,
ApiKeyID: apiKeyID,
Expand All @@ -20,10 +23,11 @@ func sendMetering(meter dmetering.Meter, userID, apiKeyID, ip, userMeta, endpoin

Endpoint: endpoint,
Metrics: map[string]float64{
"egress_bytes": float64(proto.Size(resp)),
"written_bytes": float64(bytesWritten),
"read_bytes": float64(bytesRead),
"message_count": 1,
"egress_bytes": float64(proto.Size(resp)),
"written_bytes": float64(bytesWritten),
"read_bytes": float64(bytesRead),
"wasm_input_bytes": float64(inputBytes),
"message_count": 1,
},
Timestamp: time.Now(),
}
Expand Down
1 change: 1 addition & 0 deletions service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func (s *Tier1Service) Blocks(
ctx = logging.WithLogger(ctx, logger)
ctx = reqctx.WithTracer(ctx, s.tracer)
ctx = dmetering.WithBytesMeter(ctx)
ctx = dmetering.WithCounter(ctx, "wasm_input_bytes")

ctx, span := reqctx.WithSpan(ctx, "substreams/tier1/request")
defer span.EndWithErr(&err)
Expand Down
1 change: 1 addition & 0 deletions service/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s

ctx = logging.WithLogger(ctx, logger)
ctx = dmetering.WithBytesMeter(ctx)
ctx = dmetering.WithCounter(ctx, "wasm_input_bytes")
ctx = reqctx.WithTracer(ctx, s.tracer)

ctx, span := reqctx.WithSpan(ctx, "substreams/tier2/request")
Expand Down

0 comments on commit b978e3d

Please sign in to comment.