Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add output module hash to metering #557

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ require (
github.com/rs/cors v1.10.0
github.com/schollz/closestmatch v2.1.0+incompatible
github.com/shopspring/decimal v1.3.1
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951
github.com/streamingfast/dmetering v0.0.0-20241028183059-d11d4ec85e05
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545
github.com/streamingfast/sf-tracing v0.0.0-20240430173521-888827872b90
github.com/streamingfast/shutter v1.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,8 @@ github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 h1:xJB7rXnOHLes
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1/go.mod h1:QSm/AfaDsE0k1xBYi0lW580YJ/WDV/FKZI628tkZR0Y=
github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca h1:/k5H6MUo5Vi8AKPsSr+TMeA/XJ0uMyEX6feHpOozTlQ=
github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca/go.mod h1:NuKCwOHjbT0nRji0O+7+c70AiBfLHEKNoovs/gFfMPY=
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 h1:6o6MS3JHrp9A7V6EBHbR7W7mzVCFmXc8U0AjTfvz7PI=
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetering v0.0.0-20241028183059-d11d4ec85e05 h1:jjx6kO2z0mNSKElL8YHnjt65+NB/CSsJH5C9efyrzw8=
github.com/streamingfast/dmetering v0.0.0-20241028183059-d11d4ec85e05/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw=
github.com/streamingfast/dstore v0.1.1-0.20241011152904-9acd6205dc14 h1:/2HxIOzAgUBKyxjDO4IJPzBBaEAtzwipb/2/JGsOArA=
Expand Down
11 changes: 6 additions & 5 deletions metering/metering.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func NewMetricsSender() *MetricsSender {
}
}

func (ms *MetricsSender) Send(ctx context.Context, userID, apiKeyID, ip, userMeta, endpoint string, resp proto.Message) {
func (ms *MetricsSender) Send(ctx context.Context, userID, apiKeyID, ip, userMeta, outputModuleHash, endpoint string, resp proto.Message) {
ms.Lock()
defer ms.Unlock()

Expand Down Expand Up @@ -143,10 +143,11 @@ func (ms *MetricsSender) Send(ctx context.Context, userID, apiKeyID, ip, userMet
meter.CountInc(TotalWriteBytes, int(totalWriteBytes))

event := dmetering.Event{
UserID: userID,
ApiKeyID: apiKeyID,
IpAddress: ip,
Meta: userMeta,
UserID: userID,
ApiKeyID: apiKeyID,
IpAddress: ip,
Meta: userMeta,
OutputModuleHash: outputModuleHash,

Endpoint: endpoint,
Metrics: map[string]float64{
Expand Down
7 changes: 5 additions & 2 deletions metering/metering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,10 @@ func TestSend(t *testing.T) {

metericsSender := NewMetricsSender()

outputModuleHash := "outputModuleHash"

// Call the Send function
metericsSender.Send(ctx, "user1", "apiKey1", "127.0.0.1", "meta", "endpoint", resp)
metericsSender.Send(ctx, "user1", "apiKey1", "127.0.0.1", "meta", outputModuleHash, "endpoint", resp)

// Verify the emitted event
assert.Len(t, emitter.events, 1)
Expand All @@ -398,6 +400,7 @@ func TestSend(t *testing.T) {
assert.Equal(t, "127.0.0.1", event.IpAddress)
assert.Equal(t, "meta", event.Meta)
assert.Equal(t, "endpoint", event.Endpoint)
assert.Equal(t, "outputModuleHash", event.OutputModuleHash)
assert.Equal(t, float64(proto.Size(resp)), event.Metrics["egress_bytes"])
assert.Equal(t, float64(0), event.Metrics["written_bytes"])
assert.Equal(t, float64(0), event.Metrics["read_bytes"])
Expand Down Expand Up @@ -448,7 +451,7 @@ func TestSendParallel(t *testing.T) {
meter.CountInc(MeterFileCompressedWriteBytes, 600)

time.Sleep(time.Duration(randomInt()) * time.Nanosecond)
metricsSender.Send(ctx, "user1", "apiKey1", "127.0.0.1", "meta", "endpoint", resp)
metricsSender.Send(ctx, "user1", "apiKey1", "127.0.0.1", "meta", "outputModuleHash", "endpoint", resp)
}()
}

Expand Down
2 changes: 2 additions & 0 deletions reqctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var tracerKey = contextKeyType(2)
var spanKey = contextKeyType(3)
var reqStatsKey = contextKeyType(4)
var moduleExecutionTracingConfigKey = contextKeyType(5)
var outputModuleHashKey = contextKeyType(6)
var tier2RequestParametersKeyKey = contextKeyType(7)

func Logger(ctx context.Context) *zap.Logger {
return logging.Logger(ctx, zap.NewNop())
Expand Down
14 changes: 14 additions & 0 deletions reqctx/metering.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,17 @@ func IsBackfillerRequest(ctx context.Context) bool {
_, ok = md[backFillerKey]
return ok
}

// outputModuleKeyType was presumably intended as a private context-key type,
// but it appears unused in this change: WithOutputModuleHash below stores the
// value under outputModuleHashKey (a contextKeyType declared in
// reqctx/context.go), not under a value of this type.
// NOTE(review): looks like dead code — confirm nothing else in the package
// references it, then remove.
type outputModuleKeyType int

// WithOutputModuleHash returns a child context carrying the output module
// hash so downstream consumers (e.g. metering response handlers) can
// retrieve it via OutputModuleHash.
func WithOutputModuleHash(ctx context.Context, hash string) context.Context {
	enriched := context.WithValue(ctx, outputModuleHashKey, hash)
	return enriched
}

// OutputModuleHash returns the output module hash previously stored with
// WithOutputModuleHash, or the empty string when the context carries none
// (or carries a value of an unexpected type).
func OutputModuleHash(ctx context.Context) string {
	if hash, ok := ctx.Value(outputModuleHashKey).(string); ok {
		return hash
	}
	return ""
}
4 changes: 0 additions & 4 deletions reqctx/tier2request.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ type Tier2RequestParameters struct {
WASMModules map[string]string
}

type tier2RequestParametersKey int

const tier2RequestParametersKeyKey = tier2RequestParametersKey(0)

func WithTier2RequestParameters(ctx context.Context, parameters Tier2RequestParameters) context.Context {
return context.WithValue(ctx, tier2RequestParametersKeyKey, parameters)
}
Expand Down
27 changes: 15 additions & 12 deletions service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,18 @@ func (s *Tier1Service) Blocks(
ctx, span := reqctx.WithSpan(ctx, "substreams/tier1/request")
defer span.EndWithErr(&err)

request := req.Msg
if request.Modules == nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("missing modules in request"))
}

execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, request.ProductionMode, request.Modules, bstream.GetProtocolFirstStreamableBlock)
if err != nil {
return bsstream.NewErrInvalidArg(err.Error())
}
outputModuleHash := execGraph.ModuleHashes().Get(request.OutputModule)
ctx = reqctx.WithOutputModuleHash(ctx, outputModuleHash)

// We need to ensure that the response function is NEVER used after this Blocks handler has returned.
// We use a context that will be canceled on defer, and a lock to prevent races. The respFunc is used in various threads
mut := sync.Mutex{}
Expand All @@ -223,21 +235,10 @@ func (s *Tier1Service) Blocks(

span.SetAttributes(attribute.Int64("substreams.tier", 1))

request := req.Msg
if request.Modules == nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("missing modules in request"))
}

if err := ValidateTier1Request(request, s.blockType); err != nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err))
}

execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, request.ProductionMode, request.Modules, bstream.GetProtocolFirstStreamableBlock)
if err != nil {
return bsstream.NewErrInvalidArg(err.Error())
}
outputModuleHash := execGraph.ModuleHashes().Get(request.OutputModule)

