diff --git a/metering/metering.go b/metering/metering.go index a24502fd..477ea998 100644 --- a/metering/metering.go +++ b/metering/metering.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "github.com/streamingfast/substreams/metrics" + "github.com/streamingfast/dmetering" "github.com/streamingfast/dstore" "github.com/streamingfast/substreams/reqctx" @@ -77,11 +79,13 @@ func GetTotalBytesWritten(meter dmetering.Meter) uint64 { return total } -func Send(ctx context.Context, meter dmetering.Meter, userID, apiKeyID, ip, userMeta, endpoint string, resp proto.Message) { +func Send(ctx context.Context, userID, apiKeyID, ip, userMeta, endpoint string, resp proto.Message) { if reqctx.IsBackfillerRequest(ctx) { endpoint = fmt.Sprintf("%s%s", endpoint, "Backfill") } + meter := dmetering.GetBytesMeter(ctx) + bytesRead := meter.BytesReadDelta() bytesWritten := meter.BytesWrittenDelta() egressBytes := proto.Size(resp) @@ -135,4 +139,6 @@ func Send(ctx context.Context, meter dmetering.Meter, userID, apiKeyID, ip, user } else { emitter.Emit(context.WithoutCancel(ctx), event) } + + metrics.MeteringEvents.Inc() } diff --git a/metrics/metrics.go b/metrics/metrics.go index 71db6cee..98d84496 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -30,6 +30,8 @@ var Tier2RequestCounter = MetricSet.NewCounter("substreams_tier2_request_counter var AppReadinessTier1 = MetricSet.NewAppReadiness("substreams_tier1") var AppReadinessTier2 = MetricSet.NewAppReadiness("substreams_tier2") +var MeteringEvents = MetricSet.NewCounter("substreams_metering_events", "Number of metering events") + var registerOnce sync.Once func RegisterMetricSet(zlog *zap.Logger) { diff --git a/service/tier1.go b/service/tier1.go index da77a9eb..f7343b56 100644 --- a/service/tier1.go +++ b/service/tier1.go @@ -635,7 +635,7 @@ func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logg apiKeyID := auth.APIKeyID() userMeta := auth.Meta() ip := auth.RealIP() - meter := dmetering.GetBytesMeter(ctx) + ctx = reqctx.WithEmitter(ctx, dmetering.GetDefaultEmitter()) return func(respAny substreams.ResponseFromAnyTier) error { @@ -652,7 +652,7 @@ func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logg return connect.NewError(connect.CodeUnavailable, err) } - metering.Send(ctx, meter, userID, apiKeyID, ip, userMeta, "sf.substreams.rpc.v2/Blocks", resp) + metering.Send(ctx, userID, apiKeyID, ip, userMeta, "sf.substreams.rpc.v2/Blocks", resp) return nil } } diff --git a/service/tier2.go b/service/tier2.go index 9092bdb9..aa67a3a1 100644 --- a/service/tier2.go +++ b/service/tier2.go @@ -512,8 +512,6 @@ func canSkipBlockSource(existingExecOuts map[string]*execout.File, requiredModul } func tier2ResponseHandler(ctx context.Context, logger *zap.Logger, streamSrv pbssinternal.Substreams_ProcessRangeServer) substreams.ResponseFunc { - meter := dmetering.GetBytesMeter(ctx) - var userID, apiKeyID, userMeta, ip string if auth := dauth.FromContext(ctx); auth != nil { userID = auth.UserID() @@ -539,7 +537,7 @@ func tier2ResponseHandler(ctx context.Context, logger *zap.Logger, streamSrv pbs zap.String("user_meta", userMeta), zap.String("endpoint", "sf.substreams.internal.v2/ProcessRange"), ) - metering.Send(ctx, meter, userID, apiKeyID, ip, userMeta, "sf.substreams.internal.v2/ProcessRange", resp) + metering.Send(ctx, userID, apiKeyID, ip, userMeta, "sf.substreams.internal.v2/ProcessRange", resp) return nil } } diff --git a/test/collector_test.go b/test/collector_test.go index adb100a5..d152530f 100644 --- a/test/collector_test.go +++ b/test/collector_test.go @@ -1,26 +1,70 @@ package integration import ( + "context" + + "github.com/streamingfast/dmetering" "github.com/streamingfast/substreams" + "github.com/streamingfast/substreams/metering" pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2" pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" + "github.com/streamingfast/substreams/reqctx" ) +type eventsCollector struct { + events []dmetering.Event +} + +func (c *eventsCollector) Emit(_ context.Context, ev dmetering.Event) { + c.events = append(c.events, ev) +} + +func (c *eventsCollector) Shutdown(_ error) { + return +} + +func (c *eventsCollector) Events() []dmetering.Event { + return c.events +} + +var eventsCollectorKey = "eventsCollector" + +func withEventsCollector(ctx context.Context, collector *eventsCollector) context.Context { + return context.WithValue(ctx, eventsCollectorKey, collector) +} + +func eventsCollectorFromContext(ctx context.Context) *eventsCollector { + if ev, ok := ctx.Value(eventsCollectorKey).(*eventsCollector); ok { + return ev + } + return &eventsCollector{} +} + type responseCollector struct { + *eventsCollector + responses []*pbsubstreamsrpc.Response internalResponses []*pbssinternal.ProcessRangeResponse + + ctx context.Context } -func newResponseCollector() *responseCollector { - return &responseCollector{} +func newResponseCollector(ctx context.Context) *responseCollector { + rc := &responseCollector{} + rc.ctx = reqctx.WithEmitter(ctx, rc) + rc.eventsCollector = eventsCollectorFromContext(ctx) + + return rc } func (c *responseCollector) Collect(respAny substreams.ResponseFromAnyTier) error { switch resp := respAny.(type) { case *pbsubstreamsrpc.Response: c.responses = append(c.responses, resp) + metering.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "tier1", resp) case *pbssinternal.ProcessRangeResponse: c.internalResponses = append(c.internalResponses, resp) + metering.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "tier2", resp) } return nil } diff --git a/test/integration_test.go b/test/integration_test.go index dd734fc5..9fa2fb2a 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -198,6 +198,8 @@ func TestOneStoreOneMap(t *testing.T) { spkg string expectFiles []string expectError string + expectTier1Events bool + expectTier2Events bool }{ { name: "dev_mode_backprocess", @@ -208,12 +210,13 @@ func TestOneStoreOneMap(t *testing.T) { production: false, expectedResponseCount: 4, expectFiles: []string{ - testStoreAddI64Hash + "/outputs/0000000001-0000000010.output", // store outputs testStoreAddI64Hash + "/outputs/0000000010-0000000020.output", testStoreAddI64Hash + "/states/0000000010-0000000001.kv", // store states testStoreAddI64Hash + "/states/0000000020-0000000001.kv", }, + expectTier1Events: true, + expectTier2Events: true, }, { name: "dev_mode_backprocess_then_save_state", @@ -229,6 +232,8 @@ func TestOneStoreOneMap(t *testing.T) { testStoreAddI64Hash + "/states/0000000010-0000000001.kv", // store states testStoreAddI64Hash + "/states/0000000020-0000000001.kv", }, + expectTier1Events: true, + expectTier2Events: true, }, { name: "prod_mode_back_forward_to_lib", @@ -244,6 +249,8 @@ func TestOneStoreOneMap(t *testing.T) { testStoreAddI64Hash + "/states/0000000010-0000000001.kv", testStoreAddI64Hash + "/states/0000000020-0000000001.kv", }, + expectTier1Events: true, + expectTier2Events: true, }, { name: "prod_mode_back_forward_to_stop", @@ -262,6 +269,8 @@ func TestOneStoreOneMap(t *testing.T) { testStoreAddI64Hash + "/states/0000000030-0000000001.kv", assertTestStoreAddI64Hash + "/outputs/0000000020-0000000030.output", // map }, + expectTier1Events: true, + expectTier2Events: true, }, { name: "prod_mode_back_forward_to_stop_nonzero_first_streamable", @@ -280,6 +289,8 @@ func TestOneStoreOneMap(t *testing.T) { testStoreAddI64Hash + "/states/0000000020-0000000016.kv", testStoreAddI64Hash + "/states/0000000030-0000000016.kv", }, + expectTier1Events: true, + expectTier2Events: true, }, { name: "nonzero_first_streamable on nonzero module", @@ -312,6 +323,8 @@ func TestOneStoreOneMap(t *testing.T) { assertTestStoreAddI64Hash + "/outputs/0000000020-0000000030.output", // map assertTestStoreAddI64Hash + "/outputs/0000000030-0000000040.output", }, + expectTier1Events: true, + expectTier2Events: true, }, { name: "prod_mode_partial_existing", @@ -335,6 +348,8 @@ func TestOneStoreOneMap(t *testing.T) { assertTestStoreAddI64Hash + "/outputs/0000000010-0000000020.output", assertTestStoreAddI64Hash + "/outputs/0000000020-0000000030.output", }, + expectTier1Events: true, + expectTier2Events: true, }, } @@ -359,6 +374,24 @@ func TestOneStoreOneMap(t *testing.T) { mapOutput := run.MapOutputString("assert_test_store_add_i64") assert.Contains(t, mapOutput, `assert_test_store_add_i64: 0801`) + var tier1EventsFound bool + for _, ev := range run.Events { + if ev.Endpoint == "tier1" { + tier1EventsFound = true + break + } + } + assert.Equal(t, test.expectTier1Events, tier1EventsFound) + + var tier2EventsFound bool + for _, ev := range run.Events { + if ev.Endpoint == "tier2" { + tier2EventsFound = true + break + } + } + assert.Equal(t, test.expectTier2Events, tier2EventsFound) + assert.Equal(t, test.expectedResponseCount, strings.Count(mapOutput, "\n")) withZST := func(s []string) []string { diff --git a/test/runnable_test.go b/test/runnable_test.go index 67b169b2..f4720688 100644 --- a/test/runnable_test.go +++ b/test/runnable_test.go @@ -12,6 +12,8 @@ import ( "testing" "time" + "github.com/streamingfast/dmetering" + "go.opentelemetry.io/otel/attribute" "github.com/streamingfast/bstream" @@ -59,6 +61,7 @@ type testRun struct { Params map[string]string Responses []*pbsubstreamsrpc.Response + Events []dmetering.Event TempDir string } @@ -85,6 +88,7 @@ func (f *testRun) run(t *testing.T, testName string) error { } ctx = reqctx.WithLogger(ctx, zlog) + ctx = dmetering.WithBytesMeter(ctx) os.Setenv("TEST_TEMP_DIR", f.TempDir) @@ -127,7 +131,8 @@ func (f *testRun) run(t *testing.T, testName string) error { f.ParallelSubrequests = 1 } - responseCollector := newResponseCollector() + ctx = withEventsCollector(ctx, &eventsCollector{}) + responseCollector := newResponseCollector(ctx) newBlockGenerator := func(startBlock uint64, inclusiveStopBlock uint64) TestBlockGenerator { return &LinearBlockGenerator{ @@ -142,7 +147,7 @@ func (f *testRun) run(t *testing.T, testName string) error { workerFactory := func(_ *zap.Logger) work.Worker { return &TestWorker{ t: t, - responseCollector: newResponseCollector(), + responseCollector: newResponseCollector(ctx), newBlockGenerator: newBlockGenerator, blockProcessedCallBack: f.BlockProcessedCallback, jobCallBack: f.JobCallback, @@ -161,6 +166,7 @@ func (f *testRun) run(t *testing.T, testName string) error { } f.Responses = responseCollector.responses + f.Events = responseCollector.eventsCollector.Events() return nil } diff --git a/test/tier2_integration_test.go b/test/tier2_integration_test.go index 5d22a32d..5d54cb6c 100644 --- a/test/tier2_integration_test.go +++ b/test/tier2_integration_test.go @@ -434,7 +434,7 @@ func TestTier2Call(t *testing.T) { StateStoreDefaultTag: "tag", }) - responseCollector := newResponseCollector() + responseCollector := newResponseCollector(ctx) newBlockGenerator := func(startBlock uint64, inclusiveStopBlock uint64) TestBlockGenerator { return &LinearBlockGenerator{