diff --git a/pipeline/cache/engine.go b/pipeline/cache/engine.go
index 2c0224d77..4934a46bf 100644
--- a/pipeline/cache/engine.go
+++ b/pipeline/cache/engine.go
@@ -26,18 +26,19 @@ type Engine struct {
 	ctx               context.Context
 	blockType         string
 	reversibleBuffers map[uint64]*execout.Buffer // block num to modules' outputs for that given block
-	execOutputWriter  *execout.Writer            // moduleName => irreversible File
+	execOutputWriters map[string]*execout.Writer // moduleName => writer (single file)
 	existingExecOuts  map[string]*execout.File
-	runtimeConfig config.RuntimeConfig // TODO(abourget): Deprecated: remove this as it's not used
-	logger        *zap.Logger
+
+	runtimeConfig config.RuntimeConfig // TODO(abourget): Deprecated: remove this as it's not used
+	logger        *zap.Logger
 }
 
-func NewEngine(ctx context.Context, runtimeConfig config.RuntimeConfig, execOutWriter *execout.Writer, blockType string, existingExecOuts map[string]*execout.File) (*Engine, error) {
+func NewEngine(ctx context.Context, runtimeConfig config.RuntimeConfig, execOutWriters map[string]*execout.Writer, blockType string, existingExecOuts map[string]*execout.File) (*Engine, error) {
 	e := &Engine{
 		ctx:               ctx,
 		runtimeConfig:     runtimeConfig,
 		reversibleBuffers: map[uint64]*execout.Buffer{},
-		execOutputWriter:  execOutWriter,
+		execOutputWriters: execOutWriters,
 		logger:            reqctx.Logger(ctx),
 		blockType:         blockType,
 		existingExecOuts:  existingExecOuts,
@@ -73,8 +74,8 @@ func (e *Engine) HandleFinal(clock *pbsubstreams.Clock) error {
 		return nil
 	}
 
-	if e.execOutputWriter != nil {
-		e.execOutputWriter.Write(clock, execOutBuf)
+	for _, writer := range e.execOutputWriters {
+		writer.Write(clock, execOutBuf)
 	}
 
 	delete(e.reversibleBuffers, clock.Number)
@@ -88,8 +89,8 @@ func (e *Engine) HandleStalled(clock *pbsubstreams.Clock) error {
 }
 
 func (e *Engine) EndOfStream(lastFinalClock *pbsubstreams.Clock) error {
-	if e.execOutputWriter != nil {
-		e.execOutputWriter.Close(context.Background())
+	for _, writer := range e.execOutputWriters {
+		writer.Close(context.Background())
 	}
 	return nil
 }
diff --git a/pipeline/outputmodules/graph.go b/pipeline/outputmodules/graph.go
index f552a0bc4..d2b916813 100644
--- a/pipeline/outputmodules/graph.go
+++ b/pipeline/outputmodules/graph.go
@@ -21,9 +21,19 @@ type Graph struct {
 	schedulableAncestorsMap map[string][]string // modules that are ancestors (therefore dependencies) of a given module
 }
 
-func (g *Graph) OutputModule() *pbsubstreams.Module { return g.outputModule }
-func (g *Graph) Stores() []*pbsubstreams.Module     { return g.stores }
-func (g *Graph) UsedModules() []*pbsubstreams.Module { return g.usedModules }
+func (g *Graph) OutputModule() *pbsubstreams.Module  { return g.outputModule }
+func (g *Graph) Stores() []*pbsubstreams.Module      { return g.stores }
+func (g *Graph) UsedModules() []*pbsubstreams.Module { return g.usedModules }
+func (g *Graph) UsedModulesUpToStage(stage int) (out []*pbsubstreams.Module) {
+	for i := 0; i <= int(stage); i++ {
+		for _, layer := range g.StagedUsedModules()[i] {
+			for _, mod := range layer {
+				out = append(out, mod)
+			}
+		}
+	}
+	return
+}
 func (g *Graph) StagedUsedModules() ExecutionStages   { return g.stagedUsedModules }
 func (g *Graph) IsOutputModule(name string) bool      { return g.outputModule.Name == name }
 func (g *Graph) ModuleHashes() *manifest.ModuleHashes { return g.moduleHashes }
@@ -100,8 +110,8 @@ func (e ExecutionStages) LastStage() StageLayers {
 // a layer of mappers, followed by a layer of stores.
 type StageLayers []LayerModules
 
-func (l StageLayers) isStoreStage() bool {
-	return l.LastLayer().IsStoreLayer()
+func (l StageLayers) IsLastStage() bool {
+	return !l.LastLayer().IsStoreLayer()
 }
 
 func (l StageLayers) LastLayer() LayerModules {
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index 813ddf71f..2d955df98 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -182,6 +182,7 @@ func (p *Pipeline) setupProcessingModule(reqDetails *reqctx.RequestDetails) {
 	}
 }
 
+// setupSubrequestStores prepares stores for all required modules up to the current stage.
 func (p *Pipeline) setupSubrequestStores(ctx context.Context) (storeMap store.Map, err error) {
 	ctx, span := reqctx.WithSpan(ctx, "substreams/pipeline/tier2/store_setup")
 	defer span.EndWithErr(&err)
@@ -193,6 +194,9 @@ func (p *Pipeline) setupSubrequestStores(ctx context.Context) (storeMap store.Ma
 
 	lastStage := len(p.executionStages) - 1
 	for stageIdx, stage := range p.executionStages {
+		if stageIdx > *p.highestStage {
+			break // skip stores for stages that we're not running
+		}
 		isLastStage := stageIdx == lastStage
 		layer := stage.LastLayer()
 		if !layer.IsStoreLayer() {
diff --git a/pipeline/process_block.go b/pipeline/process_block.go
index 41a35f1a6..f63403579 100644
--- a/pipeline/process_block.go
+++ b/pipeline/process_block.go
@@ -371,9 +371,6 @@ func (p *Pipeline) applyExecutionResult(ctx context.Context, executor exec.Modul
 		return fmt.Errorf("execute module: %w", runError)
 	}
-	if !hasValidOutput {
-		return nil
-	}
 	p.saveModuleOutput(moduleOutput, executor.Name(), reqctx.Details(ctx).ProductionMode)
 	if err := execOutput.Set(executorName, outputBytes); err != nil {
 		return fmt.Errorf("set output cache: %w", err)
 	}
diff --git a/pipeline/stores.go b/pipeline/stores.go
index bae7f9d02..39dae02de 100644
--- a/pipeline/stores.go
+++ b/pipeline/stores.go
@@ -79,6 +79,11 @@ func (s *Stores) saveStoresSnapshots(ctx context.Context, lastLayer outputmodule
 	for _, mod := range lastLayer {
 		store := s.StoreMap[mod.Name]
 		s.logger.Info("flushing store at boundary", zap.Uint64("boundary", boundaryBlock), zap.String("store", mod.Name), zap.Int("stage", stage))
+		// TODO when partials are generic again, we can also check if the PartialKV exists and skip if it does.
+		exists, _ := s.configs[mod.Name].ExistsFullKV(ctx, boundaryBlock)
+		if exists {
+			continue
+		}
 		if err := s.saveStoreSnapshot(ctx, store, boundaryBlock); err != nil {
 			return fmt.Errorf("save store snapshot %q: %w", mod.Name, err)
 		}
diff --git a/service/tier1.go b/service/tier1.go
index 1b2f4f144..7f6a86a7b 100644
--- a/service/tier1.go
+++ b/service/tier1.go
@@ -406,7 +406,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
 	stores := pipeline.NewStores(ctx, storeConfigs, s.runtimeConfig.StateBundleSize, requestDetails.LinearHandoffBlockNum, request.StopBlockNum, false)
 
-	execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, nil, s.blockType, nil) // we don't read existing ExecOuts on tier1
+	execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, nil, s.blockType, nil) // we don't read or write ExecOuts on tier1
 	if err != nil {
 		return fmt.Errorf("error building caching engine: %w", err)
 	}
 
diff --git a/service/tier2.go b/service/tier2.go
index bee9e3fc0..303539e63 100644
--- a/service/tier2.go
+++ b/service/tier2.go
@@ -278,7 +278,7 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
 		return fmt.Errorf("internal error setting store: %w", err)
 	}
 
-	execOutputConfigs, err := execout.NewConfigs(cacheStore, outputGraph.UsedModules(), outputGraph.ModuleHashes(), s.runtimeConfig.StateBundleSize, logger)
+	execOutputConfigs, err := execout.NewConfigs(cacheStore, outputGraph.UsedModulesUpToStage(int(request.Stage)), outputGraph.ModuleHashes(), s.runtimeConfig.StateBundleSize, logger)
 	if err != nil {
 		return fmt.Errorf("new config map: %w", err)
 	}
@@ -289,38 +289,20 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
 	}
 
 	stores := pipeline.NewStores(ctx, storeConfigs, s.runtimeConfig.StateBundleSize, requestDetails.ResolvedStartBlockNum, request.StopBlockNum, true)
-	outputModule := outputGraph.OutputModule()
-
-	var execOutWriter *execout.Writer
-	isOutputMapperStage := !outputGraph.StagedUsedModules()[request.Stage].LastLayer().IsStoreLayer()
-	if isOutputMapperStage {
-		execOutWriter = execout.NewWriter(
-			requestDetails.ResolvedStartBlockNum,
-			requestDetails.StopBlockNum,
-			outputModule.Name,
-			execOutputConfigs,
-		)
+	// Note: modules that are not in 'modulesRequiredToRun' are still iterated in 'pipeline.executeModules', but they skip actual execution when the cache already provides their data.
+	// This way, stores get updated at each block from the cached execouts, without actually executing the module.
+	modulesRequiredToRun, existingExecOuts, execOutWriters, err := evaluateModulesRequiredToRun(ctx, logger, outputGraph, request.Stage, request.StartBlockNum, request.StopBlockNum, request.OutputModule, execOutputConfigs, storeConfigs)
+	if err != nil {
+		return fmt.Errorf("evaluating required modules: %w", err)
 	}
 
-	existingExecOuts := make(map[string]*execout.File)
-	for name, c := range execOutputConfigs.ConfigMap {
-		if c.ModuleKind() == pbsubstreams.ModuleKindStore {
-			continue // TODO @stepd add support for store modules
-		}
-		file, err := c.ReadFile(ctx, &block.Range{StartBlock: request.StartBlockNum, ExclusiveEndBlock: request.StopBlockNum})
-		if err != nil {
-			continue
-		}
-		if isOutputMapperStage && name == request.OutputModule {
-			logger.Info("found existing exec output for output_module, skipping run", zap.String("output_module", name))
-			return nil
-		}
-		existingExecOuts[name] = file
+	if len(modulesRequiredToRun) == 0 {
+		logger.Info("no modules required to run, skipping")
+		return nil
 	}
-	skipBlocks, skipStores := checkSkipBlocksAndStores(existingExecOuts, outputGraph, s.blockType)
 
-	execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, execOutWriter, s.blockType, existingExecOuts)
+	// this engine keeps the existingExecOuts to optimize execution (they provide the inputs of modules that skip execution)
+	execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, execOutWriters, s.blockType, existingExecOuts)
 	if err != nil {
 		return fmt.Errorf("error building caching engine: %w", err)
 	}
 
@@ -353,14 +335,12 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
 	if err := pipe.Init(ctx); err != nil {
 		return fmt.Errorf("error during pipeline init: %w", err)
 	}
-	if !skipStores {
-		if err := pipe.InitTier2Stores(ctx); err != nil {
-			return fmt.Errorf("error building pipeline: %w", err)
-		}
+	if err := pipe.InitTier2Stores(ctx); err != nil {
+		return fmt.Errorf("error building pipeline: %w", err)
 	}
 
 	var streamErr error
-	if skipBlocks {
+	if canSkipBlocks(existingExecOuts, modulesRequiredToRun, s.blockType) {
 		var referenceMapper *execout.File
 		for k, v := range existingExecOuts {
 			referenceMapper = v
@@ -416,26 +396,93 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
 	return pipe.OnStreamTerminated(ctx, streamErr)
 }
 
-func checkSkipBlocksAndStores(existingExecOuts map[string]*execout.File, outputGraph *outputmodules.Graph, blockType string) (skipBlocks, skipStores bool) {
+// evaluateModulesRequiredToRun also loads the existing execution outputs to be used as a cache.
+// If it returns no modules at all, the whole request can be skipped.
+func evaluateModulesRequiredToRun(
+	ctx context.Context,
+	logger *zap.Logger,
+	outputGraph *outputmodules.Graph,
+	stage uint32,
+	startBlock uint64,
+	stopBlock uint64,
+	outputModule string,
+	execoutConfigs *execout.Configs,
+	storeConfigs store.ConfigMap,
+) (requiredModules map[string]*pbsubstreams.Module, existingExecOuts map[string]*execout.File, execoutWriters map[string]*execout.Writer, err error) {
+
+	existingExecOuts = make(map[string]*execout.File)
+	requiredModules = make(map[string]*pbsubstreams.Module)
+	execoutWriters = make(map[string]*execout.Writer)
+
+	usedModules := make(map[string]*pbsubstreams.Module)
+	for _, module := range outputGraph.UsedModulesUpToStage(int(stage)) {
+		usedModules[module.Name] = module
+	}
+
+	runningLastStage := outputGraph.StagedUsedModules()[stage].IsLastStage()
+
+	for name, c := range execoutConfigs.ConfigMap {
+		if _, found := usedModules[name]; !found { // skip modules that are only present in later stages
+			continue
+		}
+
+		file, readErr := c.ReadFile(ctx, &block.Range{StartBlock: startBlock, ExclusiveEndBlock: stopBlock})
+		if readErr != nil {
+			requiredModules[name] = usedModules[name]
+			continue
+		}
+		existingExecOuts[name] = file
+
+		if c.ModuleKind() == pbsubstreams.ModuleKindMap {
+			if runningLastStage && name == outputModule {
+				logger.Info("found existing exec output for output_module, skipping run", zap.String("output_module", name))
+				return nil, nil, nil, nil
+			}
+			continue
+		}
+
+		// TODO when the partial KV stores are back to being 'generic' (without traceID), we will also be able to skip the store if the partial exists
+		storeExists, err := storeConfigs[name].ExistsFullKV(ctx, stopBlock)
+		if err != nil {
+			return nil, nil, nil, fmt.Errorf("checking file existence: %w", err)
+		}
+		if !storeExists {
+			// some stores may already be complete at this stage, but others may not, so we keep going and simply skip the complete ones
+			requiredModules[name] = usedModules[name]
+		}
+	}
+
+	for name := range requiredModules {
+		if _, exists := existingExecOuts[name]; exists {
+			continue // stores that need to run to produce their partials, but already have cached execution outputs
+		}
+		execoutWriters[name] = execout.NewWriter(
+			startBlock,
+			stopBlock,
+			name,
+			execoutConfigs,
+		)
+	}
+
+	return
+
+}
+
+func canSkipBlocks(existingExecOuts map[string]*execout.File, requiredModules map[string]*pbsubstreams.Module, blockType string) bool {
 	if len(existingExecOuts) == 0 {
-		return
+		return false
 	}
-	skipBlocks = true
-	skipStores = true
-	for _, module := range outputGraph.UsedModules() {
-		if existingExecOuts[module.Name] != nil && outputGraph.OutputModule().Name != module.Name { // we don't skip the store if that store is actually our output module
+	for name, module := range requiredModules {
+		if existingExecOuts[name] != nil {
 			continue
 		}
 		for _, input := range module.Inputs {
 			if src := input.GetSource(); src != nil && src.Type == blockType {
-				skipBlocks = false
-			}
-			if input.GetStore() != nil {
-				skipStores = false
+				return false
 			}
 		}
 	}
-	return
+	return true
 }
 
 func (s *Tier2Service) buildPipelineOptions(ctx context.Context, request *pbssinternal.ProcessRangeRequest) (opts []pipeline.Option) {
diff --git a/storage/store/config.go b/storage/store/config.go
index 55e78f199..869d3fa2e 100644
--- a/storage/store/config.go
+++ b/storage/store/config.go
@@ -9,6 +9,7 @@ import (
 	"github.com/streamingfast/logging"
 	"go.uber.org/zap"
 
+	"github.com/streamingfast/substreams/block"
 	pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
 	"github.com/streamingfast/substreams/storage/store/marshaller"
 )
@@ -93,6 +94,11 @@ func (c *Config) NewFullKV(logger *zap.Logger) *FullKV {
 	return &FullKV{c.newBaseStore(logger), "N/A"}
 }
 
+func (c *Config) ExistsFullKV(ctx context.Context, upTo uint64) (bool, error) {
+	filename := FullStateFileName(block.NewRange(c.moduleInitialBlock, upTo))
+	return c.objStore.FileExists(ctx, filename)
+}
+
 func (c *Config) NewPartialKV(initialBlock uint64, logger *zap.Logger) *PartialKV {
 	return &PartialKV{
 		baseStore: c.newBaseStore(logger),