diff --git a/go.mod b/go.mod index 2732f221..2c209df8 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( github.com/rs/cors v1.10.0 github.com/schollz/closestmatch v2.1.0+incompatible github.com/shopspring/decimal v1.3.1 - github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 + github.com/streamingfast/dmetering v0.0.0-20241028183059-d11d4ec85e05 github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 github.com/streamingfast/sf-tracing v0.0.0-20240430173521-888827872b90 github.com/streamingfast/shutter v1.5.0 diff --git a/go.sum b/go.sum index 6790b5a7..979703a0 100644 --- a/go.sum +++ b/go.sum @@ -533,8 +533,8 @@ github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 h1:xJB7rXnOHLes github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1/go.mod h1:QSm/AfaDsE0k1xBYi0lW580YJ/WDV/FKZI628tkZR0Y= github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca h1:/k5H6MUo5Vi8AKPsSr+TMeA/XJ0uMyEX6feHpOozTlQ= github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca/go.mod h1:NuKCwOHjbT0nRji0O+7+c70AiBfLHEKNoovs/gFfMPY= -github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 h1:6o6MS3JHrp9A7V6EBHbR7W7mzVCFmXc8U0AjTfvz7PI= -github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI= +github.com/streamingfast/dmetering v0.0.0-20241028183059-d11d4ec85e05 h1:jjx6kO2z0mNSKElL8YHnjt65+NB/CSsJH5C9efyrzw8= +github.com/streamingfast/dmetering v0.0.0-20241028183059-d11d4ec85e05/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI= github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y= github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw= github.com/streamingfast/dstore v0.1.1-0.20241011152904-9acd6205dc14 h1:/2HxIOzAgUBKyxjDO4IJPzBBaEAtzwipb/2/JGsOArA= diff --git a/metering/metering.go b/metering/metering.go index 93b2c7df..7fb11048 100644 --- a/metering/metering.go +++ b/metering/metering.go @@ -113,7 +113,7 @@ func NewMetricsSender() *MetricsSender { } } -func (ms *MetricsSender) Send(ctx context.Context, userID, apiKeyID, ip, userMeta, endpoint string, resp proto.Message) { +func (ms *MetricsSender) Send(ctx context.Context, userID, apiKeyID, ip, userMeta, outputModuleHash, endpoint string, resp proto.Message) { ms.Lock() defer ms.Unlock() @@ -143,10 +143,11 @@ func (ms *MetricsSender) Send(ctx context.Context, userID, apiKeyID, ip, userMet meter.CountInc(TotalWriteBytes, int(totalWriteBytes)) event := dmetering.Event{ - UserID: userID, - ApiKeyID: apiKeyID, - IpAddress: ip, - Meta: userMeta, + UserID: userID, + ApiKeyID: apiKeyID, + IpAddress: ip, + Meta: userMeta, + OutputModuleHash: outputModuleHash, Endpoint: endpoint, Metrics: map[string]float64{ diff --git a/metering/metering_test.go b/metering/metering_test.go index 7b893ecf..afefdb78 100644 --- a/metering/metering_test.go +++ b/metering/metering_test.go @@ -386,8 +386,10 @@ func TestSend(t *testing.T) { metericsSender := NewMetricsSender() + outputModuleHash := "outputModuleHash" + // Call the Send function - metericsSender.Send(ctx, "user1", "apiKey1", "127.0.0.1", "meta", "endpoint", resp) + metericsSender.Send(ctx, "user1", "apiKey1", "127.0.0.1", "meta", outputModuleHash, "endpoint", resp) // Verify the emitted event assert.Len(t, emitter.events, 1) @@ -398,6 +400,7 @@ func TestSend(t *testing.T) { assert.Equal(t, "127.0.0.1", event.IpAddress) assert.Equal(t, "meta", event.Meta) assert.Equal(t, "endpoint", event.Endpoint) + assert.Equal(t, "outputModuleHash", event.OutputModuleHash) assert.Equal(t, float64(proto.Size(resp)), event.Metrics["egress_bytes"]) assert.Equal(t, float64(0), event.Metrics["written_bytes"]) assert.Equal(t, float64(0), event.Metrics["read_bytes"]) @@ -448,7 +451,7 @@ func TestSendParallel(t *testing.T) { meter.CountInc(MeterFileCompressedWriteBytes, 600) time.Sleep(time.Duration(randomInt()) * time.Nanosecond) - metricsSender.Send(ctx, "user1", "apiKey1", "127.0.0.1", "meta", "endpoint", resp) + metricsSender.Send(ctx, "user1", "apiKey1", "127.0.0.1", "meta", "outputModuleHash", "endpoint", resp) }() } diff --git a/reqctx/context.go b/reqctx/context.go index 58d91531..e0531277 100644 --- a/reqctx/context.go +++ b/reqctx/context.go @@ -23,6 +23,8 @@ var tracerKey = contextKeyType(2) var spanKey = contextKeyType(3) var reqStatsKey = contextKeyType(4) var moduleExecutionTracingConfigKey = contextKeyType(5) +var outputModuleHashKey = contextKeyType(6) +var tier2RequestParametersKeyKey = contextKeyType(7) func Logger(ctx context.Context) *zap.Logger { return logging.Logger(ctx, zap.NewNop()) diff --git a/reqctx/metering.go b/reqctx/metering.go index 777d12b9..808deccc 100644 --- a/reqctx/metering.go +++ b/reqctx/metering.go @@ -31,3 +31,17 @@ func IsBackfillerRequest(ctx context.Context) bool { _, ok = md[backFillerKey] return ok } + +type outputModuleKeyType int + +func WithOutputModuleHash(ctx context.Context, hash string) context.Context { + return context.WithValue(ctx, outputModuleHashKey, hash) +} + +func OutputModuleHash(ctx context.Context) string { + hash, ok := ctx.Value(outputModuleHashKey).(string) + if !ok { + return "" + } + return hash +} diff --git a/reqctx/tier2request.go b/reqctx/tier2request.go index 3b389ea9..617e2297 100644 --- a/reqctx/tier2request.go +++ b/reqctx/tier2request.go @@ -16,10 +16,6 @@ type Tier2RequestParameters struct { WASMModules map[string]string } -type tier2RequestParametersKey int - -const tier2RequestParametersKeyKey = tier2RequestParametersKey(0) - func WithTier2RequestParameters(ctx context.Context, parameters Tier2RequestParameters) context.Context { return context.WithValue(ctx, tier2RequestParametersKeyKey, parameters) } diff --git a/service/tier1.go b/service/tier1.go index 95764647..545f7e6b 100644 --- a/service/tier1.go +++ b/service/tier1.go @@ -209,6 +209,18 @@ func (s *Tier1Service) Blocks( ctx, span := reqctx.WithSpan(ctx, "substreams/tier1/request") defer span.EndWithErr(&err) + request := req.Msg + if request.Modules == nil { + return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("missing modules in request")) + } + + execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, request.ProductionMode, request.Modules, bstream.GetProtocolFirstStreamableBlock) + if err != nil { + return bsstream.NewErrInvalidArg(err.Error()) + } + outputModuleHash := execGraph.ModuleHashes().Get(request.OutputModule) + ctx = reqctx.WithOutputModuleHash(ctx, outputModuleHash) + // We need to ensure that the response function is NEVER used after this Blocks handler has returned. // We use a context that will be canceled on defer, and a lock to prevent races. The respFunc is used in various threads mut := sync.Mutex{} @@ -223,21 +235,10 @@ func (s *Tier1Service) Blocks( span.SetAttributes(attribute.Int64("substreams.tier", 1)) - request := req.Msg - if request.Modules == nil { - return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("missing modules in request")) - } - if err := ValidateTier1Request(request, s.blockType); err != nil { return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err)) } - execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, request.ProductionMode, request.Modules, bstream.GetProtocolFirstStreamableBlock) - if err != nil { - return bsstream.NewErrInvalidArg(err.Error()) - } - outputModuleHash := execGraph.ModuleHashes().Get(request.OutputModule) - moduleNames := make([]string, len(request.Modules.Modules)) for i := 0; i < len(moduleNames); i++ { moduleNames[i] = request.Modules.Modules[i].Name @@ -607,6 +608,8 @@ func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logg userMeta := auth.Meta() ip := auth.RealIP() + outputModuleHash := reqctx.OutputModuleHash(ctx) + ctx = reqctx.WithEmitter(ctx, dmetering.GetDefaultEmitter()) metericsSender := metering.GetMetricsSender(ctx) @@ -624,7 +627,7 @@ func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logg return connect.NewError(connect.CodeUnavailable, err) } - metericsSender.Send(ctx, userID, apiKeyID, ip, userMeta, "sf.substreams.rpc.v2/Blocks", resp) + metericsSender.Send(ctx, userID, apiKeyID, ip, userMeta, outputModuleHash, "sf.substreams.rpc.v2/Blocks", resp) return nil } } diff --git a/service/tier2.go b/service/tier2.go index 267f5363..e649501e 100644 --- a/service/tier2.go +++ b/service/tier2.go @@ -206,6 +206,13 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err)) } + execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, true, request.Modules, request.FirstStreamableBlock) //production-mode flag is irrelevant here because it isn't used to calculate the hashes + if err != nil { + return bsstream.NewErrInvalidArg(err.Error()) + } + outputModuleHash := execGraph.ModuleHashes().Get(request.OutputModule) + ctx = reqctx.WithOutputModuleHash(ctx, outputModuleHash) + emitter, err := dmetering.New(request.MeteringConfig, logger) if err != nil { return connect.NewError(connect.CodeInternal, fmt.Errorf("unable to initialize dmetering: %w", err)) @@ -507,6 +514,7 @@ func tier2ResponseHandler(ctx context.Context, logger *zap.Logger, streamSrv pbs logger.Warn("no auth information available in tier2 response handler") } + outputModuleHash := reqctx.OutputModuleHash(ctx) metricsSender := metering.GetMetricsSender(ctx) return func(respAny substreams.ResponseFromAnyTier) error { @@ -523,7 +531,7 @@ func tier2ResponseHandler(ctx context.Context, logger *zap.Logger, streamSrv pbs zap.String("user_meta", userMeta), zap.String("endpoint", "sf.substreams.internal.v2/ProcessRange"), ) - metricsSender.Send(ctx, userID, apiKeyID, ip, userMeta, "sf.substreams.internal.v2/ProcessRange", resp) + metricsSender.Send(ctx, userID, apiKeyID, ip, userMeta, outputModuleHash, "sf.substreams.internal.v2/ProcessRange", resp) return nil } } diff --git a/test/collector_test.go b/test/collector_test.go index d47b6579..3921815e 100644 --- a/test/collector_test.go +++ b/test/collector_test.go @@ -64,10 +64,10 @@ func (c *responseCollector) Collect(respAny substreams.ResponseFromAnyTier) erro switch resp := respAny.(type) { case *pbsubstreamsrpc.Response: c.responses = append(c.responses, resp) - c.sender.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "tier1", resp) + c.sender.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "testOutputHash", "tier1", resp) case *pbssinternal.ProcessRangeResponse: c.internalResponses = append(c.internalResponses, resp) - c.sender.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "tier2", resp) + c.sender.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "testOutputHash", "tier2", resp) } return nil }