moduleNames := make([]string, len(request.Modules.Modules))
for i := 0; i < len(moduleNames); i++ {
moduleNames[i] = request.Modules.Modules[i].Name
Expand Down Expand Up @@ -607,6 +608,8 @@ func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logg
userMeta := auth.Meta()
ip := auth.RealIP()

outputModuleHash := reqctx.OutputModuleHash(ctx)

ctx = reqctx.WithEmitter(ctx, dmetering.GetDefaultEmitter())
metericsSender := metering.GetMetricsSender(ctx)

Expand All @@ -624,7 +627,7 @@ func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logg
return connect.NewError(connect.CodeUnavailable, err)
}

metericsSender.Send(ctx, userID, apiKeyID, ip, userMeta, "sf.substreams.rpc.v2/Blocks", resp)
metericsSender.Send(ctx, userID, apiKeyID, ip, userMeta, outputModuleHash, "sf.substreams.rpc.v2/Blocks", resp)
return nil
}
}
Expand Down
10 changes: 9 additions & 1 deletion service/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err))
}

execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, true, request.Modules, request.FirstStreamableBlock) //production-mode flag is irrelevant here because it isn't used to calculate the hashes
if err != nil {
return bsstream.NewErrInvalidArg(err.Error())
}
outputModuleHash := execGraph.ModuleHashes().Get(request.OutputModule)
ctx = reqctx.WithOutputModuleHash(ctx, outputModuleHash)

emitter, err := dmetering.New(request.MeteringConfig, logger)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("unable to initialize dmetering: %w", err))
Expand Down Expand Up @@ -507,6 +514,7 @@ func tier2ResponseHandler(ctx context.Context, logger *zap.Logger, streamSrv pbs
logger.Warn("no auth information available in tier2 response handler")
}

outputModuleHash := reqctx.OutputModuleHash(ctx)
metricsSender := metering.GetMetricsSender(ctx)

return func(respAny substreams.ResponseFromAnyTier) error {
Expand All @@ -523,7 +531,7 @@ func tier2ResponseHandler(ctx context.Context, logger *zap.Logger, streamSrv pbs
zap.String("user_meta", userMeta),
zap.String("endpoint", "sf.substreams.internal.v2/ProcessRange"),
)
metricsSender.Send(ctx, userID, apiKeyID, ip, userMeta, "sf.substreams.internal.v2/ProcessRange", resp)
metricsSender.Send(ctx, userID, apiKeyID, ip, userMeta, outputModuleHash, "sf.substreams.internal.v2/ProcessRange", resp)
return nil
}
}
Expand Down
4 changes: 2 additions & 2 deletions test/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ func (c *responseCollector) Collect(respAny substreams.ResponseFromAnyTier) erro
switch resp := respAny.(type) {
case *pbsubstreamsrpc.Response:
c.responses = append(c.responses, resp)
c.sender.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "tier1", resp)
c.sender.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "testOutputHash", "tier1", resp)
case *pbssinternal.ProcessRangeResponse:
c.internalResponses = append(c.internalResponses, resp)
c.sender.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "tier2", resp)
c.sender.Send(c.ctx, "test_user", "test_api_key", "10.0.0.1", "test_meta", "testOutputHash", "tier2", resp)
}
return nil
}
Loading