diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index 9ed842148..7dc055239 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -14,6 +14,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), * Pick up docs from the README.md or README in the same directory as the manifest, when top-level package.doc is empty * Tier2 service now supports a maximum concurrent requests limit. Default set to 0 (unlimited). * Improved file listing performance for Google Storage backends by 25% +* Tier2 will now read back mapper outputs (if they exist) to prevent running them again. Additionally, it will not read back the full blocks if its inputs can be satisfied from existing cached mapper outputs. +* Tier2 will skip processing completely if the output_module is a mapper that has already been processed (ex: when multiple requests are indexing the same data at the same time) ## v1.3.7 diff --git a/pipeline/cache/engine.go b/pipeline/cache/engine.go index c33dfa914..2c0224d77 100644 --- a/pipeline/cache/engine.go +++ b/pipeline/cache/engine.go @@ -27,11 +27,12 @@ type Engine struct { blockType string reversibleBuffers map[uint64]*execout.Buffer // block num to modules' outputs for that given block execOutputWriter *execout.Writer // moduleName => irreversible File - runtimeConfig config.RuntimeConfig // TODO(abourget): Deprecated: remove this as it's not used + existingExecOuts map[string]*execout.File + runtimeConfig config.RuntimeConfig // TODO(abourget): Deprecated: remove this as it's not used logger *zap.Logger } -func NewEngine(ctx context.Context, runtimeConfig config.RuntimeConfig, execOutWriter *execout.Writer, blockType string) (*Engine, error) { +func NewEngine(ctx context.Context, runtimeConfig config.RuntimeConfig, execOutWriter *execout.Writer, blockType string, existingExecOuts map[string]*execout.File) (*Engine, error) { e := &Engine{ ctx: ctx, runtimeConfig: runtimeConfig, @@ -39,19 +40,25 @@ func NewEngine(ctx context.Context, runtimeConfig config.RuntimeConfig, execOutW execOutputWriter: execOutWriter, logger: reqctx.Logger(ctx), blockType: blockType, + existingExecOuts: existingExecOuts, } return e, nil } -func (e *Engine) NewBuffer(block *pbbstream.Block, clock *pbsubstreams.Clock, cursor *bstream.Cursor) (execout.ExecutionOutput, error) { - execOutBuf, err := execout.NewBuffer(e.blockType, block, clock) +func (e *Engine) NewBuffer(optionalBlock *pbbstream.Block, clock *pbsubstreams.Clock, cursor *bstream.Cursor) (execout.ExecutionOutput, error) { + out, err := execout.NewBuffer(e.blockType, optionalBlock, clock) if err != nil { return nil, fmt.Errorf("setting up map: %w", err) } - e.reversibleBuffers[clock.Number] = execOutBuf + e.reversibleBuffers[clock.Number] = out + for moduleName, existingExecOut := range e.existingExecOuts { + if val, ok := existingExecOut.Get(clock); ok { + out.Set(moduleName, val) + } + } - return execOutBuf, nil + return out, nil } func (e *Engine) HandleUndo(clock *pbsubstreams.Clock) { diff --git a/pipeline/exec/module_executor_test.go b/pipeline/exec/module_executor_test.go index ea64310ad..0ee5afd8d 100644 --- a/pipeline/exec/module_executor_test.go +++ b/pipeline/exec/module_executor_test.go @@ -25,6 +25,10 @@ func (t *MockExecOutput) Clock() *pbsubstreams.Clock { return t.clockFunc() } +func (t *MockExecOutput) Len() int { + return 0 +} + func (t *MockExecOutput) Get(name string) ([]byte, bool, error) { v, ok := t.cacheMap[name] if !ok { diff --git a/pipeline/init_test.go b/pipeline/init_test.go index 7b157f0f6..721ad034e 100644 --- a/pipeline/init_test.go +++ b/pipeline/init_test.go @@ -64,6 +64,13 @@ func NewExecOutputTesting(t *testing.T, block *pbbstream.Block, clock *pbsubstre } } +func (i *ExecOutputTesting) Len() (out int) { + for _, v := range i.Values { + out += len(v) + } + return +} + func (i *ExecOutputTesting) Get(moduleName string) (value []byte, cached bool, err error) { val, found := i.Values[moduleName] if !found { diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 184d59fb2..813ddf71f 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -112,7 +112,7 @@ func New( return pipe } -func (p *Pipeline) init(ctx context.Context) (err error) { +func (p *Pipeline) Init(ctx context.Context) (err error) { reqDetails := reqctx.Details(ctx) p.forkHandler.registerUndoHandler(func(clock *pbsubstreams.Clock, moduleOutputs []*pbssinternal.ModuleOutput) { @@ -138,9 +138,6 @@ func (p *Pipeline) init(ctx context.Context) (err error) { } func (p *Pipeline) InitTier2Stores(ctx context.Context) (err error) { - if err := p.init(ctx); err != nil { - return err - } storeMap, err := p.setupSubrequestStores(ctx) if err != nil { @@ -156,9 +153,6 @@ func (p *Pipeline) InitTier2Stores(ctx context.Context) (err error) { } func (p *Pipeline) InitTier1StoresAndBackprocess(ctx context.Context, reqPlan *plan.RequestPlan) (err error) { - if err := p.init(ctx); err != nil { - return err - } if reqPlan.RequiresParallelProcessing() { storeMap, err := p.runParallelProcess(ctx, reqPlan) diff --git a/pipeline/process_block.go b/pipeline/process_block.go index 9776363b3..41a35f1a6 100644 --- a/pipeline/process_block.go +++ b/pipeline/process_block.go @@ -9,12 +9,12 @@ import ( "sync" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/dmetering" "github.com/streamingfast/bstream" "go.uber.org/zap" "google.golang.org/protobuf/types/known/timestamppb" - "github.com/streamingfast/dmetering" "github.com/streamingfast/substreams/metrics" pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2" pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" @@ -24,6 +24,24 @@ import ( "github.com/streamingfast/substreams/storage/execout" ) +func (p *Pipeline) ProcessFromExecOutput( + ctx context.Context, + clock *pbsubstreams.Clock, + cursor *bstream.Cursor, +) (err error) { + p.gate.processBlock(clock.Number, bstream.StepNewIrreversible) + execOutput, err := p.execOutputCache.NewBuffer(nil, clock, cursor) + if err != nil { + return fmt.Errorf("setting up exec output: %w", err) + } + + if err = p.processBlock(ctx, execOutput, clock, cursor, bstream.StepNewIrreversible, nil); err != nil { + return err + } + + return nil +} + func (p *Pipeline) ProcessBlock(block *pbbstream.Block, obj interface{}) (err error) { ctx := p.ctx @@ -51,12 +69,16 @@ func (p *Pipeline) ProcessBlock(block *pbbstream.Block, obj interface{}) (err er step = bstream.StepNewIrreversible // with finalBlocksOnly, we never get NEW signals so we fake any 'irreversible' signal as both } - finalBlockHeight := obj.(bstream.Stepable).FinalBlockHeight() reorgJunctionBlock := obj.(bstream.Stepable).ReorgJunctionBlock() reqctx.ReqStats(ctx).RecordBlock(block.AsRef()) p.gate.processBlock(block.Number, step) - if err = p.processBlock(ctx, block, clock, cursor, step, finalBlockHeight, reorgJunctionBlock); err != nil { + execOutput, err := p.execOutputCache.NewBuffer(block, clock, cursor) + if err != nil { + return fmt.Errorf("setting up exec output: %w", err) + } + + if err = p.processBlock(ctx, execOutput, clock, cursor, step, reorgJunctionBlock); err != nil { return err // watch out, io.EOF needs to go through undecorated } return @@ -79,11 +101,10 @@ func blockRefToPB(block bstream.BlockRef) *pbsubstreams.BlockRef { func (p *Pipeline) processBlock( ctx context.Context, - block *pbbstream.Block, + execOutput execout.ExecutionOutput, clock *pbsubstreams.Clock, cursor *bstream.Cursor, step bstream.StepType, - finalBlockHeight uint64, reorgJunctionBlock bstream.BlockRef, ) (err error) { var eof bool @@ -101,11 +122,9 @@ func (p *Pipeline) processBlock( } case bstream.StepNew: p.blockStepMap[bstream.StepNew]++ - // metering of live blocks - payload := block.Payload.Value - dmetering.GetBytesMeter(ctx).AddBytesRead(len(payload)) - err = p.handleStepNew(ctx, block, clock, cursor) + dmetering.GetBytesMeter(ctx).AddBytesRead(execOutput.Len()) + err = p.handleStepNew(ctx, clock, cursor, execOutput) if err != nil && err != io.EOF { return fmt.Errorf("step new: handler step new: %w", err) } @@ -114,7 +133,7 @@ func (p *Pipeline) processBlock( } case bstream.StepNewIrreversible: p.blockStepMap[bstream.StepNewIrreversible]++ - err := p.handleStepNew(ctx, block, clock, cursor) + err = p.handleStepNew(ctx, clock, cursor, execOutput) if err != nil && err != io.EOF { return fmt.Errorf("step new irr: handler step new: %w", err) } @@ -133,11 +152,11 @@ func (p *Pipeline) processBlock( } } - if block.Number%500 == 0 { + if clock.Number%500 == 0 { logger := reqctx.Logger(ctx) // log the total number of StepNew and StepNewIrreversible blocks, and the ratio of the two logger.Debug("block stats", - zap.Uint64("block_num", block.Number), + zap.Uint64("block_num", clock.Number), zap.Uint64("step_new", p.blockStepMap[bstream.StepNew]), zap.Uint64("step_new_irreversible", p.blockStepMap[bstream.StepNewIrreversible]), zap.Float64("ratio", float64(p.blockStepMap[bstream.StepNewIrreversible])/float64(p.blockStepMap[bstream.StepNew])), @@ -197,7 +216,7 @@ func (p *Pipeline) handleStepFinal(clock *pbsubstreams.Clock) error { return nil } -func (p *Pipeline) handleStepNew(ctx context.Context, block *pbbstream.Block, clock *pbsubstreams.Clock, cursor *bstream.Cursor) (err error) { +func (p *Pipeline) handleStepNew(ctx context.Context, clock *pbsubstreams.Clock, cursor *bstream.Cursor, execOutput execout.ExecutionOutput) (err error) { p.insideReorgUpTo = nil reqDetails := reqctx.Details(ctx) @@ -241,17 +260,12 @@ func (p *Pipeline) handleStepNew(ctx context.Context, block *pbbstream.Block, cl } logger := reqctx.Logger(ctx) - execOutput, err := p.execOutputCache.NewBuffer(block, clock, cursor) - if err != nil { - return fmt.Errorf("setting up exec output: %w", err) - } if err := p.runPreBlockHooks(ctx, clock); err != nil { return fmt.Errorf("pre block hook: %w", err) } - dmetering.GetBytesMeter(ctx).CountInc("wasm_input_bytes", len(block.Payload.Value)) - + dmetering.GetBytesMeter(ctx).CountInc("wasm_input_bytes", execOutput.Len()) if err := p.executeModules(ctx, execOutput); err != nil { return fmt.Errorf("execute modules: %w", err) } @@ -270,7 +284,7 @@ func (p *Pipeline) handleStepNew(ctx context.Context, block *pbbstream.Block, cl } p.stores.resetStores() - logger.Debug("block processed", zap.Uint64("block_num", block.Number)) + logger.Debug("block processed", zap.Uint64("block_num", clock.Number)) return nil } diff --git a/service/tier1.go b/service/tier1.go index 04d15d105..1b2f4f144 100644 --- a/service/tier1.go +++ b/service/tier1.go @@ -17,7 +17,6 @@ import ( "github.com/streamingfast/bstream/hub" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" - "github.com/streamingfast/bstream/stream" bsstream "github.com/streamingfast/bstream/stream" "github.com/streamingfast/dauth" "github.com/streamingfast/dmetering" @@ -325,7 +324,7 @@ var IsValidCacheTag = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`).MatchString func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Request, outputGraph *outputmodules.Graph, respFunc substreams.ResponseFunc) error { chainFirstStreamableBlock := bstream.GetProtocolFirstStreamableBlock if request.StartBlockNum > 0 && request.StartBlockNum < int64(chainFirstStreamableBlock) { - return stream.NewErrInvalidArg("invalid start block %d, must be >= %d (the first streamable block of the chain)", request.StartBlockNum, chainFirstStreamableBlock) + return bsstream.NewErrInvalidArg("invalid start block %d, must be >= %d (the first streamable block of the chain)", request.StartBlockNum, chainFirstStreamableBlock) } else if request.StartBlockNum < 0 && request.StopBlockNum > 0 { if int64(request.StopBlockNum)+int64(request.StartBlockNum) < int64(chainFirstStreamableBlock) { request.StartBlockNum = int64(chainFirstStreamableBlock) @@ -385,7 +384,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ } if err := outputGraph.ValidateRequestStartBlock(requestDetails.ResolvedStartBlockNum); err != nil { - return stream.NewErrInvalidArg(err.Error()) + return bsstream.NewErrInvalidArg(err.Error()) } wasmRuntime := wasm.NewRegistry(s.wasmExtensions, s.runtimeConfig.MaxWasmFuel) @@ -407,7 +406,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ stores := pipeline.NewStores(ctx, storeConfigs, s.runtimeConfig.StateBundleSize, requestDetails.LinearHandoffBlockNum, request.StopBlockNum, false) - execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, nil, s.blockType) + execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, nil, s.blockType, nil) // we don't read existing ExecOuts on tier1 if err != nil { return fmt.Errorf("error building caching engine: %w", err) } @@ -469,6 +468,9 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ zap.String("output_module", request.OutputModule), ) + if err := pipe.Init(ctx); err != nil { + return fmt.Errorf("error during pipeline init: %w", err) + } if err := pipe.InitTier1StoresAndBackprocess(ctx, reqPlan); err != nil { return fmt.Errorf("error during init_stores_and_backprocess: %w", err) } @@ -620,7 +622,7 @@ func toConnectError(ctx context.Context, err error) error { return connect.NewError(connect.CodeInvalidArgument, err) } - var errInvalidArg *stream.ErrInvalidArg + var errInvalidArg *bsstream.ErrInvalidArg if errors.As(err, &errInvalidArg) { return connect.NewError(connect.CodeInvalidArgument, errInvalidArg) } diff --git a/service/tier2.go b/service/tier2.go index e89783665..bee9e3fc0 100644 --- a/service/tier2.go +++ b/service/tier2.go @@ -21,8 +21,10 @@ import ( pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" "github.com/streamingfast/substreams" + "github.com/streamingfast/substreams/block" "github.com/streamingfast/substreams/metrics" pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2" + pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" "github.com/streamingfast/substreams/pipeline" "github.com/streamingfast/substreams/pipeline/cache" "github.com/streamingfast/substreams/pipeline/exec" @@ -290,7 +292,8 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P outputModule := outputGraph.OutputModule() var execOutWriter *execout.Writer - if !outputGraph.StagedUsedModules()[request.Stage].LastLayer().IsStoreLayer() { + isOutputMapperStage := !outputGraph.StagedUsedModules()[request.Stage].LastLayer().IsStoreLayer() + if isOutputMapperStage { execOutWriter = execout.NewWriter( requestDetails.ResolvedStartBlockNum, requestDetails.StopBlockNum, @@ -299,7 +302,25 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P ) } - execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, execOutWriter, s.blockType) + existingExecOuts := make(map[string]*execout.File) + for name, c := range execOutputConfigs.ConfigMap { + if c.ModuleKind() == pbsubstreams.ModuleKindStore { + continue // TODO @stepd add support for store modules + } + file, err := c.ReadFile(ctx, &block.Range{StartBlock: request.StartBlockNum, ExclusiveEndBlock: request.StopBlockNum}) + if err != nil { + continue + } + if isOutputMapperStage && name == request.OutputModule { + logger.Info("found existing exec output for output_module, skipping run", zap.String("output_module", name)) + return nil + } + existingExecOuts[name] = file + } + + skipBlocks, skipStores := checkSkipBlocksAndStores(existingExecOuts, outputGraph, s.blockType) + + execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, execOutWriter, s.blockType, existingExecOuts) if err != nil { return fmt.Errorf("error building caching engine: %w", err) } @@ -328,32 +349,95 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P zap.String("output_module", request.OutputModule), zap.Uint32("stage", request.Stage), ) - if err := pipe.InitTier2Stores(ctx); err != nil { - return fmt.Errorf("error building pipeline: %w", err) + + if err := pipe.Init(ctx); err != nil { + return fmt.Errorf("error during pipeline init: %w", err) + } + if !skipStores { + if err := pipe.InitTier2Stores(ctx); err != nil { + return fmt.Errorf("error building pipeline: %w", err) + } } var streamErr error - blockStream, err := s.streamFactoryFunc( - ctx, - pipe, - int64(requestDetails.ResolvedStartBlockNum), - request.StopBlockNum, - "", - true, - false, - logger.Named("stream"), - ) - if err != nil { - return fmt.Errorf("error getting stream: %w", err) - } + if skipBlocks { + var referenceMapper *execout.File + for k, v := range existingExecOuts { + referenceMapper = v + logger.Info("running from mapper", zap.String("module", k)) + break + } + + ctx, span := reqctx.WithSpan(ctx, "substreams/tier2/pipeline/mapper_stream") + for _, v := range referenceMapper.SortedItems() { + if v.BlockNum < request.StartBlockNum || v.BlockNum >= request.StopBlockNum { + panic("reading from mapper, block was out of range") // we don't want to have this case undetected + } + clock := &pbsubstreams.Clock{ + Id: v.BlockId, + Number: v.BlockNum, + Timestamp: v.Timestamp, + } - ctx, span := reqctx.WithSpan(ctx, "substreams/tier2/pipeline/blocks_stream") - streamErr = blockStream.Run(ctx) - span.EndWithErr(&streamErr) + cursor := &bstream.Cursor{ + Step: bstream.StepNewIrreversible, + Block: bstream.NewBlockRef(v.BlockId, v.BlockNum), + LIB: bstream.NewBlockRef(v.BlockId, v.BlockNum), + HeadBlock: bstream.NewBlockRef(v.BlockId, v.BlockNum), + } + + if err := pipe.ProcessFromExecOutput(ctx, clock, cursor); err != nil { + span.EndWithErr(&err) + return err + } + } + streamErr = io.EOF + span.EndWithErr(&streamErr) + } else { + blockStream, err := s.streamFactoryFunc( + ctx, + pipe, + int64(requestDetails.ResolvedStartBlockNum), + request.StopBlockNum, + "", + true, + false, + logger.Named("stream"), + ) + if err != nil { + return fmt.Errorf("error getting stream: %w", err) + } + + ctx, span := reqctx.WithSpan(ctx, "substreams/tier2/pipeline/blocks_stream") + streamErr = blockStream.Run(ctx) + span.EndWithErr(&streamErr) + } return pipe.OnStreamTerminated(ctx, streamErr) } +func checkSkipBlocksAndStores(existingExecOuts map[string]*execout.File, outputGraph *outputmodules.Graph, blockType string) (skipBlocks, skipStores bool) { + if len(existingExecOuts) == 0 { + return + } + skipBlocks = true + skipStores = true + for _, module := range outputGraph.UsedModules() { + if existingExecOuts[module.Name] != nil && outputGraph.OutputModule().Name != module.Name { // we don't skip the store if that store is actually our output module + continue + } + for _, input := range module.Inputs { + if src := input.GetSource(); src != nil && src.Type == blockType { + skipBlocks = false + } + if input.GetStore() != nil { + skipStores = false + } + } + } + return +} + func (s *Tier2Service) buildPipelineOptions(ctx context.Context, request *pbssinternal.ProcessRangeRequest) (opts []pipeline.Option) { requestDetails := reqctx.Details(ctx) for _, pipeOpts := range s.pipelineOptions { diff --git a/storage/execout/buffer.go b/storage/execout/buffer.go index 5f4eef89b..986c94fe7 100644 --- a/storage/execout/buffer.go +++ b/storage/execout/buffer.go @@ -18,19 +18,30 @@ type Buffer struct { clock *pbsubstreams.Clock } +func (b *Buffer) Len() (out int) { + for _, v := range b.values { + out += len(v) + } + + return +} + func NewBuffer(blockType string, block *pbbstream.Block, clock *pbsubstreams.Clock) (*Buffer, error) { - blkBytes := block.Payload.Value + values := make(map[string][]byte) + clockBytes, err := proto.Marshal(clock) if err != nil { return nil, fmt.Errorf("marshalling clock %d %q: %w", clock.Number, clock.Id, err) } + values[wasm.ClockType] = clockBytes + + if block != nil { + values[blockType] = block.Payload.Value + } return &Buffer{ - clock: clock, - values: map[string][]byte{ - blockType: blkBytes, - wasm.ClockType: clockBytes, - }, + clock: clock, + values: values, }, nil } @@ -43,7 +54,7 @@ func (i *Buffer) Get(moduleName string) (value []byte, cached bool, err error) { if !found { return nil, false, NotFound } - return val, false, nil + return val, true, nil } func (i *Buffer) Set(moduleName string, value []byte) (err error) { diff --git a/storage/execout/config.go b/storage/execout/config.go index 01a025054..9d8b3cea9 100644 --- a/storage/execout/config.go +++ b/storage/execout/config.go @@ -80,3 +80,12 @@ func (c *Config) ListSnapshotFiles(ctx context.Context, inRange *bstream.Range) return files, nil } + +func (c *Config) ReadFile(ctx context.Context, inrange *block.Range) (*File, error) { + + file := c.NewFile(inrange) + if err := file.Load(ctx); err != nil { + return nil, err + } + return file, nil +} diff --git a/storage/execout/interface.go b/storage/execout/interface.go index 3655e414f..329c94c19 100644 --- a/storage/execout/interface.go +++ b/storage/execout/interface.go @@ -7,6 +7,7 @@ import ( ) type ExecutionOutputGetter interface { + Len() int Clock() *pbsubstreams.Clock Get(name string) (value []byte, cached bool, err error) }