From ced4b37a494b93ff732e2cc515ec7a76088282ef Mon Sep 17 00:00:00 2001 From: colindickson Date: Mon, 14 Oct 2024 11:24:51 -0400 Subject: [PATCH] metering: refactor wasm input bytes metering to a function in the metering package --- metering/metering.go | 4 ++++ metering/metering_test.go | 9 +++++++++ pipeline/process_block.go | 3 ++- 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/metering/metering.go b/metering/metering.go index 6c4843be..76ef04cb 100644 --- a/metering/metering.go +++ b/metering/metering.go @@ -60,6 +60,10 @@ func WithBytesMeteringOptions(meter dmetering.Meter, logger *zap.Logger) []dstor return opts } +func AddWasmInputBytes(ctx context.Context, n int) { + dmetering.GetBytesMeter(ctx).CountInc(MeterWasmInputBytes, n) +} + func GetTotalBytesRead(meter dmetering.Meter) uint64 { total := uint64(meter.GetCount(TotalReadBytes)) return total diff --git a/metering/metering_test.go b/metering/metering_test.go index 08011ea8..c2fbe511 100644 --- a/metering/metering_test.go +++ b/metering/metering_test.go @@ -293,6 +293,15 @@ func TestLiveSourceMiddlewareHandlerFactory(t *testing.T) { } } +func TestAddWasmInputBytes(t *testing.T) { + ctx := dmetering.WithBytesMeter(context.Background()) + meter := dmetering.GetBytesMeter(ctx) + + AddWasmInputBytes(ctx, 10) + + assert.Equal(t, 10, meter.GetCount(MeterWasmInputBytes)) +} + func TestSend(t *testing.T) { ctx := dmetering.WithBytesMeter(context.Background()) meter := dmetering.GetBytesMeter(ctx) diff --git a/pipeline/process_block.go b/pipeline/process_block.go index 48df2a6a..17d93b45 100644 --- a/pipeline/process_block.go +++ b/pipeline/process_block.go @@ -256,7 +256,8 @@ func (p *Pipeline) handleStepNew(ctx context.Context, clock *pbsubstreams.Clock, return fmt.Errorf("pre block hook: %w", err) } - dmetering.GetBytesMeter(ctx).CountInc(metering.MeterWasmInputBytes, execOutput.Len()) + metering.AddWasmInputBytes(ctx, execOutput.Len()) + if err := p.executeModules(ctx, execOutput); err != nil { return fmt.Errorf("execute modules: %w", err) }