Skip to content

Commit

Permalink
metering updates. add substreams_metering_events to prometheus and so…
Browse files Browse the repository at this point in the history
…me integration tests
  • Loading branch information
colindickson committed Oct 3, 2024
1 parent f76f2b1 commit 9b24472
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 12 deletions.
8 changes: 7 additions & 1 deletion metering/metering.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

"github.com/streamingfast/substreams/metrics"

"github.com/streamingfast/dmetering"
"github.com/streamingfast/dstore"
"github.com/streamingfast/substreams/reqctx"
Expand Down Expand Up @@ -77,11 +79,13 @@ func GetTotalBytesWritten(meter dmetering.Meter) uint64 {
return total
}

func Send(ctx context.Context, meter dmetering.Meter, userID, apiKeyID, ip, userMeta, endpoint string, resp proto.Message) {
func Send(ctx context.Context, userID, apiKeyID, ip, userMeta, endpoint string, resp proto.Message) {
if reqctx.IsBackfillerRequest(ctx) {
endpoint = fmt.Sprintf("%s%s", endpoint, "Backfill")
}

meter := dmetering.GetBytesMeter(ctx)

bytesRead := meter.BytesReadDelta()
bytesWritten := meter.BytesWrittenDelta()
egressBytes := proto.Size(resp)
Expand Down Expand Up @@ -135,4 +139,6 @@ func Send(ctx context.Context, meter dmetering.Meter, userID, apiKeyID, ip, user
} else {
emitter.Emit(context.WithoutCancel(ctx), event)
}

metrics.MeteringEvents.Inc()
}
2 changes: 2 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var Tier2RequestCounter = MetricSet.NewCounter("substreams_tier2_request_counter
var AppReadinessTier1 = MetricSet.NewAppReadiness("substreams_tier1")
var AppReadinessTier2 = MetricSet.NewAppReadiness("substreams_tier2")

var MeteringEvents = MetricSet.NewCounter("substreams_metering_events", "Number of metering events")

var registerOnce sync.Once

func RegisterMetricSet(zlog *zap.Logger) {
Expand Down
4 changes: 2 additions & 2 deletions service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logg
apiKeyID := auth.APIKeyID()
userMeta := auth.Meta()
ip := auth.RealIP()
meter := dmetering.GetBytesMeter(ctx)

ctx = reqctx.WithEmitter(ctx, dmetering.GetDefaultEmitter())

return func(respAny substreams.ResponseFromAnyTier) error {
Expand All @@ -652,7 +652,7 @@ func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logg
return connect.NewError(connect.CodeUnavailable, err)
}

metering.Send(ctx, meter, userID, apiKeyID, ip, userMeta, "sf.substreams.rpc.v2/Blocks", resp)
metering.Send(ctx, userID, apiKeyID, ip, userMeta, "sf.substreams.rpc.v2/Blocks", resp)
return nil
}
}
Expand Down
4 changes: 1 addition & 3 deletions service/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,8 +512,6 @@ func canSkipBlockSource(existingExecOuts map[string]*execout.File, requiredModul
}

func tier2ResponseHandler(ctx context.Context, logger *zap.Logger, streamSrv pbssinternal.Substreams_ProcessRangeServer) substreams.ResponseFunc {
meter := dmetering.GetBytesMeter(ctx)

var userID, apiKeyID, userMeta, ip string
if auth := dauth.FromContext(ctx); auth != nil {
userID = auth.UserID()
Expand All @@ -539,7 +537,7 @@ func tier2ResponseHandler(ctx context.Context, logger *zap.Logger, streamSrv pbs
zap.String("user_meta", userMeta),
zap.String("endpoint", "sf.substreams.internal.v2/ProcessRange"),
)
metering.Send(ctx, meter, userID, apiKeyID, ip, userMeta, "sf.substreams.internal.v2/ProcessRange", resp)
metering.Send(ctx, userID, apiKeyID, ip, userMeta, "sf.substreams.internal.v2/ProcessRange", resp)
return nil
}
}
Expand Down
48 changes: 46 additions & 2 deletions test/collector_test.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,70 @@
package integration

import (
"context"

"github.com/streamingfast/dmetering"
"github.com/streamingfast/substreams"
"github.com/streamingfast/substreams/metering"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
"github.com/streamingfast/substreams/reqctx"
)

type eventsCollector struct {
events []dmetering.Event
}

func (c *eventsCollector) Emit(_ context.Context, ev dmetering.Event) {
c.events = append(c.events, ev)
}

func (c *eventsCollector) Shutdown(_ error) {
return
}

func (c *eventsCollector) Events() []dmetering.Event {
return c.events
}

var eventsCollectorKey = "eventsCollector"

func withEventsCollector(ctx context.Context, collector *eventsCollector) context.Context {
return context.WithValue(ctx, eventsCollectorKey, collector)
}

func eventsCollectorFromContext(ctx context.Context) *eventsCollector {
if ev, ok := ctx.Value(eventsCollectorKey).(*eventsCollector); ok {
return ev
}
return &eventsCollector{}
}

type responseCollector struct {
*eventsCollector

responses []*pbsubstreamsrpc.Response
internalResponses []*pbssinternal.ProcessRangeResponse

ctx context.Context
}

func newResponseCollector() *responseCollector {
return &responseCollector{}
func newResponseCollector(ctx context.Context) *responseCollector {
rc := &responseCollector{}
rc.ctx = reqctx.WithEmitter(ctx, rc)
rc.eventsCollector = eventsCollectorFromContext(ctx)

return rc
}

func (c *responseCollector) Collect(respAny substreams.ResponseFromAnyTier) error {
switch resp := respAny.(type) {
case *pbsubstreamsrpc.Response:
c.responses = append(c.responses, resp)
metering.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "tier1", resp)
case *pbssinternal.ProcessRangeResponse:
c.internalResponses = append(c.internalResponses, resp)
metering.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "tier2", resp)
}
return nil
}
35 changes: 34 additions & 1 deletion test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ func TestOneStoreOneMap(t *testing.T) {
spkg string
expectFiles []string
expectError string
expectTier1Events bool
expectTier2Events bool
}{
{
name: "dev_mode_backprocess",
Expand All @@ -208,12 +210,13 @@ func TestOneStoreOneMap(t *testing.T) {
production: false,
expectedResponseCount: 4,
expectFiles: []string{

testStoreAddI64Hash + "/outputs/0000000001-0000000010.output", // store outputs
testStoreAddI64Hash + "/outputs/0000000010-0000000020.output",
testStoreAddI64Hash + "/states/0000000010-0000000001.kv", // store states
testStoreAddI64Hash + "/states/0000000020-0000000001.kv",
},
expectTier1Events: true,
expectTier2Events: true,
},
{
name: "dev_mode_backprocess_then_save_state",
Expand All @@ -229,6 +232,8 @@ func TestOneStoreOneMap(t *testing.T) {
testStoreAddI64Hash + "/states/0000000010-0000000001.kv", // store states
testStoreAddI64Hash + "/states/0000000020-0000000001.kv",
},
expectTier1Events: true,
expectTier2Events: true,
},
{
name: "prod_mode_back_forward_to_lib",
Expand All @@ -244,6 +249,8 @@ func TestOneStoreOneMap(t *testing.T) {
testStoreAddI64Hash + "/states/0000000010-0000000001.kv",
testStoreAddI64Hash + "/states/0000000020-0000000001.kv",
},
expectTier1Events: true,
expectTier2Events: true,
},
{
name: "prod_mode_back_forward_to_stop",
Expand All @@ -262,6 +269,8 @@ func TestOneStoreOneMap(t *testing.T) {
testStoreAddI64Hash + "/states/0000000030-0000000001.kv",
assertTestStoreAddI64Hash + "/outputs/0000000020-0000000030.output", // map
},
expectTier1Events: true,
expectTier2Events: true,
},
{
name: "prod_mode_back_forward_to_stop_nonzero_first_streamable",
Expand All @@ -280,6 +289,8 @@ func TestOneStoreOneMap(t *testing.T) {
testStoreAddI64Hash + "/states/0000000020-0000000016.kv",
testStoreAddI64Hash + "/states/0000000030-0000000016.kv",
},
expectTier1Events: true,
expectTier2Events: true,
},
{
name: "nonzero_first_streamable on nonzero module",
Expand Down Expand Up @@ -312,6 +323,8 @@ func TestOneStoreOneMap(t *testing.T) {
assertTestStoreAddI64Hash + "/outputs/0000000020-0000000030.output", // map
assertTestStoreAddI64Hash + "/outputs/0000000030-0000000040.output",
},
expectTier1Events: true,
expectTier2Events: true,
},
{
name: "prod_mode_partial_existing",
Expand All @@ -335,6 +348,8 @@ func TestOneStoreOneMap(t *testing.T) {
assertTestStoreAddI64Hash + "/outputs/0000000010-0000000020.output",
assertTestStoreAddI64Hash + "/outputs/0000000020-0000000030.output",
},
expectTier1Events: true,
expectTier2Events: true,
},
}

Expand All @@ -359,6 +374,24 @@ func TestOneStoreOneMap(t *testing.T) {
mapOutput := run.MapOutputString("assert_test_store_add_i64")
assert.Contains(t, mapOutput, `assert_test_store_add_i64: 0801`)

var tier1EventsFound bool
for _, ev := range run.Events {
if ev.Endpoint == "tier1" {
tier1EventsFound = true
break
}
}
assert.Equal(t, test.expectTier1Events, tier1EventsFound)

var tier2EventsFound bool
for _, ev := range run.Events {
if ev.Endpoint == "tier2" {
tier2EventsFound = true
break
}
}
assert.Equal(t, test.expectTier2Events, tier2EventsFound)

assert.Equal(t, test.expectedResponseCount, strings.Count(mapOutput, "\n"))

withZST := func(s []string) []string {
Expand Down
10 changes: 8 additions & 2 deletions test/runnable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"testing"
"time"

"github.com/streamingfast/dmetering"

"go.opentelemetry.io/otel/attribute"

"github.com/streamingfast/bstream"
Expand Down Expand Up @@ -59,6 +61,7 @@ type testRun struct {
Params map[string]string

Responses []*pbsubstreamsrpc.Response
Events []dmetering.Event
TempDir string
}

Expand All @@ -85,6 +88,7 @@ func (f *testRun) run(t *testing.T, testName string) error {
}

ctx = reqctx.WithLogger(ctx, zlog)
ctx = dmetering.WithBytesMeter(ctx)

os.Setenv("TEST_TEMP_DIR", f.TempDir)

Expand Down Expand Up @@ -127,7 +131,8 @@ func (f *testRun) run(t *testing.T, testName string) error {
f.ParallelSubrequests = 1
}

responseCollector := newResponseCollector()
ctx = withEventsCollector(ctx, &eventsCollector{})
responseCollector := newResponseCollector(ctx)

newBlockGenerator := func(startBlock uint64, inclusiveStopBlock uint64) TestBlockGenerator {
return &LinearBlockGenerator{
Expand All @@ -142,7 +147,7 @@ func (f *testRun) run(t *testing.T, testName string) error {
workerFactory := func(_ *zap.Logger) work.Worker {
return &TestWorker{
t: t,
responseCollector: newResponseCollector(),
responseCollector: newResponseCollector(ctx),
newBlockGenerator: newBlockGenerator,
blockProcessedCallBack: f.BlockProcessedCallback,
jobCallBack: f.JobCallback,
Expand All @@ -161,6 +166,7 @@ func (f *testRun) run(t *testing.T, testName string) error {
}

f.Responses = responseCollector.responses
f.Events = responseCollector.eventsCollector.Events()

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion test/tier2_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func TestTier2Call(t *testing.T) {
StateStoreDefaultTag: "tag",
})

responseCollector := newResponseCollector()
responseCollector := newResponseCollector(ctx)

newBlockGenerator := func(startBlock uint64, inclusiveStopBlock uint64) TestBlockGenerator {
return &LinearBlockGenerator{
Expand Down

0 comments on commit 9b24472

Please sign in to comment.