From 44e490cc07f065855cd59b9969033b61f24dce15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Wed, 13 Mar 2024 11:24:53 -0400 Subject: [PATCH 1/4] tier2 re-read existing mapper outputs to skip some modules processing --- docs/release-notes/change-log.md | 1 + pipeline/cache/engine.go | 19 +++++++++++------ pipeline/process_block.go | 36 ++++++++++++++++---------------- service/tier1.go | 9 ++++---- service/tier2.go | 24 ++++++++++++++++++++- storage/execout/buffer.go | 17 ++++++++------- storage/execout/config.go | 9 ++++++++ 7 files changed, 78 insertions(+), 37 deletions(-) diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index 9ed842148..6bc550648 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), * Pick up docs from the README.md or README in the same directory as the manifest, when top-level package.doc is empty * Tier2 service now supports a maximum concurrent requests limit. Default set to 0 (unlimited). * Improved file listing performance for Google Storage backends by 25% +* Tier2 will now read back mapper outputs (if they exist) to prevent running them again. It will skip processing completely if their output_module is a mapper that has already been processed. ## v1.3.7 diff --git a/pipeline/cache/engine.go b/pipeline/cache/engine.go index c33dfa914..2c0224d77 100644 --- a/pipeline/cache/engine.go +++ b/pipeline/cache/engine.go @@ -27,11 +27,12 @@ type Engine struct { blockType string reversibleBuffers map[uint64]*execout.Buffer // block num to modules' outputs for that given block execOutputWriter *execout.Writer // moduleName => irreversible File - runtimeConfig config.RuntimeConfig // TODO(abourget): Deprecated: remove this as it's not used + existingExecOuts map[string]*execout.File + runtimeConfig config.RuntimeConfig // TODO(abourget): Deprecated: remove this as it's not used logger *zap.Logger } -func NewEngine(ctx context.Context, runtimeConfig config.RuntimeConfig, execOutWriter *execout.Writer, blockType string) (*Engine, error) { +func NewEngine(ctx context.Context, runtimeConfig config.RuntimeConfig, execOutWriter *execout.Writer, blockType string, existingExecOuts map[string]*execout.File) (*Engine, error) { e := &Engine{ ctx: ctx, runtimeConfig: runtimeConfig, @@ -39,19 +40,25 @@ func NewEngine(ctx context.Context, runtimeConfig config.RuntimeConfig, execOutW execOutputWriter: execOutWriter, logger: reqctx.Logger(ctx), blockType: blockType, + existingExecOuts: existingExecOuts, } return e, nil } -func (e *Engine) NewBuffer(block *pbbstream.Block, clock *pbsubstreams.Clock, cursor *bstream.Cursor) (execout.ExecutionOutput, error) { - execOutBuf, err := execout.NewBuffer(e.blockType, block, clock) +func (e *Engine) NewBuffer(optionalBlock *pbbstream.Block, clock *pbsubstreams.Clock, cursor *bstream.Cursor) (execout.ExecutionOutput, error) { + out, err := execout.NewBuffer(e.blockType, optionalBlock, clock) if err != nil { return nil, fmt.Errorf("setting up map: %w", err) } - e.reversibleBuffers[clock.Number] = execOutBuf + e.reversibleBuffers[clock.Number] = out + for moduleName, existingExecOut := range e.existingExecOuts { + if val, ok := existingExecOut.Get(clock); ok { + out.Set(moduleName, val) + } + } - return execOutBuf, nil + return out, nil } func (e *Engine) HandleUndo(clock *pbsubstreams.Clock) { diff --git a/pipeline/process_block.go b/pipeline/process_block.go index 9776363b3..702b38085 100644 --- a/pipeline/process_block.go +++ b/pipeline/process_block.go @@ -14,7 +14,6 @@ import ( "go.uber.org/zap" "google.golang.org/protobuf/types/known/timestamppb" - "github.com/streamingfast/dmetering" "github.com/streamingfast/substreams/metrics" pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2" pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" @@ -51,12 +50,16 @@ func (p *Pipeline) ProcessBlock(block *pbbstream.Block, obj interface{}) (err er step = bstream.StepNewIrreversible // with finalBlocksOnly, we never get NEW signals so we fake any 'irreversible' signal as both } - finalBlockHeight := obj.(bstream.Stepable).FinalBlockHeight() reorgJunctionBlock := obj.(bstream.Stepable).ReorgJunctionBlock() reqctx.ReqStats(ctx).RecordBlock(block.AsRef()) p.gate.processBlock(block.Number, step) - if err = p.processBlock(ctx, block, clock, cursor, step, finalBlockHeight, reorgJunctionBlock); err != nil { + execOutput, err := p.execOutputCache.NewBuffer(block, clock, cursor) + if err != nil { + return fmt.Errorf("setting up exec output: %w", err) + } + + if err = p.processBlock(ctx, execOutput, clock, cursor, step, reorgJunctionBlock); err != nil { return err // watch out, io.EOF needs to go through undecorated } return @@ -79,11 +82,10 @@ func blockRefToPB(block bstream.BlockRef) *pbsubstreams.BlockRef { func (p *Pipeline) processBlock( ctx context.Context, - block *pbbstream.Block, + execOutput execout.ExecutionOutput, clock *pbsubstreams.Clock, cursor *bstream.Cursor, step bstream.StepType, - finalBlockHeight uint64, reorgJunctionBlock bstream.BlockRef, ) (err error) { var eof bool @@ -102,10 +104,11 @@ func (p *Pipeline) processBlock( case bstream.StepNew: p.blockStepMap[bstream.StepNew]++ // metering of live blocks - payload := block.Payload.Value - dmetering.GetBytesMeter(ctx).AddBytesRead(len(payload)) + // FIXME stepd + //payload := block.Payload.Value + //dmetering.GetBytesMeter(ctx).AddBytesRead(len(payload)) - err = p.handleStepNew(ctx, block, clock, cursor) + err = p.handleStepNew(ctx, clock, cursor, execOutput) if err != nil && err != io.EOF { return fmt.Errorf("step new: handler step new: %w", err) } @@ -114,7 +117,7 @@ func (p *Pipeline) processBlock( } case bstream.StepNewIrreversible: p.blockStepMap[bstream.StepNewIrreversible]++ - err := p.handleStepNew(ctx, block, clock, cursor) + err = p.handleStepNew(ctx, clock, cursor, execOutput) if err != nil && err != io.EOF { return fmt.Errorf("step new irr: handler step new: %w", err) } @@ -133,11 +136,11 @@ func (p *Pipeline) processBlock( } } - if block.Number%500 == 0 { + if clock.Number%500 == 0 { logger := reqctx.Logger(ctx) // log the total number of StepNew and StepNewIrreversible blocks, and the ratio of the two logger.Debug("block stats", - zap.Uint64("block_num", block.Number), + zap.Uint64("block_num", clock.Number), zap.Uint64("step_new", p.blockStepMap[bstream.StepNew]), zap.Uint64("step_new_irreversible", p.blockStepMap[bstream.StepNewIrreversible]), zap.Float64("ratio", float64(p.blockStepMap[bstream.StepNewIrreversible])/float64(p.blockStepMap[bstream.StepNew])), @@ -197,7 +200,7 @@ func (p *Pipeline) handleStepFinal(clock *pbsubstreams.Clock) error { return nil } -func (p *Pipeline) handleStepNew(ctx context.Context, block *pbbstream.Block, clock *pbsubstreams.Clock, cursor *bstream.Cursor) (err error) { +func (p *Pipeline) handleStepNew(ctx context.Context, clock *pbsubstreams.Clock, cursor *bstream.Cursor, execOutput execout.ExecutionOutput) (err error) { p.insideReorgUpTo = nil reqDetails := reqctx.Details(ctx) @@ -241,16 +244,13 @@ func (p *Pipeline) handleStepNew(ctx context.Context, block *pbbstream.Block, cl } logger := reqctx.Logger(ctx) - execOutput, err := p.execOutputCache.NewBuffer(block, clock, cursor) - if err != nil { - return fmt.Errorf("setting up exec output: %w", err) - } if err := p.runPreBlockHooks(ctx, clock); err != nil { return fmt.Errorf("pre block hook: %w", err) } - dmetering.GetBytesMeter(ctx).CountInc("wasm_input_bytes", len(block.Payload.Value)) + // FIXME stepd + //dmetering.GetBytesMeter(ctx).CountInc("wasm_input_bytes", len(block.Payload.Value)) if err := p.executeModules(ctx, execOutput); err != nil { return fmt.Errorf("execute modules: %w", err) @@ -270,7 +270,7 @@ func (p *Pipeline) handleStepNew(ctx context.Context, block *pbbstream.Block, cl } p.stores.resetStores() - logger.Debug("block processed", zap.Uint64("block_num", block.Number)) + logger.Debug("block processed", zap.Uint64("block_num", clock.Number)) return nil } diff --git a/service/tier1.go b/service/tier1.go index 04d15d105..edaeab5e8 100644 --- a/service/tier1.go +++ b/service/tier1.go @@ -17,7 +17,6 @@ import ( "github.com/streamingfast/bstream/hub" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" - "github.com/streamingfast/bstream/stream" bsstream "github.com/streamingfast/bstream/stream" "github.com/streamingfast/dauth" "github.com/streamingfast/dmetering" @@ -325,7 +324,7 @@ var IsValidCacheTag = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`).MatchString func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Request, outputGraph *outputmodules.Graph, respFunc substreams.ResponseFunc) error { chainFirstStreamableBlock := bstream.GetProtocolFirstStreamableBlock if request.StartBlockNum > 0 && request.StartBlockNum < int64(chainFirstStreamableBlock) { - return stream.NewErrInvalidArg("invalid start block %d, must be >= %d (the first streamable block of the chain)", request.StartBlockNum, chainFirstStreamableBlock) + return bsstream.NewErrInvalidArg("invalid start block %d, must be >= %d (the first streamable block of the chain)", request.StartBlockNum, chainFirstStreamableBlock) } else if request.StartBlockNum < 0 && request.StopBlockNum > 0 { if int64(request.StopBlockNum)+int64(request.StartBlockNum) < int64(chainFirstStreamableBlock) { request.StartBlockNum = int64(chainFirstStreamableBlock) @@ -385,7 +384,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ } if err := outputGraph.ValidateRequestStartBlock(requestDetails.ResolvedStartBlockNum); err != nil { - return stream.NewErrInvalidArg(err.Error()) + return bsstream.NewErrInvalidArg(err.Error()) } wasmRuntime := wasm.NewRegistry(s.wasmExtensions, s.runtimeConfig.MaxWasmFuel) @@ -407,7 +406,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ stores := pipeline.NewStores(ctx, storeConfigs, s.runtimeConfig.StateBundleSize, requestDetails.LinearHandoffBlockNum, request.StopBlockNum, false) - execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, nil, s.blockType) + execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, nil, s.blockType, nil) // we don't read existing ExecOuts on tier1 if err != nil { return fmt.Errorf("error building caching engine: %w", err) } @@ -620,7 +619,7 @@ func toConnectError(ctx context.Context, err error) error { return connect.NewError(connect.CodeInvalidArgument, err) } - var errInvalidArg *stream.ErrInvalidArg + var errInvalidArg *bsstream.ErrInvalidArg if errors.As(err, &errInvalidArg) { return connect.NewError(connect.CodeInvalidArgument, errInvalidArg) } diff --git a/service/tier2.go b/service/tier2.go index e89783665..e0c0e271a 100644 --- a/service/tier2.go +++ b/service/tier2.go @@ -21,8 +21,10 @@ import ( pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" "github.com/streamingfast/substreams" + "github.com/streamingfast/substreams/block" "github.com/streamingfast/substreams/metrics" pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2" + pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" "github.com/streamingfast/substreams/pipeline" "github.com/streamingfast/substreams/pipeline/cache" "github.com/streamingfast/substreams/pipeline/exec" @@ -299,7 +301,26 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P ) } - execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, execOutWriter, s.blockType) + existingExecOuts := make(map[string]*execout.File) + for name, c := range execOutputConfigs.ConfigMap { + if c.ModuleKind() == pbsubstreams.ModuleKindStore { + continue // TODO @stepd add support for store modules + } + file, err := c.ReadFile(ctx, &block.Range{StartBlock: request.StartBlockNum, ExclusiveEndBlock: request.StopBlockNum}) + if err != nil { + continue + } + if name == request.OutputModule { + logger.Info("found existing exec output for output_module, skipping run", zap.String("output_module", name)) + return nil + + } + existingExecOuts[name] = file + } + // TODO @stepd: check through all inputs if we need blocks, if not, we can set up another kind of pipeline from the existing outputconfigs + // outputGraph.UsedModules()[0].Inputs[0].GetInput() + + execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, execOutWriter, s.blockType, existingExecOuts) if err != nil { return fmt.Errorf("error building caching engine: %w", err) } @@ -328,6 +349,7 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P zap.String("output_module", request.OutputModule), zap.Uint32("stage", request.Stage), ) + // TODO @stepd: don't need to set up tier2 stores if we're not going to use them if err := pipe.InitTier2Stores(ctx); err != nil { return fmt.Errorf("error building pipeline: %w", err) } diff --git a/storage/execout/buffer.go b/storage/execout/buffer.go index 5f4eef89b..43e53a8dd 100644 --- a/storage/execout/buffer.go +++ b/storage/execout/buffer.go @@ -19,18 +19,21 @@ type Buffer struct { } func NewBuffer(blockType string, block *pbbstream.Block, clock *pbsubstreams.Clock) (*Buffer, error) { - blkBytes := block.Payload.Value + values := make(map[string][]byte) + clockBytes, err := proto.Marshal(clock) if err != nil { return nil, fmt.Errorf("marshalling clock %d %q: %w", clock.Number, clock.Id, err) } + values[wasm.ClockType] = clockBytes + + if block != nil { + values[blockType] = block.Payload.Value + } return &Buffer{ - clock: clock, - values: map[string][]byte{ - blockType: blkBytes, - wasm.ClockType: clockBytes, - }, + clock: clock, + values: values, }, nil } @@ -43,7 +46,7 @@ func (i *Buffer) Get(moduleName string) (value []byte, cached bool, err error) { if !found { return nil, false, NotFound } - return val, false, nil + return val, true, nil } func (i *Buffer) Set(moduleName string, value []byte) (err error) { diff --git a/storage/execout/config.go b/storage/execout/config.go index 01a025054..9d8b3cea9 100644 --- a/storage/execout/config.go +++ b/storage/execout/config.go @@ -80,3 +80,12 @@ func (c *Config) ListSnapshotFiles(ctx context.Context, inRange *bstream.Range) return files, nil } + +func (c *Config) ReadFile(ctx context.Context, inrange *block.Range) (*File, error) { + + file := c.NewFile(inrange) + if err := file.Load(ctx); err != nil { + return nil, err + } + return file, nil +} From 4b4040a84a8b638f96a136936b2efb940e48e9b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Wed, 13 Mar 2024 14:42:17 -0400 Subject: [PATCH 2/4] run tier2 without reading full blocks when not necessary --- docs/release-notes/change-log.md | 3 +- pipeline/pipeline.go | 8 +-- pipeline/process_block.go | 19 ++++++ service/tier1.go | 3 + service/tier2.go | 104 ++++++++++++++++++++++++------- 5 files changed, 108 insertions(+), 29 deletions(-) diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index 6bc550648..7dc055239 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -14,7 +14,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), * Pick up docs from the README.md or README in the same directory as the manifest, when top-level package.doc is empty * Tier2 service now supports a maximum concurrent requests limit. Default set to 0 (unlimited). * Improved file listing performance for Google Storage backends by 25% -* Tier2 will now read back mapper outputs (if they exist) to prevent running them again. It will skip processing completely if their output_module is a mapper that has already been processed. +* Tier2 will now read back mapper outputs (if they exist) to prevent running them again. Additionally, it will not read back the full blocks if its inputs can be satisfied from existing cached mapper outputs. +* Tier2 will skip processing completely if the output_module is a mapper that has already been processed (ex: when multiple requests are indexing the same data at the same time) ## v1.3.7 diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 184d59fb2..813ddf71f 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -112,7 +112,7 @@ func New( return pipe } -func (p *Pipeline) init(ctx context.Context) (err error) { +func (p *Pipeline) Init(ctx context.Context) (err error) { reqDetails := reqctx.Details(ctx) p.forkHandler.registerUndoHandler(func(clock *pbsubstreams.Clock, moduleOutputs []*pbssinternal.ModuleOutput) { @@ -138,9 +138,6 @@ func (p *Pipeline) init(ctx context.Context) (err error) { } func (p *Pipeline) InitTier2Stores(ctx context.Context) (err error) { - if err := p.init(ctx); err != nil { - return err - } storeMap, err := p.setupSubrequestStores(ctx) if err != nil { @@ -156,9 +153,6 @@ func (p *Pipeline) InitTier2Stores(ctx context.Context) (err error) { } func (p *Pipeline) InitTier1StoresAndBackprocess(ctx context.Context, reqPlan *plan.RequestPlan) (err error) { - if err := p.init(ctx); err != nil { - return err - } if reqPlan.RequiresParallelProcessing() { storeMap, err := p.runParallelProcess(ctx, reqPlan) diff --git a/pipeline/process_block.go b/pipeline/process_block.go index 702b38085..2c5406816 100644 --- a/pipeline/process_block.go +++ b/pipeline/process_block.go @@ -23,6 +23,25 @@ import ( "github.com/streamingfast/substreams/storage/execout" ) +func (p *Pipeline) ProcessFromExecOutput( + ctx context.Context, + clock *pbsubstreams.Clock, + cursor *bstream.Cursor, +) (err error) { + // TODO @stepd: add metrics back here too + p.gate.processBlock(clock.Number, bstream.StepNewIrreversible) + execOutput, err := p.execOutputCache.NewBuffer(nil, clock, cursor) + if err != nil { + return fmt.Errorf("setting up exec output: %w", err) + } + + if err = p.processBlock(ctx, execOutput, clock, cursor, bstream.StepNewIrreversible, nil); err != nil { + return err + } + + return nil +} + func (p *Pipeline) ProcessBlock(block *pbbstream.Block, obj interface{}) (err error) { ctx := p.ctx diff --git a/service/tier1.go b/service/tier1.go index edaeab5e8..1b2f4f144 100644 --- a/service/tier1.go +++ b/service/tier1.go @@ -468,6 +468,9 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ zap.String("output_module", request.OutputModule), ) + if err := pipe.Init(ctx); err != nil { + return fmt.Errorf("error during pipeline init: %w", err) + } if err := pipe.InitTier1StoresAndBackprocess(ctx, reqPlan); err != nil { return fmt.Errorf("error during init_stores_and_backprocess: %w", err) } diff --git a/service/tier2.go b/service/tier2.go index e0c0e271a..8f71bd371 100644 --- a/service/tier2.go +++ b/service/tier2.go @@ -317,8 +317,8 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P } existingExecOuts[name] = file } - // TODO @stepd: check through all inputs if we need blocks, if not, we can set up another kind of pipeline from the existing outputconfigs - // outputGraph.UsedModules()[0].Inputs[0].GetInput() + + skipBlocks, skipStores := checkSkipBlocksAndStores(existingExecOuts, outputGraph, s.blockType) execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, execOutWriter, s.blockType, existingExecOuts) if err != nil { @@ -349,33 +349,95 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P zap.String("output_module", request.OutputModule), zap.Uint32("stage", request.Stage), ) - // TODO @stepd: don't need to set up tier2 stores if we're not going to use them - if err := pipe.InitTier2Stores(ctx); err != nil { - return fmt.Errorf("error building pipeline: %w", err) + + if err := pipe.Init(ctx); err != nil { + return fmt.Errorf("error during pipeline init: %w", err) + } + if !skipStores { + if err := pipe.InitTier2Stores(ctx); err != nil { + return fmt.Errorf("error building pipeline: %w", err) + } } var streamErr error - blockStream, err := s.streamFactoryFunc( - ctx, - pipe, - int64(requestDetails.ResolvedStartBlockNum), - request.StopBlockNum, - "", - true, - false, - logger.Named("stream"), - ) - if err != nil { - return fmt.Errorf("error getting stream: %w", err) - } + if skipBlocks { + var referenceMapper *execout.File + for k, v := range existingExecOuts { + referenceMapper = v + logger.Info("running from mapper", zap.String("module", k)) + break + } + + ctx, span := reqctx.WithSpan(ctx, "substreams/tier2/pipeline/mapper_stream") + for _, v := range referenceMapper.SortedItems() { + if v.BlockNum < request.StartBlockNum || v.BlockNum >= request.StopBlockNum { + panic("reading from mapper, block was out of range") // we don't want to have this case undetected + } + clock := &pbsubstreams.Clock{ + Id: v.BlockId, + Number: v.BlockNum, + Timestamp: v.Timestamp, + } - ctx, span := reqctx.WithSpan(ctx, "substreams/tier2/pipeline/blocks_stream") - streamErr = blockStream.Run(ctx) - span.EndWithErr(&streamErr) + cursor := &bstream.Cursor{ + Step: bstream.StepNewIrreversible, + Block: bstream.NewBlockRef(v.BlockId, v.BlockNum), + LIB: bstream.NewBlockRef(v.BlockId, v.BlockNum), + HeadBlock: bstream.NewBlockRef(v.BlockId, v.BlockNum), + } + + if err := pipe.ProcessFromExecOutput(ctx, clock, cursor); err != nil { + span.EndWithErr(&err) + return err + } + } + streamErr = io.EOF + span.EndWithErr(&streamErr) + } else { + blockStream, err := s.streamFactoryFunc( + ctx, + pipe, + int64(requestDetails.ResolvedStartBlockNum), + request.StopBlockNum, + "", + true, + false, + logger.Named("stream"), + ) + if err != nil { + return fmt.Errorf("error getting stream: %w", err) + } + + ctx, span := reqctx.WithSpan(ctx, "substreams/tier2/pipeline/blocks_stream") + streamErr = blockStream.Run(ctx) + span.EndWithErr(&streamErr) + } return pipe.OnStreamTerminated(ctx, streamErr) } +func checkSkipBlocksAndStores(existingExecOuts map[string]*execout.File, outputGraph *outputmodules.Graph, blockType string) (skipBlocks, skipStores bool) { + if len(existingExecOuts) == 0 { + return + } + skipBlocks = true + skipStores = true + for _, module := range outputGraph.UsedModules() { + if existingExecOuts[module.Name] != nil { + continue + } + for _, input := range module.Inputs { + if src := input.GetSource(); src != nil && src.Type == blockType { + skipBlocks = false + } + if input.GetStore() != nil { + skipStores = false + } + } + } + return +} + func (s *Tier2Service) buildPipelineOptions(ctx context.Context, request *pbssinternal.ProcessRangeRequest) (opts []pipeline.Option) { requestDetails := reqctx.Details(ctx) for _, pipeOpts := range s.pipelineOptions { From 07eabd8c5fb74ddccf0f82dbf20eb9005c8eb3e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Wed, 13 Mar 2024 15:07:24 -0400 Subject: [PATCH 3/4] add back the metrics for read bytes/blocks --- pipeline/exec/module_executor_test.go | 4 ++++ pipeline/init_test.go | 7 +++++++ pipeline/process_block.go | 11 +++-------- storage/execout/buffer.go | 8 ++++++++ storage/execout/interface.go | 1 + 5 files changed, 23 insertions(+), 8 deletions(-) diff --git a/pipeline/exec/module_executor_test.go b/pipeline/exec/module_executor_test.go index ea64310ad..0ee5afd8d 100644 --- a/pipeline/exec/module_executor_test.go +++ b/pipeline/exec/module_executor_test.go @@ -25,6 +25,10 @@ func (t *MockExecOutput) Clock() *pbsubstreams.Clock { return t.clockFunc() } +func (t *MockExecOutput) Len() int { + return 0 +} + func (t *MockExecOutput) Get(name string) ([]byte, bool, error) { v, ok := t.cacheMap[name] if !ok { diff --git a/pipeline/init_test.go b/pipeline/init_test.go index 7b157f0f6..721ad034e 100644 --- a/pipeline/init_test.go +++ b/pipeline/init_test.go @@ -64,6 +64,13 @@ func NewExecOutputTesting(t *testing.T, block *pbbstream.Block, clock *pbsubstre } } +func (i *ExecOutputTesting) Len() (out int) { + for _, v := range i.Values { + out += len(v) + } + return +} + func (i *ExecOutputTesting) Get(moduleName string) (value []byte, cached bool, err error) { val, found := i.Values[moduleName] if !found { diff --git a/pipeline/process_block.go b/pipeline/process_block.go index 2c5406816..41a35f1a6 100644 --- a/pipeline/process_block.go +++ b/pipeline/process_block.go @@ -9,6 +9,7 @@ import ( "sync" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/dmetering" "github.com/streamingfast/bstream" "go.uber.org/zap" @@ -28,7 +29,6 @@ func (p *Pipeline) ProcessFromExecOutput( clock *pbsubstreams.Clock, cursor *bstream.Cursor, ) (err error) { - // TODO @stepd: add metrics back here too p.gate.processBlock(clock.Number, bstream.StepNewIrreversible) execOutput, err := p.execOutputCache.NewBuffer(nil, clock, cursor) if err != nil { @@ -122,11 +122,8 @@ func (p *Pipeline) processBlock( } case bstream.StepNew: p.blockStepMap[bstream.StepNew]++ - // metering of live blocks - // FIXME stepd - //payload := block.Payload.Value - //dmetering.GetBytesMeter(ctx).AddBytesRead(len(payload)) + dmetering.GetBytesMeter(ctx).AddBytesRead(execOutput.Len()) err = p.handleStepNew(ctx, clock, cursor, execOutput) if err != nil && err != io.EOF { return fmt.Errorf("step new: handler step new: %w", err) @@ -268,9 +265,7 @@ func (p *Pipeline) handleStepNew(ctx context.Context, clock *pbsubstreams.Clock, return fmt.Errorf("pre block hook: %w", err) } - // FIXME stepd - //dmetering.GetBytesMeter(ctx).CountInc("wasm_input_bytes", len(block.Payload.Value)) - + dmetering.GetBytesMeter(ctx).CountInc("wasm_input_bytes", execOutput.Len()) if err := p.executeModules(ctx, execOutput); err != nil { return fmt.Errorf("execute modules: %w", err) } diff --git a/storage/execout/buffer.go b/storage/execout/buffer.go index 43e53a8dd..986c94fe7 100644 --- a/storage/execout/buffer.go +++ b/storage/execout/buffer.go @@ -18,6 +18,14 @@ type Buffer struct { clock *pbsubstreams.Clock } +func (b *Buffer) Len() (out int) { + for _, v := range b.values { + out += len(v) + } + + return +} + func NewBuffer(blockType string, block *pbbstream.Block, clock *pbsubstreams.Clock) (*Buffer, error) { values := make(map[string][]byte) diff --git a/storage/execout/interface.go b/storage/execout/interface.go index 3655e414f..329c94c19 100644 --- a/storage/execout/interface.go +++ b/storage/execout/interface.go @@ -7,6 +7,7 @@ import ( ) type ExecutionOutputGetter interface { + Len() int Clock() *pbsubstreams.Clock Get(name string) (value []byte, cached bool, err error) } From 5e6e4a3334ebd1e1f1bb59efaf69a1ebc94f7059 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Thu, 14 Mar 2024 09:38:48 -0400 Subject: [PATCH 4/4] fix 'skip store' mechanism, don't skip the store that we are actually working on --- service/tier2.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/service/tier2.go b/service/tier2.go index 8f71bd371..bee9e3fc0 100644 --- a/service/tier2.go +++ b/service/tier2.go @@ -292,7 +292,8 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P outputModule := outputGraph.OutputModule() var execOutWriter *execout.Writer - if !outputGraph.StagedUsedModules()[request.Stage].LastLayer().IsStoreLayer() { + isOutputMapperStage := !outputGraph.StagedUsedModules()[request.Stage].LastLayer().IsStoreLayer() + if isOutputMapperStage { execOutWriter = execout.NewWriter( requestDetails.ResolvedStartBlockNum, requestDetails.StopBlockNum, @@ -310,10 +311,9 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P if err != nil { continue } - if name == request.OutputModule { + if isOutputMapperStage && name == request.OutputModule { logger.Info("found existing exec output for output_module, skipping run", zap.String("output_module", name)) return nil - } existingExecOuts[name] = file } @@ -423,7 +423,7 @@ func checkSkipBlocksAndStores(existingExecOuts map[string]*execout.File, outputG skipBlocks = true skipStores = true for _, module := range outputGraph.UsedModules() { - if existingExecOuts[module.Name] != nil { + if existingExecOuts[module.Name] != nil && outputGraph.OutputModule().Name != module.Name { // we don't skip the store if that store is actually our output module continue } for _, input := range module.Inputs {