From 51ca4a534bc4a7eda1a2a3c08da234c493e56707 Mon Sep 17 00:00:00 2001 From: colindickson Date: Mon, 28 Oct 2024 14:34:45 -0400 Subject: [PATCH 1/3] add output module hash to metering --- go.mod | 2 +- go.sum | 4 ++-- metering/metering.go | 11 +++++++---- reqctx/context.go | 2 ++ reqctx/metering.go | 14 ++++++++++++++ reqctx/tier2request.go | 4 ---- service/tier1.go | 17 +++++++++-------- service/tier2.go | 9 +++++++++ 8 files changed, 44 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index 2732f221c..2c209df89 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( github.com/rs/cors v1.10.0 github.com/schollz/closestmatch v2.1.0+incompatible github.com/shopspring/decimal v1.3.1 - github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 + github.com/streamingfast/dmetering v0.0.0-20241028183059-d11d4ec85e05 github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 github.com/streamingfast/sf-tracing v0.0.0-20240430173521-888827872b90 github.com/streamingfast/shutter v1.5.0 diff --git a/go.sum b/go.sum index 6790b5a78..979703a08 100644 --- a/go.sum +++ b/go.sum @@ -533,8 +533,8 @@ github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 h1:xJB7rXnOHLes github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1/go.mod h1:QSm/AfaDsE0k1xBYi0lW580YJ/WDV/FKZI628tkZR0Y= github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca h1:/k5H6MUo5Vi8AKPsSr+TMeA/XJ0uMyEX6feHpOozTlQ= github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca/go.mod h1:NuKCwOHjbT0nRji0O+7+c70AiBfLHEKNoovs/gFfMPY= -github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 h1:6o6MS3JHrp9A7V6EBHbR7W7mzVCFmXc8U0AjTfvz7PI= -github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI= +github.com/streamingfast/dmetering v0.0.0-20241028183059-d11d4ec85e05 h1:jjx6kO2z0mNSKElL8YHnjt65+NB/CSsJH5C9efyrzw8= +github.com/streamingfast/dmetering v0.0.0-20241028183059-d11d4ec85e05/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI= github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y= github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw= github.com/streamingfast/dstore v0.1.1-0.20241011152904-9acd6205dc14 h1:/2HxIOzAgUBKyxjDO4IJPzBBaEAtzwipb/2/JGsOArA= diff --git a/metering/metering.go b/metering/metering.go index 93b2c7df3..57c6d5b8c 100644 --- a/metering/metering.go +++ b/metering/metering.go @@ -121,6 +121,8 @@ func (ms *MetricsSender) Send(ctx context.Context, userID, apiKeyID, ip, userMet endpoint = fmt.Sprintf("%s%s", endpoint, "Backfill") } + outputModuleHash := reqctx.OutputModuleHash(ctx) + meter := dmetering.GetBytesMeter(ctx) bytesRead := meter.BytesReadDelta() @@ -143,10 +145,11 @@ func (ms *MetricsSender) Send(ctx context.Context, userID, apiKeyID, ip, userMet meter.CountInc(TotalWriteBytes, int(totalWriteBytes)) event := dmetering.Event{ - UserID: userID, - ApiKeyID: apiKeyID, - IpAddress: ip, - Meta: userMeta, + UserID: userID, + ApiKeyID: apiKeyID, + IpAddress: ip, + Meta: userMeta, + OutputModuleHash: outputModuleHash, Endpoint: endpoint, Metrics: map[string]float64{ diff --git a/reqctx/context.go b/reqctx/context.go index 58d915312..e05312774 100644 --- a/reqctx/context.go +++ b/reqctx/context.go @@ -23,6 +23,8 @@ var tracerKey = contextKeyType(2) var spanKey = contextKeyType(3) var reqStatsKey = contextKeyType(4) var moduleExecutionTracingConfigKey = contextKeyType(5) +var outputModuleHashKey = contextKeyType(6) +var tier2RequestParametersKeyKey = contextKeyType(7) func Logger(ctx context.Context) *zap.Logger { return logging.Logger(ctx, zap.NewNop()) diff --git a/reqctx/metering.go b/reqctx/metering.go index 777d12b90..808deccc4 100644 --- a/reqctx/metering.go +++ b/reqctx/metering.go @@ -31,3 +31,17 @@ func IsBackfillerRequest(ctx context.Context) bool { _, ok = md[backFillerKey] return ok } + +type outputModuleKeyType int + +func WithOutputModuleHash(ctx context.Context, hash string) context.Context { + return context.WithValue(ctx, outputModuleHashKey, hash) +} + +func OutputModuleHash(ctx context.Context) string { + hash, ok := ctx.Value(outputModuleHashKey).(string) + if !ok { + return "" + } + return hash +} diff --git a/reqctx/tier2request.go b/reqctx/tier2request.go index 3b389ea94..617e2297e 100644 --- a/reqctx/tier2request.go +++ b/reqctx/tier2request.go @@ -16,10 +16,6 @@ type Tier2RequestParameters struct { WASMModules map[string]string } -type tier2RequestParametersKey int - -const tier2RequestParametersKeyKey = tier2RequestParametersKey(0) - func WithTier2RequestParameters(ctx context.Context, parameters Tier2RequestParameters) context.Context { return context.WithValue(ctx, tier2RequestParametersKeyKey, parameters) } diff --git a/service/tier1.go b/service/tier1.go index 957646478..e8ddc118c 100644 --- a/service/tier1.go +++ b/service/tier1.go @@ -219,24 +219,25 @@ func (s *Tier1Service) Blocks( mut.Unlock() }() - respFunc := tier1ResponseHandler(respContext, &mut, logger, stream) - - span.SetAttributes(attribute.Int64("substreams.tier", 1)) - request := req.Msg if request.Modules == nil { return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("missing modules in request")) } - if err := ValidateTier1Request(request, s.blockType); err != nil { - return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err)) - } - execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, request.ProductionMode, request.Modules, bstream.GetProtocolFirstStreamableBlock) if err != nil { return bsstream.NewErrInvalidArg(err.Error()) } outputModuleHash := execGraph.ModuleHashes().Get(request.OutputModule) + ctx = reqctx.WithOutputModuleHash(ctx, outputModuleHash) + + respFunc := tier1ResponseHandler(respContext, &mut, logger, stream) + + span.SetAttributes(attribute.Int64("substreams.tier", 1)) + + if err := ValidateTier1Request(request, s.blockType); err != nil { + return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err)) + } moduleNames := make([]string, len(request.Modules.Modules)) for i := 0; i < len(moduleNames); i++ { diff --git a/service/tier2.go b/service/tier2.go index 267f53638..b28a490fb 100644 --- a/service/tier2.go +++ b/service/tier2.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/streamingfast/bstream" + "connectrpc.com/connect" "github.com/RoaringBitmap/roaring/roaring64" "github.com/streamingfast/bstream/stream" @@ -206,6 +208,13 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err)) } + execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, true, request.Modules, bstream.GetProtocolFirstStreamableBlock) + if err != nil { + return bsstream.NewErrInvalidArg(err.Error()) + } + outputModuleHash := execGraph.ModuleHashes().Get(request.OutputModule) + ctx = reqctx.WithOutputModuleHash(ctx, outputModuleHash) + emitter, err := dmetering.New(request.MeteringConfig, logger) if err != nil { return connect.NewError(connect.CodeInternal, fmt.Errorf("unable to initialize dmetering: %w", err)) From 543731863249664e43aa58638a7d5ee265409b74 Mon Sep 17 00:00:00 2001 From: colindickson Date: Mon, 28 Oct 2024 15:52:04 -0400 Subject: [PATCH 2/3] tier2: fix first streamable block param --- service/tier2.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/service/tier2.go b/service/tier2.go index b28a490fb..4cf05acae 100644 --- a/service/tier2.go +++ b/service/tier2.go @@ -9,8 +9,6 @@ import ( "sync" "time" - "github.com/streamingfast/bstream" - "connectrpc.com/connect" "github.com/RoaringBitmap/roaring/roaring64" "github.com/streamingfast/bstream/stream" @@ -208,7 +206,7 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err)) } - execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, true, request.Modules, bstream.GetProtocolFirstStreamableBlock) + execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, true, request.Modules, request.FirstStreamableBlock) //production-mode flag is irrelevant here because it isn't used to calculate the hashes if err != nil { return bsstream.NewErrInvalidArg(err.Error()) } From 8ed265e2bbbb123fb616aba4073062b87b672f9a Mon Sep 17 00:00:00 2001 From: colindickson Date: Wed, 30 Oct 2024 14:26:48 -0400 Subject: [PATCH 3/3] fix output hash in tier1 respFunc --- metering/metering.go | 4 +--- metering/metering_test.go | 7 +++++-- service/tier1.go | 24 +++++++++++++----------- service/tier2.go | 3 ++- test/collector_test.go | 4 ++-- 5 files changed, 23 insertions(+), 19 deletions(-) diff --git a/metering/metering.go b/metering/metering.go index 57c6d5b8c..7fb110483 100644 --- a/metering/metering.go +++ b/metering/metering.go @@ -113,7 +113,7 @@ func NewMetricsSender() *MetricsSender { } } -func (ms *MetricsSender) Send(ctx context.Context, userID, apiKeyID, ip, userMeta, endpoint string, resp proto.Message) { +func (ms *MetricsSender) Send(ctx context.Context, userID, apiKeyID, ip, userMeta, outputModuleHash, endpoint string, resp proto.Message) { ms.Lock() defer ms.Unlock() @@ -121,8 +121,6 @@ func (ms *MetricsSender) Send(ctx context.Context, userID, apiKeyID, ip, userMet endpoint = fmt.Sprintf("%s%s", endpoint, "Backfill") } - outputModuleHash := reqctx.OutputModuleHash(ctx) - meter := dmetering.GetBytesMeter(ctx) bytesRead := meter.BytesReadDelta() diff --git a/metering/metering_test.go b/metering/metering_test.go index 7b893ecf1..afefdb784 100644 --- a/metering/metering_test.go +++ b/metering/metering_test.go @@ -386,8 +386,10 @@ func TestSend(t *testing.T) { metericsSender := NewMetricsSender() + outputModuleHash := "outputModuleHash" + // Call the Send function - metericsSender.Send(ctx, "user1", "apiKey1", "127.0.0.1", "meta", "endpoint", resp) + metericsSender.Send(ctx, "user1", "apiKey1", "127.0.0.1", "meta", outputModuleHash, "endpoint", resp) // Verify the emitted event assert.Len(t, emitter.events, 1) @@ -398,6 +400,7 @@ func TestSend(t *testing.T) { assert.Equal(t, "127.0.0.1", event.IpAddress) assert.Equal(t, "meta", event.Meta) assert.Equal(t, "endpoint", event.Endpoint) + assert.Equal(t, "outputModuleHash", event.OutputModuleHash) assert.Equal(t, float64(proto.Size(resp)), event.Metrics["egress_bytes"]) assert.Equal(t, float64(0), event.Metrics["written_bytes"]) assert.Equal(t, float64(0), event.Metrics["read_bytes"]) @@ -448,7 +451,7 @@ func TestSendParallel(t *testing.T) { meter.CountInc(MeterFileCompressedWriteBytes, 600) time.Sleep(time.Duration(randomInt()) * time.Nanosecond) - metricsSender.Send(ctx, "user1", "apiKey1", "127.0.0.1", "meta", "endpoint", resp) + metricsSender.Send(ctx, "user1", "apiKey1", "127.0.0.1", "meta", "outputModuleHash", "endpoint", resp) }() } diff --git a/service/tier1.go b/service/tier1.go index e8ddc118c..545f7e6b9 100644 --- a/service/tier1.go +++ b/service/tier1.go @@ -209,16 +209,6 @@ func (s *Tier1Service) Blocks( ctx, span := reqctx.WithSpan(ctx, "substreams/tier1/request") defer span.EndWithErr(&err) - // We need to ensure that the response function is NEVER used after this Blocks handler has returned. - // We use a context that will be canceled on defer, and a lock to prevent races. The respFunc is used in various threads - mut := sync.Mutex{} - respContext, cancel := context.WithCancel(ctx) - defer func() { - mut.Lock() - cancel() - mut.Unlock() - }() - request := req.Msg if request.Modules == nil { return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("missing modules in request")) @@ -231,6 +221,16 @@ func (s *Tier1Service) Blocks( outputModuleHash := execGraph.ModuleHashes().Get(request.OutputModule) ctx = reqctx.WithOutputModuleHash(ctx, outputModuleHash) + // We need to ensure that the response function is NEVER used after this Blocks handler has returned. + // We use a context that will be canceled on defer, and a lock to prevent races. The respFunc is used in various threads + mut := sync.Mutex{} + respContext, cancel := context.WithCancel(ctx) + defer func() { + mut.Lock() + cancel() + mut.Unlock() + }() + respFunc := tier1ResponseHandler(respContext, &mut, logger, stream) span.SetAttributes(attribute.Int64("substreams.tier", 1)) @@ -608,6 +608,8 @@ func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logg userMeta := auth.Meta() ip := auth.RealIP() + outputModuleHash := reqctx.OutputModuleHash(ctx) + ctx = reqctx.WithEmitter(ctx, dmetering.GetDefaultEmitter()) metericsSender := metering.GetMetricsSender(ctx) @@ -625,7 +627,7 @@ func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logg return connect.NewError(connect.CodeUnavailable, err) } - metericsSender.Send(ctx, userID, apiKeyID, ip, userMeta, "sf.substreams.rpc.v2/Blocks", resp) + metericsSender.Send(ctx, userID, apiKeyID, ip, userMeta, outputModuleHash, "sf.substreams.rpc.v2/Blocks", resp) return nil } } diff --git a/service/tier2.go b/service/tier2.go index 4cf05acae..e649501e1 100644 --- a/service/tier2.go +++ b/service/tier2.go @@ -514,6 +514,7 @@ func tier2ResponseHandler(ctx context.Context, logger *zap.Logger, streamSrv pbs logger.Warn("no auth information available in tier2 response handler") } + outputModuleHash := reqctx.OutputModuleHash(ctx) metricsSender := metering.GetMetricsSender(ctx) return func(respAny substreams.ResponseFromAnyTier) error { @@ -530,7 +531,7 @@ func tier2ResponseHandler(ctx context.Context, logger *zap.Logger, streamSrv pbs zap.String("user_meta", userMeta), zap.String("endpoint", "sf.substreams.internal.v2/ProcessRange"), ) - metricsSender.Send(ctx, userID, apiKeyID, ip, userMeta, "sf.substreams.internal.v2/ProcessRange", resp) + metricsSender.Send(ctx, userID, apiKeyID, ip, userMeta, outputModuleHash, "sf.substreams.internal.v2/ProcessRange", resp) return nil } } diff --git a/test/collector_test.go b/test/collector_test.go index d47b65791..3921815eb 100644 --- a/test/collector_test.go +++ b/test/collector_test.go @@ -64,10 +64,10 @@ func (c *responseCollector) Collect(respAny substreams.ResponseFromAnyTier) erro switch resp := respAny.(type) { case *pbsubstreamsrpc.Response: c.responses = append(c.responses, resp) - c.sender.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "tier1", resp) + c.sender.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "testOutputHash", "tier1", resp) case *pbssinternal.ProcessRangeResponse: c.internalResponses = append(c.internalResponses, resp) - c.sender.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "tier2", resp) + c.sender.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "testOutputHash", "tier2", resp) } return nil }