diff --git a/firehose/block_getter.go b/firehose/block_getter.go index 6ac564b..11c67e9 100644 --- a/firehose/block_getter.go +++ b/firehose/block_getter.go @@ -11,6 +11,7 @@ import ( "github.com/streamingfast/derr" "github.com/streamingfast/dmetering" "github.com/streamingfast/dstore" + "github.com/streamingfast/firehose-core/metering" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -59,10 +60,12 @@ func (g *BlockGetter) Get( mergedBlocksStore := g.mergedBlocksStore if clonable, ok := mergedBlocksStore.(dstore.Clonable); ok { var err error - mergedBlocksStore, err = clonable.Clone(ctx) + mergedBlocksStore, err = clonable.Clone(ctx, metering.WithBlockBytesReadMeteringOptions(dmetering.GetBytesMeter(ctx), logger)...) if err != nil { return nil, err } + + //todo: (deprecated) remove this mergedBlocksStore.SetMeter(dmetering.GetBytesMeter(ctx)) } @@ -91,10 +94,12 @@ func (g *BlockGetter) Get( forkedBlocksStore := g.forkedBlocksStore if clonable, ok := forkedBlocksStore.(dstore.Clonable); ok { var err error - forkedBlocksStore, err = clonable.Clone(ctx) + forkedBlocksStore, err = clonable.Clone(ctx, metering.WithForkedBlockBytesReadMeteringOptions(dmetering.GetBytesMeter(ctx), logger)...) if err != nil { return nil, err } + + //todo: (deprecated) remove this forkedBlocksStore.SetMeter(dmetering.GetBytesMeter(ctx)) } diff --git a/firehose/server/blocks.go b/firehose/server/blocks.go index b73da6c..7e57fe9 100644 --- a/firehose/server/blocks.go +++ b/firehose/server/blocks.go @@ -14,6 +14,7 @@ import ( "github.com/streamingfast/dauth" "github.com/streamingfast/dmetering" "github.com/streamingfast/firehose-core/firehose/metrics" + "github.com/streamingfast/firehose-core/metering" "github.com/streamingfast/logging" pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "go.uber.org/zap" @@ -66,29 +67,9 @@ func (s *Server) Block(ctx context.Context, request *pbfirehose.SingleBlockReque }, } - ////////////////////////////////////////////////////////////////////// meter := dmetering.GetBytesMeter(ctx) - bytesRead := meter.BytesReadDelta() - bytesWritten := meter.BytesWrittenDelta() - size := proto.Size(resp) - auth := dauth.FromContext(ctx) - event := dmetering.Event{ - UserID: auth.UserID(), - ApiKeyID: auth.APIKeyID(), - IpAddress: auth.RealIP(), - Meta: auth.Meta(), - Endpoint: "sf.firehose.v2.Firehose/Block", - Metrics: map[string]float64{ - "egress_bytes": float64(size), - "written_bytes": float64(bytesWritten), - "read_bytes": float64(bytesRead), - "block_count": 1, - }, - Timestamp: time.Now(), - } - dmetering.Emit(ctx, event) - ////////////////////////////////////////////////////////////////////// + metering.Send(ctx, meter, auth.UserID(), auth.APIKeyID(), auth.RealIP(), auth.Meta(), "sf.firehose.v2.Firehose/Block", resp) return resp, nil } @@ -127,10 +108,6 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream } } - isLiveBlock := func(step pbfirehose.ForkStep) bool { - return step == pbfirehose.ForkStep_STEP_NEW - } - var blockCount uint64 handlerFunc := bstream.HandlerFunc(func(block *pbbstream.Block, obj interface{}) error { blockCount++ @@ -188,10 +165,6 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream return NewErrSendBlock(err) } - if isLiveBlock(protoStep) { - dmetering.GetBytesMeter(ctx).AddBytesRead(len(block.Payload.Value)) - } - level := zap.DebugLevel if block.Number%200 == 0 { level = zap.InfoLevel @@ -206,8 +179,45 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream return status.Errorf(codes.Unimplemented, "no transforms registry configured within this instance") } + liveSourceMiddlewareHandler := func(next bstream.Handler) bstream.Handler { + return bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error { + if stepable, ok := obj.(bstream.Stepable); ok { + if stepable.Step().Matches(bstream.StepNew) { + dmetering.GetBytesMeter(ctx).CountInc(metering.MeterLiveUncompressedReadBytes, len(blk.GetPayload().GetValue())) + + // legacy metering + // todo(colin): remove this once we are sure the new metering is working + dmetering.GetBytesMeter(ctx).AddBytesRead(len(blk.GetPayload().GetValue())) + } else { + dmetering.GetBytesMeter(ctx).CountInc(metering.MeterLiveUncompressedReadForkedBytes, len(blk.GetPayload().GetValue())) + } + } + return next.ProcessBlock(blk, obj) + }) + } + + fileSourceMiddlewareHandler := func(next bstream.Handler) bstream.Handler { + return bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error { + if stepable, ok := obj.(bstream.Stepable); ok { + if stepable.Step().Matches(bstream.StepNew) { + dmetering.GetBytesMeter(ctx).CountInc(metering.MeterFileUncompressedReadBytes, len(blk.GetPayload().GetValue())) + } else { + dmetering.GetBytesMeter(ctx).CountInc(metering.MeterFileUncompressedReadForkedBytes, len(blk.GetPayload().GetValue())) + } + } + return next.ProcessBlock(blk, obj) + }) + } + ctx = s.initFunc(ctx, request) - str, err := s.streamFactory.New(ctx, handlerFunc, request, logger) + str, err := s.streamFactory.New( + ctx, + handlerFunc, + request, + logger, + stream.WithLiveSourceHandlerMiddleware(liveSourceMiddlewareHandler), + stream.WithFileSourceHandlerMiddleware(fileSourceMiddlewareHandler), + ) if err != nil { return err } diff --git a/firehose/server/server.go b/firehose/server/server.go index 0a9a208..6014527 100644 --- a/firehose/server/server.go +++ b/firehose/server/server.go @@ -17,6 +17,7 @@ import ( firecore "github.com/streamingfast/firehose-core" "github.com/streamingfast/firehose-core/firehose" "github.com/streamingfast/firehose-core/firehose/rate" + "github.com/streamingfast/firehose-core/metering" pbfirehoseV1 "github.com/streamingfast/pbgo/sf/firehose/v1" pbfirehoseV2 "github.com/streamingfast/pbgo/sf/firehose/v2" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -64,41 +65,19 @@ func New( opts ...Option, ) *Server { initFunc := func(ctx context.Context, _ *pbfirehoseV2.Request) context.Context { - ////////////////////////////////////////////////////////////////////// ctx = dmetering.WithBytesMeter(ctx) ctx = withRequestMeter(ctx) return ctx - ////////////////////////////////////////////////////////////////////// } postHookFunc := func(ctx context.Context, response *pbfirehoseV2.Response) { - ////////////////////////////////////////////////////////////////////// - meter := dmetering.GetBytesMeter(ctx) - bytesRead := meter.BytesReadDelta() - bytesWritten := meter.BytesWrittenDelta() - size := proto.Size(response) - - auth := dauth.FromContext(ctx) - event := dmetering.Event{ - UserID: auth.UserID(), - ApiKeyID: auth.APIKeyID(), - IpAddress: auth.RealIP(), - Meta: auth.Meta(), - Endpoint: "sf.firehose.v2.Firehose/Blocks", - Metrics: map[string]float64{ - "egress_bytes": float64(size), - "written_bytes": float64(bytesWritten), - "read_bytes": float64(bytesRead), - "block_count": 1, - }, - Timestamp: time.Now(), - } - requestMeter := getRequestMeter(ctx) requestMeter.blocks++ - requestMeter.egressBytes += size - dmetering.Emit(ctx, event) - ////////////////////////////////////////////////////////////////////// + requestMeter.egressBytes += proto.Size(response) + + meter := dmetering.GetBytesMeter(ctx) + auth := dauth.FromContext(ctx) + metering.Send(ctx, meter, auth.UserID(), auth.APIKeyID(), auth.RealIP(), auth.Meta(), "sf.firehose.v2.Firehose/Block", response) } tracerProvider := otel.GetTracerProvider() diff --git a/go.mod b/go.mod index 893fcf8..bad8288 100644 --- a/go.mod +++ b/go.mod @@ -17,22 +17,22 @@ require ( github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.15.0 - github.com/streamingfast/bstream v0.0.2-0.20240619142813-9d23840859bf + github.com/streamingfast/bstream v0.0.2-0.20240819202225-ca1b790abf0b github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 github.com/streamingfast/dgrpc v0.0.0-20240423143010-f36784700c9a github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4 - github.com/streamingfast/dmetering v0.0.0-20240422183130-658027cbb7a1 + github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 - github.com/streamingfast/dstore v0.1.1-0.20240325191553-bcce8892a9bb + github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2 github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091 github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2 - github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d + github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 - github.com/streamingfast/substreams v1.9.4-0.20240812210000-635f7bcba6cf + github.com/streamingfast/substreams v1.9.4-0.20240827160230-05a454855aaf github.com/stretchr/testify v1.8.4 github.com/test-go/testify v1.1.4 go.uber.org/multierr v1.10.0 @@ -160,7 +160,7 @@ require ( github.com/streamingfast/shutter v1.5.0 github.com/subosito/gotenv v1.4.2 // indirect github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf // indirect - github.com/tetratelabs/wazero v1.7.1 // indirect + github.com/tetratelabs/wazero v1.8.0 // indirect github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869 // indirect go.opencensus.io v0.24.0 go.opentelemetry.io/contrib/detectors/gcp v1.9.0 // indirect diff --git a/go.sum b/go.sum index 6b6666c..164175f 100644 --- a/go.sum +++ b/go.sum @@ -441,8 +441,8 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqfI= github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= -github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM= -github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/mattn/go-sqlite3 v1.14.12 h1:TJ1bhYJPV44phC+IMu1u2K/i5RriLTPe+yc68XDJ1Z0= +github.com/mattn/go-sqlite3 v1.14.12/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU= @@ -537,8 +537,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU= github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= -github.com/streamingfast/bstream v0.0.2-0.20240619142813-9d23840859bf h1:LXFIz2pyTlIMNzyifvKsZpFLcLbJkTcRyu7OlABV1S0= -github.com/streamingfast/bstream v0.0.2-0.20240619142813-9d23840859bf/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg= +github.com/streamingfast/bstream v0.0.2-0.20240819202225-ca1b790abf0b h1:LbT8xpXFY5bsZbQfhQJGcXUBXbl/QZZ7CqfN6nLtpwM= +github.com/streamingfast/bstream v0.0.2-0.20240819202225-ca1b790abf0b/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg= github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d h1:9tsEt2tLCp94CW6MyJZY+Rw6+t0WH2kioBR6ucO6P/E= github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d/go.mod h1:og+6lDBPLZ24lbF/YISmVsSduZUZwXSmJGD3pZ/sW2Y= github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 h1:yCvuNcwQ21J4Ua6YrAmHDBx3bjK04y+ssEYBe65BXRU= @@ -551,12 +551,12 @@ github.com/streamingfast/dgrpc v0.0.0-20240423143010-f36784700c9a h1:JwAGZ7f5vkB github.com/streamingfast/dgrpc v0.0.0-20240423143010-f36784700c9a/go.mod h1:EPtUX/vhRphE37Zo6sDcgD/S3sm5YqXHhxAgzS6Ebwo= github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4 h1:HKi8AIkLBzxZWmbCRUo1RxoOLK33iXO6gZprfsE9rf4= github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4/go.mod h1:ehPytv7E4rI65iLcrwTes4rNGGqPPiugnH+20nDQyp4= -github.com/streamingfast/dmetering v0.0.0-20240422183130-658027cbb7a1 h1:zPqUBv2dBJ/N898pZ9W+1qDamQjbtdD7cwtwQB8PWTQ= -github.com/streamingfast/dmetering v0.0.0-20240422183130-658027cbb7a1/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI= +github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 h1:6o6MS3JHrp9A7V6EBHbR7W7mzVCFmXc8U0AjTfvz7PI= +github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI= github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y= github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw= -github.com/streamingfast/dstore v0.1.1-0.20240325191553-bcce8892a9bb h1:tmu8wGiSTzdqk2CnPnI7GywKwepGieqNOQDRKKSiVJg= -github.com/streamingfast/dstore v0.1.1-0.20240325191553-bcce8892a9bb/go.mod h1:kNzxgv2MzYFn2T4kelBVpGp/yP/1njtr3+csWuqxK3w= +github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2 h1:BB3VSDl8/OHBSvjqfgufwqr4tD5l7XPjXybDm6uudj4= +github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2/go.mod h1:kNzxgv2MzYFn2T4kelBVpGp/yP/1njtr3+csWuqxK3w= github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839 h1:K6mJPvh1jAL+/gBS7Bh9jyzWaTib6N47m06gZOTUPwQ= github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns= github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 h1:g8eEYbFSykyzIyuxNMmHEUGGUvJE0ivmqZagLDK42gw= @@ -572,8 +572,8 @@ github.com/streamingfast/overseer v0.2.1-0.20210326144022-ee491780e3ef h1:9IVFHR github.com/streamingfast/overseer v0.2.1-0.20210326144022-ee491780e3ef/go.mod h1:cq8CvbZ3ioFmGrHokSAJalS0lC+pVXLKhITScItUGXY= github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2 h1:bliib3pAObbM+6cKYQFa8axbCY/x6RczQZrOxdM7OZA= github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2/go.mod h1:DsnLrpKZ3DIDL6FmYVuxbC44fXvQdY7aCdSLMpbqZ8Q= -github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d h1:rgXXfBFlQ9C8casyay7UL53VSGR6JoUnhqGv4h6lhxM= -github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d/go.mod h1:eDQjKBYg9BWE2BTaV3UZeLZ5xw05+ywA9RCFTmM1w5Y= +github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb h1:Xqt4ned9ELmQMKcg7cFbm56MKG2gBjnE1M+2HObOs6w= +github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb/go.mod h1:eDQjKBYg9BWE2BTaV3UZeLZ5xw05+ywA9RCFTmM1w5Y= github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d h1:33VIARqUqBUKXJcuQoOS1rVSms54tgxhhNCmrLptpLg= github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d/go.mod h1:aBJivEdekmFWYSQ29EE/fN9IanJWJXbtjy3ky0XD/jE= github.com/streamingfast/sf-tracing v0.0.0-20240430173521-888827872b90 h1:94HllkX4ttYVilo8ZJv05b5z8JiMmqBvv4+Jdgk/+2A= @@ -582,8 +582,8 @@ github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAt github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 h1:Y15G1Z4fpEdm2b+/70owI7TLuXadlqBtGM7rk4Hxrzk= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0/go.mod h1:/Rnz2TJvaShjUct0scZ9kKV2Jr9/+KBAoWy4UMYxgv4= -github.com/streamingfast/substreams v1.9.4-0.20240812210000-635f7bcba6cf h1:/5LEFtd/ws7Gl4Di3mMaZYbgasRC1ooK3einImpmVsg= -github.com/streamingfast/substreams v1.9.4-0.20240812210000-635f7bcba6cf/go.mod h1:Q/h8Mxe+MKVZqU9wIpMxLKZHb0hLIACZvDiBnR+IVyI= +github.com/streamingfast/substreams v1.9.4-0.20240827160230-05a454855aaf h1:SxYuU+ox5Jow5j+d2xXvfmuxBIoJXEyWFZg+C6T9Kdw= +github.com/streamingfast/substreams v1.9.4-0.20240827160230-05a454855aaf/go.mod h1:htDRslKI5Fj+JUqmKVsNj4Ph1DIzYig/K+VvP6SUIt0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -605,8 +605,8 @@ github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf h1:Z2X3Os7oRzpdJ7 github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf/go.mod h1:M8agBzgqHIhgj7wEn9/0hJUZcrvt9VY+Ln+S1I5Mha0= github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE= github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU= -github.com/tetratelabs/wazero v1.7.1 h1:QtSfd6KLc41DIMpDYlJdoMc6k7QTN246DM2+n2Y/Dx8= -github.com/tetratelabs/wazero v1.7.1/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXAwJfQ3X6/7Y= +github.com/tetratelabs/wazero v1.8.0 h1:iEKu0d4c2Pd+QSRieYbnQC9yiFlMS9D+Jr0LsRmcF4g= +github.com/tetratelabs/wazero v1.8.0/go.mod h1:yAI0XTsMBhREkM/YDAK/zNou3GoiAce1P6+rp/wQhjs= github.com/tsenart/deadcode v0.0.0-20160724212837-210d2dc333e9/go.mod h1:q+QjxYvZ+fpjMXqs+XEriussHjSYqeXVnAdSV1tkMYk= github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869 h1:7v7L5lsfw4w8iqBBXETukHo4IPltmD+mWoLRYUmeGN8= github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869/go.mod h1:Rfzr+sqaDreiCaoQbFCu3sTXxeFq/9kXRuyOoSlGQHE= diff --git a/metering/metering.go b/metering/metering.go new file mode 100644 index 0000000..e642e85 --- /dev/null +++ b/metering/metering.go @@ -0,0 +1,80 @@ +package metering + +import ( + "context" + "time" + + "go.uber.org/zap" + + "github.com/streamingfast/dstore" + + "github.com/streamingfast/dmetering" + "github.com/streamingfast/substreams/reqctx" + "google.golang.org/protobuf/proto" +) + +const ( + MeterLiveUncompressedReadBytes = "live_uncompressed_read_bytes" + MeterLiveUncompressedReadForkedBytes = "live_uncompressed_read_forked_bytes" + + MeterFileUncompressedReadBytes = "file_uncompressed_read_bytes" + MeterFileUncompressedReadForkedBytes = "file_uncompressed_read_forked_bytes" + MeterFileCompressedReadForkedBytes = "file_compressed_read_forked_bytes" + MeterFileCompressedReadBytes = "file_compressed_read_bytes" +) + +func WithBlockBytesReadMeteringOptions(meter dmetering.Meter, logger *zap.Logger) []dstore.Option { + return []dstore.Option{dstore.WithCompressedReadCallback(func(ctx context.Context, n int) { + meter.CountInc(MeterFileCompressedReadBytes, n) + })} +} + +func WithForkedBlockBytesReadMeteringOptions(meter dmetering.Meter, logger *zap.Logger) []dstore.Option { + return []dstore.Option{dstore.WithCompressedReadCallback(func(ctx context.Context, n int) { + meter.CountInc(MeterFileCompressedReadForkedBytes, n) + })} +} + +func Send(ctx context.Context, meter dmetering.Meter, userID, apiKeyID, ip, userMeta, endpoint string, resp proto.Message) { + bytesRead := meter.BytesReadDelta() + bytesWritten := meter.BytesWrittenDelta() + egressBytes := proto.Size(resp) + + liveUncompressedReadBytes := meter.GetCountAndReset(MeterLiveUncompressedReadBytes) + liveUncompressedReadForkedBytes := meter.GetCountAndReset(MeterLiveUncompressedReadForkedBytes) + + fileUncompressedReadBytes := meter.GetCountAndReset(MeterFileUncompressedReadBytes) + fileUncompressedReadForkedBytes := meter.GetCountAndReset(MeterFileUncompressedReadForkedBytes) + fileCompressedReadForkedBytes := meter.GetCountAndReset(MeterFileCompressedReadForkedBytes) + fileCompressedReadBytes := meter.GetCountAndReset(MeterFileCompressedReadBytes) + + event := dmetering.Event{ + UserID: userID, + ApiKeyID: apiKeyID, + IpAddress: ip, + Meta: userMeta, + + Endpoint: endpoint, + Metrics: map[string]float64{ + "egress_bytes": float64(egressBytes), + "written_bytes": float64(bytesWritten), + "read_bytes": float64(bytesRead), + MeterLiveUncompressedReadBytes: float64(liveUncompressedReadBytes), + MeterLiveUncompressedReadForkedBytes: float64(liveUncompressedReadForkedBytes), + MeterLiveCompressedReadBytes: float64(liveCompressedReadBytes), + MeterFileUncompressedReadBytes: float64(fileUncompressedReadBytes), + MeterFileUncompressedReadForkedBytes: float64(fileUncompressedReadForkedBytes), + MeterFileCompressedReadForkedBytes: float64(fileCompressedReadForkedBytes), + MeterFileCompressedReadBytes: float64(fileCompressedReadBytes), + "block_count": 1, + }, + Timestamp: time.Now(), + } + + emitter := reqctx.Emitter(ctx) + if emitter == nil { + dmetering.Emit(context.WithoutCancel(ctx), event) + } else { + emitter.Emit(context.WithoutCancel(ctx), event) + } +} diff --git a/stream_factory.go b/stream_factory.go index 25f3d31..31346be 100644 --- a/stream_factory.go +++ b/stream_factory.go @@ -4,12 +4,15 @@ import ( "context" "fmt" + "github.com/streamingfast/dmetering" + + "github.com/streamingfast/firehose-core/metering" + "github.com/streamingfast/bstream" "github.com/streamingfast/bstream/hub" "github.com/streamingfast/bstream/stream" "github.com/streamingfast/bstream/transform" "github.com/streamingfast/dauth" - "github.com/streamingfast/dmetering" "github.com/streamingfast/dstore" pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "go.uber.org/zap" @@ -44,7 +47,8 @@ func (sf *StreamFactory) New( ctx context.Context, handler bstream.Handler, request *pbfirehose.Request, - logger *zap.Logger) (*stream.Stream, error) { + logger *zap.Logger, + extraOpts ...stream.Option) (*stream.Stream, error) { reqLogger := logger.With( zap.Int64("req_start_block", request.StartBlockNum), @@ -108,23 +112,31 @@ func (sf *StreamFactory) New( forkedBlocksStore := sf.forkedBlocksStore if clonable, ok := forkedBlocksStore.(dstore.Clonable); ok { var err error - forkedBlocksStore, err = clonable.Clone(ctx) + forkedBlocksStore, err = clonable.Clone(ctx, metering.WithForkedBlockBytesReadMeteringOptions(dmetering.GetBytesMeter(ctx), logger)...) if err != nil { return nil, err } + + //todo: (deprecated) remove this forkedBlocksStore.SetMeter(dmetering.GetBytesMeter(ctx)) } mergedBlocksStore := sf.mergedBlocksStore if clonable, ok := mergedBlocksStore.(dstore.Clonable); ok { var err error - mergedBlocksStore, err = clonable.Clone(ctx) + mergedBlocksStore, err = clonable.Clone(ctx, metering.WithBlockBytesReadMeteringOptions(dmetering.GetBytesMeter(ctx), logger)...) if err != nil { return nil, err } + + //todo: (deprecated) remove this mergedBlocksStore.SetMeter(dmetering.GetBytesMeter(ctx)) } + for _, opt := range extraOpts { + options = append(options, opt) + } + str := stream.New( forkedBlocksStore, mergedBlocksStore,