From b978e3d2fcd1272bd6704da00629646e510ebbb1 Mon Sep 17 00:00:00 2001 From: colindickson Date: Mon, 26 Feb 2024 15:18:07 -0500 Subject: [PATCH] bump dmetering, add measurement of wasm input bytes to metrics --- go.mod | 2 +- go.sum | 4 ++-- pipeline/process_block.go | 2 ++ service/metering.go | 12 ++++++++---- service/tier1.go | 1 + service/tier2.go | 1 + 6 files changed, 15 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 9b79b6f37..08e02cee3 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/schollz/closestmatch v2.1.0+incompatible github.com/shopspring/decimal v1.3.1 github.com/streamingfast/dhttp v0.0.2-0.20220314180036-95936809c4b8 - github.com/streamingfast/dmetering v0.0.0-20240215171500-4f0413a948bb + github.com/streamingfast/dmetering v0.0.0-20240226195549-85c4cb6f8a46 github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 github.com/streamingfast/eth-go v0.0.0-20230410173454-433bd8803da1 github.com/streamingfast/sf-tracing v0.0.0-20240209202324-9daa52c71a52 diff --git a/go.sum b/go.sum index ce2c2957e..9621eb26b 100644 --- a/go.sum +++ b/go.sum @@ -563,8 +563,8 @@ github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca h1:/k5H6MUo5Vi github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca/go.mod h1:NuKCwOHjbT0nRji0O+7+c70AiBfLHEKNoovs/gFfMPY= github.com/streamingfast/dhttp v0.0.2-0.20220314180036-95936809c4b8 h1:NSXRm1UBMQ9LoKMh5gBY410v6Gc1ZCvKtdoarsAIzb4= github.com/streamingfast/dhttp v0.0.2-0.20220314180036-95936809c4b8/go.mod h1:lBxqLQNmDm+1rQwNP/kAgucXGKDg+e/FkSnQhWyLT8E= -github.com/streamingfast/dmetering v0.0.0-20240215171500-4f0413a948bb h1:3zVpm+OkXNyremxpRpoW+HO2OpvxTTLsYMDNUg1zsFQ= -github.com/streamingfast/dmetering v0.0.0-20240215171500-4f0413a948bb/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI= +github.com/streamingfast/dmetering v0.0.0-20240226195549-85c4cb6f8a46 h1:YedWshNbK4kcRNdkCN9bTv9gGOgBc8lz5i9HMkSBWew= +github.com/streamingfast/dmetering v0.0.0-20240226195549-85c4cb6f8a46/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI= github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y= github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw= github.com/streamingfast/dstore v0.1.1-0.20240215171730-493ad5a0f537 h1:HWqY7nS+kmQXPjcASsMr6n4rGYQvMc1iJEv4/+EL1O0= diff --git a/pipeline/process_block.go b/pipeline/process_block.go index b382bd5f6..08083f198 100644 --- a/pipeline/process_block.go +++ b/pipeline/process_block.go @@ -236,6 +236,8 @@ func (p *Pipeline) handleStepNew(ctx context.Context, block *pbbstream.Block, cl return fmt.Errorf("pre block hook: %w", err) } + dmetering.GetBytesMeter(ctx).CountInc("wasm_input_bytes", len(block.Payload.Value)) + if err := p.executeModules(ctx, execOutput); err != nil { return fmt.Errorf("execute modules: %w", err) } diff --git a/service/metering.go b/service/metering.go index d46a5ccdb..becbde2b4 100644 --- a/service/metering.go +++ b/service/metering.go @@ -12,6 +12,9 @@ func sendMetering(meter dmetering.Meter, userID, apiKeyID, ip, userMeta, endpoin bytesRead := meter.BytesReadDelta() bytesWritten := meter.BytesWrittenDelta() + inputBytes := meter.GetCount("wasm_input_bytes") + meter.ResetCount("wasm_input_bytes") + event := dmetering.Event{ UserID: userID, ApiKeyID: apiKeyID, @@ -20,10 +23,11 @@ func sendMetering(meter dmetering.Meter, userID, apiKeyID, ip, userMeta, endpoin Endpoint: endpoint, Metrics: map[string]float64{ - "egress_bytes": float64(proto.Size(resp)), - "written_bytes": float64(bytesWritten), - "read_bytes": float64(bytesRead), - "message_count": 1, + "egress_bytes": float64(proto.Size(resp)), + "written_bytes": float64(bytesWritten), + "read_bytes": float64(bytesRead), + "wasm_input_bytes": float64(inputBytes), + "message_count": 1, }, Timestamp: time.Now(), } diff --git a/service/tier1.go b/service/tier1.go index 201031cc6..59519d03e 100644 --- a/service/tier1.go +++ b/service/tier1.go @@ -176,6 +176,7 @@ func (s *Tier1Service) Blocks( ctx = logging.WithLogger(ctx, logger) ctx = reqctx.WithTracer(ctx, s.tracer) ctx = dmetering.WithBytesMeter(ctx) + ctx = dmetering.WithCounter(ctx, "wasm_input_bytes") ctx, span := reqctx.WithSpan(ctx, "substreams/tier1/request") defer span.EndWithErr(&err) diff --git a/service/tier2.go b/service/tier2.go index 62c075f25..b81de7e69 100644 --- a/service/tier2.go +++ b/service/tier2.go @@ -134,6 +134,7 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s ctx = logging.WithLogger(ctx, logger) ctx = dmetering.WithBytesMeter(ctx) + ctx = dmetering.WithCounter(ctx, "wasm_input_bytes") ctx = reqctx.WithTracer(ctx, s.tracer) ctx, span := reqctx.WithSpan(ctx, "substreams/tier2/request")