Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add output module hash to metering #557

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ require (
github.com/rs/cors v1.10.0
github.com/schollz/closestmatch v2.1.0+incompatible
github.com/shopspring/decimal v1.3.1
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951
github.com/streamingfast/dmetering v0.0.0-20241028183059-d11d4ec85e05
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545
github.com/streamingfast/sf-tracing v0.0.0-20240430173521-888827872b90
github.com/streamingfast/shutter v1.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,8 @@ github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 h1:xJB7rXnOHLes
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1/go.mod h1:QSm/AfaDsE0k1xBYi0lW580YJ/WDV/FKZI628tkZR0Y=
github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca h1:/k5H6MUo5Vi8AKPsSr+TMeA/XJ0uMyEX6feHpOozTlQ=
github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca/go.mod h1:NuKCwOHjbT0nRji0O+7+c70AiBfLHEKNoovs/gFfMPY=
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 h1:6o6MS3JHrp9A7V6EBHbR7W7mzVCFmXc8U0AjTfvz7PI=
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetering v0.0.0-20241028183059-d11d4ec85e05 h1:jjx6kO2z0mNSKElL8YHnjt65+NB/CSsJH5C9efyrzw8=
github.com/streamingfast/dmetering v0.0.0-20241028183059-d11d4ec85e05/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw=
github.com/streamingfast/dstore v0.1.1-0.20241011152904-9acd6205dc14 h1:/2HxIOzAgUBKyxjDO4IJPzBBaEAtzwipb/2/JGsOArA=
Expand Down
11 changes: 7 additions & 4 deletions metering/metering.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ func (ms *MetricsSender) Send(ctx context.Context, userID, apiKeyID, ip, userMet
endpoint = fmt.Sprintf("%s%s", endpoint, "Backfill")
}

outputModuleHash := reqctx.OutputModuleHash(ctx)

meter := dmetering.GetBytesMeter(ctx)

bytesRead := meter.BytesReadDelta()
Expand All @@ -143,10 +145,11 @@ func (ms *MetricsSender) Send(ctx context.Context, userID, apiKeyID, ip, userMet
meter.CountInc(TotalWriteBytes, int(totalWriteBytes))

event := dmetering.Event{
UserID: userID,
ApiKeyID: apiKeyID,
IpAddress: ip,
Meta: userMeta,
UserID: userID,
ApiKeyID: apiKeyID,
IpAddress: ip,
Meta: userMeta,
OutputModuleHash: outputModuleHash,

Endpoint: endpoint,
Metrics: map[string]float64{
Expand Down
2 changes: 2 additions & 0 deletions reqctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var tracerKey = contextKeyType(2)
var spanKey = contextKeyType(3)
var reqStatsKey = contextKeyType(4)
var moduleExecutionTracingConfigKey = contextKeyType(5)
var outputModuleHashKey = contextKeyType(6)
var tier2RequestParametersKeyKey = contextKeyType(7)

func Logger(ctx context.Context) *zap.Logger {
return logging.Logger(ctx, zap.NewNop())
Expand Down
14 changes: 14 additions & 0 deletions reqctx/metering.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,17 @@ func IsBackfillerRequest(ctx context.Context) bool {
_, ok = md[backFillerKey]
return ok
}

type outputModuleKeyType int

func WithOutputModuleHash(ctx context.Context, hash string) context.Context {
return context.WithValue(ctx, outputModuleHashKey, hash)
}

func OutputModuleHash(ctx context.Context) string {
hash, ok := ctx.Value(outputModuleHashKey).(string)
if !ok {
return ""
}
return hash
}
4 changes: 0 additions & 4 deletions reqctx/tier2request.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ type Tier2RequestParameters struct {
WASMModules map[string]string
}

type tier2RequestParametersKey int

const tier2RequestParametersKeyKey = tier2RequestParametersKey(0)

func WithTier2RequestParameters(ctx context.Context, parameters Tier2RequestParameters) context.Context {
return context.WithValue(ctx, tier2RequestParametersKeyKey, parameters)
}
Expand Down
17 changes: 9 additions & 8 deletions service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,24 +219,25 @@ func (s *Tier1Service) Blocks(
mut.Unlock()
}()

respFunc := tier1ResponseHandler(respContext, &mut, logger, stream)

span.SetAttributes(attribute.Int64("substreams.tier", 1))

request := req.Msg
if request.Modules == nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("missing modules in request"))
}

if err := ValidateTier1Request(request, s.blockType); err != nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err))
}

execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, request.ProductionMode, request.Modules, bstream.GetProtocolFirstStreamableBlock)
if err != nil {
return bsstream.NewErrInvalidArg(err.Error())
}
outputModuleHash := execGraph.ModuleHashes().Get(request.OutputModule)
ctx = reqctx.WithOutputModuleHash(ctx, outputModuleHash)

respFunc := tier1ResponseHandler(respContext, &mut, logger, stream)

span.SetAttributes(attribute.Int64("substreams.tier", 1))

if err := ValidateTier1Request(request, s.blockType); err != nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err))
}

moduleNames := make([]string, len(request.Modules.Modules))
for i := 0; i < len(moduleNames); i++ {
Expand Down
7 changes: 7 additions & 0 deletions service/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err))
}

execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, true, request.Modules, request.FirstStreamableBlock) //production-mode flag is irrelevant here because it isn't used to calculate the hashes
if err != nil {
return bsstream.NewErrInvalidArg(err.Error())
}
outputModuleHash := execGraph.ModuleHashes().Get(request.OutputModule)
ctx = reqctx.WithOutputModuleHash(ctx, outputModuleHash)

emitter, err := dmetering.New(request.MeteringConfig, logger)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("unable to initialize dmetering: %w", err))
Expand Down
Loading