diff --git a/metering/metering_test.go b/metering/metering_test.go index 3b8555c3..08011ea8 100644 --- a/metering/metering_test.go +++ b/metering/metering_test.go @@ -11,7 +11,9 @@ import ( "github.com/streamingfast/dmetering" "github.com/streamingfast/dstore" pbsubstreamstest "github.com/streamingfast/substreams/pb/sf/substreams/v1/test" + "github.com/streamingfast/substreams/reqctx" "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" ) @@ -291,6 +293,52 @@ func TestLiveSourceMiddlewareHandlerFactory(t *testing.T) { } } +func TestSend(t *testing.T) { + ctx := dmetering.WithBytesMeter(context.Background()) + meter := dmetering.GetBytesMeter(ctx) + + // Set initial meter values + meter.CountInc(MeterWasmInputBytes, 100) + meter.CountInc(MeterLiveUncompressedReadBytes, 200) + meter.CountInc(MeterFileUncompressedReadBytes, 300) + meter.CountInc(MeterFileCompressedReadBytes, 400) + meter.CountInc(MeterFileUncompressedWriteBytes, 500) + meter.CountInc(MeterFileCompressedWriteBytes, 600) + + // Mock response + resp := &pbbstream.Block{ + Id: "test-block", + Number: 1, + } + + // Mock emitter + emitter := &mockEmitter{} + ctx = reqctx.WithEmitter(ctx, emitter) + + // Call the Send function + Send(ctx, "user1", "apiKey1", "127.0.0.1", "meta", "endpoint", resp) + + // Verify the emitted event + assert.Len(t, emitter.events, 1) + event := emitter.events[0] + + assert.Equal(t, "user1", event.UserID) + assert.Equal(t, "apiKey1", event.ApiKeyID) + assert.Equal(t, "127.0.0.1", event.IpAddress) + assert.Equal(t, "meta", event.Meta) + assert.Equal(t, "endpoint", event.Endpoint) + assert.Equal(t, float64(proto.Size(resp)), event.Metrics["egress_bytes"]) + assert.Equal(t, float64(0), event.Metrics["written_bytes"]) + assert.Equal(t, float64(0), event.Metrics["read_bytes"]) + assert.Equal(t, float64(100), event.Metrics[MeterWasmInputBytes]) + assert.Equal(t, float64(200), event.Metrics[MeterLiveUncompressedReadBytes]) + assert.Equal(t, float64(300), event.Metrics[MeterFileUncompressedReadBytes]) + assert.Equal(t, float64(400), event.Metrics[MeterFileCompressedReadBytes]) + assert.Equal(t, float64(500), event.Metrics[MeterFileUncompressedWriteBytes]) + assert.Equal(t, float64(600), event.Metrics[MeterFileCompressedWriteBytes]) + assert.Equal(t, float64(1), event.Metrics["message_count"]) +} + func bstreamBlk(t *testing.T, blk *pbsubstreamstest.Block) *pbbstream.Block { payload, err := anypb.New(blk) assert.NoError(t, err) @@ -317,3 +365,15 @@ func (t *testStepableObject) FinalBlockHeight() uint64 { func (t *testStepableObject) ReorgJunctionBlock() bstream.BlockRef { return nil } + +type mockEmitter struct { + events []dmetering.Event +} + +func (m *mockEmitter) Emit(ctx context.Context, event dmetering.Event) { + m.events = append(m.events, event) +} + +func (m *mockEmitter) Shutdown(err error) { + return +}