Skip to content

Commit

Permalink
metering: add test for Send
Browse files Browse the repository at this point in the history
  • Loading branch information
colindickson committed Oct 12, 2024
1 parent 1f36d5b commit fc843ca
Showing 1 changed file with 60 additions and 0 deletions.
60 changes: 60 additions & 0 deletions metering/metering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"github.com/streamingfast/dmetering"
"github.com/streamingfast/dstore"
pbsubstreamstest "github.com/streamingfast/substreams/pb/sf/substreams/v1/test"
"github.com/streamingfast/substreams/reqctx"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)

Expand Down Expand Up @@ -291,6 +293,52 @@ func TestLiveSourceMiddlewareHandlerFactory(t *testing.T) {
}
}

func TestSend(t *testing.T) {
ctx := dmetering.WithBytesMeter(context.Background())
meter := dmetering.GetBytesMeter(ctx)

// Set initial meter values
meter.CountInc(MeterWasmInputBytes, 100)
meter.CountInc(MeterLiveUncompressedReadBytes, 200)
meter.CountInc(MeterFileUncompressedReadBytes, 300)
meter.CountInc(MeterFileCompressedReadBytes, 400)
meter.CountInc(MeterFileUncompressedWriteBytes, 500)
meter.CountInc(MeterFileCompressedWriteBytes, 600)

// Mock response
resp := &pbbstream.Block{
Id: "test-block",
Number: 1,
}

// Mock emitter
emitter := &mockEmitter{}
ctx = reqctx.WithEmitter(ctx, emitter)

// Call the Send function
Send(ctx, "user1", "apiKey1", "127.0.0.1", "meta", "endpoint", resp)

// Verify the emitted event
assert.Len(t, emitter.events, 1)
event := emitter.events[0]

assert.Equal(t, "user1", event.UserID)
assert.Equal(t, "apiKey1", event.ApiKeyID)
assert.Equal(t, "127.0.0.1", event.IpAddress)
assert.Equal(t, "meta", event.Meta)
assert.Equal(t, "endpoint", event.Endpoint)
assert.Equal(t, float64(proto.Size(resp)), event.Metrics["egress_bytes"])
assert.Equal(t, float64(0), event.Metrics["written_bytes"])
assert.Equal(t, float64(0), event.Metrics["read_bytes"])
assert.Equal(t, float64(100), event.Metrics[MeterWasmInputBytes])
assert.Equal(t, float64(200), event.Metrics[MeterLiveUncompressedReadBytes])
assert.Equal(t, float64(300), event.Metrics[MeterFileUncompressedReadBytes])
assert.Equal(t, float64(400), event.Metrics[MeterFileCompressedReadBytes])
assert.Equal(t, float64(500), event.Metrics[MeterFileUncompressedWriteBytes])
assert.Equal(t, float64(600), event.Metrics[MeterFileCompressedWriteBytes])
assert.Equal(t, float64(1), event.Metrics["message_count"])
}

func bstreamBlk(t *testing.T, blk *pbsubstreamstest.Block) *pbbstream.Block {
payload, err := anypb.New(blk)
assert.NoError(t, err)
Expand All @@ -317,3 +365,15 @@ func (t *testStepableObject) FinalBlockHeight() uint64 {
func (t *testStepableObject) ReorgJunctionBlock() bstream.BlockRef {
return nil
}

type mockEmitter struct {
events []dmetering.Event
}

func (m *mockEmitter) Emit(ctx context.Context, event dmetering.Event) {
m.events = append(m.events, event)
}

func (m *mockEmitter) Shutdown(err error) {
return
}

0 comments on commit fc843ca

Please sign in to comment.