Skip to content

Commit

Permalink
metering: make metering goroutine-safe. prepare for future metering t…
Browse files Browse the repository at this point in the history
…ests in integration test package
  • Loading branch information
colindickson committed Oct 16, 2024
1 parent e1e7647 commit 8091680
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 62 deletions.
34 changes: 33 additions & 1 deletion metering/metering.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package metering
import (
"context"
"fmt"
"sync"
"time"

"github.com/streamingfast/bstream"
Expand Down Expand Up @@ -102,7 +103,20 @@ func FileSourceMiddlewareHandlerFactory(ctx context.Context) func(handler bstrea
}
}

func Send(ctx context.Context, userID, apiKeyID, ip, userMeta, endpoint string, resp proto.Message) {
type MetricsSender struct {
sync.Mutex
}

func NewMetricsSender() *MetricsSender {
return &MetricsSender{
Mutex: sync.Mutex{},
}
}

func (ms *MetricsSender) Send(ctx context.Context, userID, apiKeyID, ip, userMeta, endpoint string, resp proto.Message) {
ms.Lock()
defer ms.Unlock()

if reqctx.IsBackfillerRequest(ctx) {
endpoint = fmt.Sprintf("%s%s", endpoint, "Backfill")
}
Expand Down Expand Up @@ -157,3 +171,21 @@ func Send(ctx context.Context, userID, apiKeyID, ip, userMeta, endpoint string,
emitter.Emit(context.WithoutCancel(ctx), event)
}
}

func WithMetricsSender(ctx context.Context) context.Context {
//check if already set
if GetMetricsSender(ctx) != nil {
return ctx
}

sender := NewMetricsSender()
return context.WithValue(ctx, "metrics_sender", sender)
}

func GetMetricsSender(ctx context.Context) *MetricsSender {
sender, ok := ctx.Value("metrics_sender").(*MetricsSender)
if !ok {
panic("metrics sender not set")
}
return sender
}
158 changes: 148 additions & 10 deletions metering/metering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"bytes"
"context"
"io"
"math/rand"
"sync"
"testing"
"time"

"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
Expand All @@ -22,12 +25,14 @@ func TestWithBlockBytesReadMeteringOptions(t *testing.T) {

opts := WithBlockBytesReadMeteringOptions(meter, nil)

store, err := dstore.NewStore("memory://test", ".test", "zstd", false, opts...)
// fake store. doesn't literally need to contain blocks data for the purposes of this test.
store, err := dstore.NewStore("memory://test", ".test", "gzip", false, opts...)
if err != nil {
t.Fatal(err)
}

err = store.WriteObject(nil, "test", bytes.NewReader([]byte("1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111")))
// write some random bytes to the store
err = store.WriteObject(nil, "test", bytes.NewReader([]byte("9t47flpnxpr5izkccod2rstoiyd89xdz4o7dvjunk70qvkystzb0v95noggt386dzfuozsz7ufk0xi11e2ndbbrx652yu4qe0u40zaj9oq98d1rga38d2h8f6xcjvp3oovotoczw5f8tb4jar1mfmo7mqc77ee22")))
if err != nil {
t.Fatal(err)
}
Expand All @@ -43,14 +48,18 @@ func TestWithBlockBytesReadMeteringOptions(t *testing.T) {
}
_ = r.Close()

assert.Equal(t, 24, meter.GetCount(MeterFileCompressedReadBytes))
// only compressed read bytes are metered because the uncompressed is metered via the bstream live handler middleware
assert.Equal(t, 147, meter.GetCount(MeterFileCompressedReadBytes))
assert.Equal(t, 0, meter.GetCount(MeterFileUncompressedReadBytes))

// written bytes are not metered because we do not write block files in substreams
assert.Equal(t, 0, meter.GetCount(MeterFileUncompressedWriteBytes))
assert.Equal(t, 0, meter.GetCount(MeterFileCompressedWriteBytes))

assert.Equal(t, 0, meter.GetCount(MeterLiveUncompressedReadBytes))
}

func TestWithBytesReadMeteringOptions(t *testing.T) {
func TestWithBytesReadMeteringOptionsZstd(t *testing.T) {
meter := dmetering.NewBytesMeter()

opts := WithBytesMeteringOptions(meter, nil)
Expand All @@ -60,7 +69,10 @@ func TestWithBytesReadMeteringOptions(t *testing.T) {
t.Fatal(err)
}

err = store.WriteObject(nil, "test", bytes.NewReader([]byte("1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111")))
uncompressedSize := 1024
compressedSize := 24

err = store.WriteObject(nil, "test", bytes.NewReader(bytes.Repeat([]byte("1"), uncompressedSize)))
if err != nil {
t.Fatal(err)
}
Expand All @@ -76,10 +88,58 @@ func TestWithBytesReadMeteringOptions(t *testing.T) {
}
_ = r.Close()

assert.Equal(t, 24, meter.GetCount(MeterFileCompressedReadBytes))
assert.Equal(t, 727, meter.GetCount(MeterFileUncompressedReadBytes))
assert.Equal(t, 727, meter.GetCount(MeterFileUncompressedWriteBytes))
assert.Equal(t, 24, meter.GetCount(MeterFileCompressedWriteBytes))
// sanity check
assert.Greater(t, uncompressedSize, compressedSize)

// the amount read and written should be equal
assert.Equal(t, uncompressedSize, meter.GetCount(MeterFileUncompressedReadBytes))
assert.Equal(t, uncompressedSize, meter.GetCount(MeterFileUncompressedWriteBytes))

assert.Equal(t, compressedSize, meter.GetCount(MeterFileCompressedReadBytes))
assert.Equal(t, compressedSize, meter.GetCount(MeterFileCompressedWriteBytes))

assert.Equal(t, 0, meter.GetCount(MeterLiveUncompressedReadBytes))
}

func TestWithBytesReadMeteringOptionsGzip(t *testing.T) {
meter := dmetering.NewBytesMeter()

opts := WithBytesMeteringOptions(meter, nil)

store, err := dstore.NewStore("memory://test", ".test", "gzip", false, opts...)
if err != nil {
t.Fatal(err)
}

uncompressedSize := 1024
compressedSize := 32

err = store.WriteObject(nil, "test", bytes.NewReader(bytes.Repeat([]byte("1"), uncompressedSize)))
if err != nil {
t.Fatal(err)
}

r, err := store.OpenObject(nil, "test")
if err != nil {
t.Fatal(err)
}

_, err = io.ReadAll(r)
if err != nil {
t.Fatal(err)
}
_ = r.Close()

// sanity check
assert.Greater(t, uncompressedSize, compressedSize)

// the amount read and written should be equal
assert.Equal(t, uncompressedSize, meter.GetCount(MeterFileUncompressedReadBytes))
assert.Equal(t, uncompressedSize, meter.GetCount(MeterFileUncompressedWriteBytes))

assert.Equal(t, compressedSize, meter.GetCount(MeterFileCompressedReadBytes))
assert.Equal(t, compressedSize, meter.GetCount(MeterFileCompressedWriteBytes))

assert.Equal(t, 0, meter.GetCount(MeterLiveUncompressedReadBytes))
}

Expand Down Expand Up @@ -324,8 +384,10 @@ func TestSend(t *testing.T) {
emitter := &mockEmitter{}
ctx = reqctx.WithEmitter(ctx, emitter)

metericsSender := NewMetricsSender()

// Call the Send function
Send(ctx, "user1", "apiKey1", "127.0.0.1", "meta", "endpoint", resp)
metericsSender.Send(ctx, "user1", "apiKey1", "127.0.0.1", "meta", "endpoint", resp)

// Verify the emitted event
assert.Len(t, emitter.events, 1)
Expand All @@ -348,6 +410,82 @@ func TestSend(t *testing.T) {
assert.Equal(t, float64(1), event.Metrics["message_count"])
}

func TestSendParallel(t *testing.T) {
ctx := dmetering.WithBytesMeter(context.Background())
meter := dmetering.GetBytesMeter(ctx)

// Mock response
resp := &pbbstream.Block{
Id: "test-block",
Number: 1,
}

// Mock emitter
emitter := &mockEmitter{}
ctx = reqctx.WithEmitter(ctx, emitter)

metricsSender := NewMetricsSender()

randomInt := func() int {
return rand.Intn(100) + 1
}

const numGoroutines = 10000
var wg sync.WaitGroup
wg.Add(numGoroutines)

for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()

time.Sleep(time.Duration(randomInt()) * time.Nanosecond)
// Set initial meter values
meter.CountInc(MeterWasmInputBytes, 100)
meter.CountInc(MeterLiveUncompressedReadBytes, 200)
meter.CountInc(MeterFileUncompressedReadBytes, 300)
meter.CountInc(MeterFileCompressedReadBytes, 400)
meter.CountInc(MeterFileUncompressedWriteBytes, 500)
meter.CountInc(MeterFileCompressedWriteBytes, 600)

time.Sleep(time.Duration(randomInt()) * time.Nanosecond)
metricsSender.Send(ctx, "user1", "apiKey1", "127.0.0.1", "meta", "endpoint", resp)
}()
}

wg.Wait()

TotalEgressBytes := 0.0
TotalMeterWasmInputBytes := 0.0
TotalMeterLiveUncompressedReadBytes := 0.0
TotalMeterFileUncompressedReadBytes := 0.0
TotalMeterFileCompressedReadBytes := 0.0
TotalMeterFileUncompressedWriteBytes := 0.0
TotalMeterFileCompressedWriteBytes := 0.0

// Verify the emitted events
assert.Len(t, emitter.events, numGoroutines)
for i, event := range emitter.events {
_ = i
TotalEgressBytes += event.Metrics["egress_bytes"]
TotalMeterWasmInputBytes += event.Metrics[MeterWasmInputBytes]
TotalMeterLiveUncompressedReadBytes += event.Metrics[MeterLiveUncompressedReadBytes]
TotalMeterFileUncompressedReadBytes += event.Metrics[MeterFileUncompressedReadBytes]
TotalMeterFileCompressedReadBytes += event.Metrics[MeterFileCompressedReadBytes]
TotalMeterFileUncompressedWriteBytes += event.Metrics[MeterFileUncompressedWriteBytes]
TotalMeterFileCompressedWriteBytes += event.Metrics[MeterFileCompressedWriteBytes]
}

assert.Equal(t, numGoroutines*float64(proto.Size(resp)), TotalEgressBytes)
assert.Equal(t, numGoroutines*float64(100), TotalMeterWasmInputBytes)
assert.Equal(t, numGoroutines*float64(200), TotalMeterLiveUncompressedReadBytes)
assert.Equal(t, numGoroutines*float64(300), TotalMeterFileUncompressedReadBytes)
assert.Equal(t, numGoroutines*float64(400), TotalMeterFileCompressedReadBytes)
assert.Equal(t, numGoroutines*float64(500), TotalMeterFileUncompressedWriteBytes)
assert.Equal(t, numGoroutines*float64(600), TotalMeterFileCompressedWriteBytes)
assert.Equal(t, numGoroutines*float64(1), float64(numGoroutines))

}

func bstreamBlk(t *testing.T, blk *pbsubstreamstest.Block) *pbbstream.Block {
payload, err := anypb.New(blk)
assert.NoError(t, err)
Expand Down
4 changes: 3 additions & 1 deletion service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (s *Tier1Service) Blocks(
ctx = logging.WithLogger(ctx, logger)
ctx = reqctx.WithTracer(ctx, s.tracer)
ctx = dmetering.WithBytesMeter(ctx)
ctx = metering.WithMetricsSender(ctx)
ctx = reqctx.WithTier2RequestParameters(ctx, s.tier2RequestParameters)

ctx, span := reqctx.WithSpan(ctx, "substreams/tier1/request")
Expand Down Expand Up @@ -607,6 +608,7 @@ func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logg
ip := auth.RealIP()

ctx = reqctx.WithEmitter(ctx, dmetering.GetDefaultEmitter())
metericsSender := metering.GetMetricsSender(ctx)

return func(respAny substreams.ResponseFromAnyTier) error {
resp := respAny.(*pbsubstreamsrpc.Response)
Expand All @@ -622,7 +624,7 @@ func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logg
return connect.NewError(connect.CodeUnavailable, err)
}

metering.Send(ctx, userID, apiKeyID, ip, userMeta, "sf.substreams.rpc.v2/Blocks", resp)
metericsSender.Send(ctx, userID, apiKeyID, ip, userMeta, "sf.substreams.rpc.v2/Blocks", resp)
return nil
}
}
Expand Down
5 changes: 4 additions & 1 deletion service/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s
ctx = logging.WithLogger(ctx, logger)
ctx = dmetering.WithBytesMeter(ctx)
ctx = reqctx.WithTracer(ctx, s.tracer)
ctx = metering.WithMetricsSender(ctx)

ctx, span := reqctx.WithSpan(ctx, "substreams/tier2/request")
defer span.EndWithErr(&err)
Expand Down Expand Up @@ -506,6 +507,8 @@ func tier2ResponseHandler(ctx context.Context, logger *zap.Logger, streamSrv pbs
logger.Warn("no auth information available in tier2 response handler")
}

metricsSender := metering.GetMetricsSender(ctx)

return func(respAny substreams.ResponseFromAnyTier) error {
resp := respAny.(*pbssinternal.ProcessRangeResponse)
if err := streamSrv.Send(resp); err != nil {
Expand All @@ -520,7 +523,7 @@ func tier2ResponseHandler(ctx context.Context, logger *zap.Logger, streamSrv pbs
zap.String("user_meta", userMeta),
zap.String("endpoint", "sf.substreams.internal.v2/ProcessRange"),
)
metering.Send(ctx, userID, apiKeyID, ip, userMeta, "sf.substreams.internal.v2/ProcessRange", resp)
metricsSender.Send(ctx, userID, apiKeyID, ip, userMeta, "sf.substreams.internal.v2/ProcessRange", resp)
return nil
}
}
Expand Down
12 changes: 4 additions & 8 deletions test/blockgens_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@ import (
"strconv"
"strings"

"google.golang.org/protobuf/types/known/timestamppb"

"google.golang.org/protobuf/types/known/anypb"

pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"

"github.com/streamingfast/bstream/forkable"

"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/forkable"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
pbsubstreamstest "github.com/streamingfast/substreams/pb/sf/substreams/v1/test"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

type BlockCursor struct {
Expand Down
7 changes: 5 additions & 2 deletions test/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,16 @@ type responseCollector struct {
responses []*pbsubstreamsrpc.Response
internalResponses []*pbssinternal.ProcessRangeResponse

sender *metering.MetricsSender

ctx context.Context
}

func newResponseCollector(ctx context.Context) *responseCollector {
rc := &responseCollector{}
rc.ctx = reqctx.WithEmitter(ctx, rc)
rc.eventsCollector = eventsCollectorFromContext(ctx)
rc.sender = metering.NewMetricsSender()

return rc
}
Expand All @@ -61,10 +64,10 @@ func (c *responseCollector) Collect(respAny substreams.ResponseFromAnyTier) erro
switch resp := respAny.(type) {
case *pbsubstreamsrpc.Response:
c.responses = append(c.responses, resp)
metering.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "tier1", resp)
c.sender.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "tier1", resp)
case *pbssinternal.ProcessRangeResponse:
c.internalResponses = append(c.internalResponses, resp)
metering.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "tier2", resp)
c.sender.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "tier2", resp)
}
return nil
}
Loading

0 comments on commit 8091680

Please sign in to comment.