
Commit

Add writing of all module outputs (stores and mappers), reading from store outputs
sduchesneau committed Mar 15, 2024
1 parent cb5094e commit fc685e1
Showing 8 changed files with 133 additions and 63 deletions.
19 changes: 10 additions & 9 deletions pipeline/cache/engine.go
@@ -26,18 +26,19 @@ type Engine struct {
ctx context.Context
blockType string
reversibleBuffers map[uint64]*execout.Buffer // block num to modules' outputs for that given block
execOutputWriter *execout.Writer // moduleName => irreversible File
execOutputWriters map[string]*execout.Writer // moduleName => writer (single file)
existingExecOuts map[string]*execout.File
runtimeConfig config.RuntimeConfig // TODO(abourget): Deprecated: remove this as it's not used
logger *zap.Logger

runtimeConfig config.RuntimeConfig // TODO(abourget): Deprecated: remove this as it's not used
logger *zap.Logger
}

func NewEngine(ctx context.Context, runtimeConfig config.RuntimeConfig, execOutWriter *execout.Writer, blockType string, existingExecOuts map[string]*execout.File) (*Engine, error) {
func NewEngine(ctx context.Context, runtimeConfig config.RuntimeConfig, execOutWriters map[string]*execout.Writer, blockType string, existingExecOuts map[string]*execout.File) (*Engine, error) {
e := &Engine{
ctx: ctx,
runtimeConfig: runtimeConfig,
reversibleBuffers: map[uint64]*execout.Buffer{},
execOutputWriter: execOutWriter,
execOutputWriters: execOutWriters,
logger: reqctx.Logger(ctx),
blockType: blockType,
existingExecOuts: existingExecOuts,
@@ -73,8 +74,8 @@ func (e *Engine) HandleFinal(clock *pbsubstreams.Clock) error {
return nil
}

if e.execOutputWriter != nil {
e.execOutputWriter.Write(clock, execOutBuf)
for _, writer := range e.execOutputWriters {
writer.Write(clock, execOutBuf)
}

delete(e.reversibleBuffers, clock.Number)
@@ -88,8 +89,8 @@ func (e *Engine) HandleStalled(clock *pbsubstreams.Clock) error {
}

func (e *Engine) EndOfStream(lastFinalClock *pbsubstreams.Clock) error {
if e.execOutputWriter != nil {
e.execOutputWriter.Close(context.Background())
for _, writer := range e.execOutputWriters {
writer.Close(context.Background())
}
return nil
}
20 changes: 15 additions & 5 deletions pipeline/outputmodules/graph.go
@@ -21,9 +21,19 @@ type Graph struct {
schedulableAncestorsMap map[string][]string // modules that are ancestors (therefore dependencies) of a given module
}

func (g *Graph) OutputModule() *pbsubstreams.Module { return g.outputModule }
func (g *Graph) Stores() []*pbsubstreams.Module { return g.stores }
func (g *Graph) UsedModules() []*pbsubstreams.Module { return g.usedModules }
func (g *Graph) OutputModule() *pbsubstreams.Module { return g.outputModule }
func (g *Graph) Stores() []*pbsubstreams.Module { return g.stores }
func (g *Graph) UsedModules() []*pbsubstreams.Module { return g.usedModules }
func (g *Graph) UsedModulesUpToStage(stage int) (out []*pbsubstreams.Module) {
for i := 0; i <= int(stage); i++ {
for _, layer := range g.StagedUsedModules()[i] {
for _, mod := range layer {
out = append(out, mod)
}
}
}
return
}
func (g *Graph) StagedUsedModules() ExecutionStages { return g.stagedUsedModules }
func (g *Graph) IsOutputModule(name string) bool { return g.outputModule.Name == name }
func (g *Graph) ModuleHashes() *manifest.ModuleHashes { return g.moduleHashes }
@@ -100,8 +110,8 @@ func (e ExecutionStages) LastStage() StageLayers {
// a layer of mappers, followed by a layer of stores.
type StageLayers []LayerModules

func (l StageLayers) isStoreStage() bool {
return l.LastLayer().IsStoreLayer()
func (l StageLayers) IsLastStage() bool {
return !l.LastLayer().IsStoreLayer()
}

func (l StageLayers) LastLayer() LayerModules {
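For readers skimming this diff, the following is a minimal, self-contained sketch of what the new UsedModulesUpToStage accessor computes and how IsLastStage classifies a stage by its last layer. The types here are simplified, hypothetical stand-ins for the real ExecutionStages, StageLayers and module types, not the library's API:

package main

import "fmt"

// Simplified stand-ins (illustration only): a stage is an ordered list of
// layers; a layer holds modules that can execute in parallel.
type module struct {
	name    string
	isStore bool
}
type layerModules []module
type stageLayers []layerModules
type executionStages []stageLayers

// isLastStage mirrors StageLayers.IsLastStage: a stage is the final one when
// its last layer is a mapper layer rather than a store layer (assumes layers
// are homogeneous and non-empty).
func (s stageLayers) isLastStage() bool {
	last := s[len(s)-1]
	return !last[0].isStore
}

// usedModulesUpToStage mirrors Graph.UsedModulesUpToStage: flatten every layer
// of every stage from 0 up to and including 'stage'.
func (e executionStages) usedModulesUpToStage(stage int) (out []string) {
	for i := 0; i <= stage; i++ {
		for _, layer := range e[i] {
			for _, mod := range layer {
				out = append(out, mod.name)
			}
		}
	}
	return
}

func main() {
	stages := executionStages{
		{{{name: "map_a"}}, {{name: "store_a", isStore: true}}}, // stage 0 ends in a store layer
		{{{name: "map_out"}}},                                   // stage 1: final mapper stage
	}
	fmt.Println(stages.usedModulesUpToStage(0))                   // [map_a store_a]
	fmt.Println(stages.usedModulesUpToStage(1))                   // [map_a store_a map_out]
	fmt.Println(stages[0].isLastStage(), stages[1].isLastStage()) // false true
}

Under these assumptions, only the final stage ends in a mapper layer, which is why tier2 below can request exec-output configs only for modules up to the requested stage: later stages can never feed an earlier stage's execution.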
4 changes: 4 additions & 0 deletions pipeline/pipeline.go
@@ -182,6 +182,7 @@ func (p *Pipeline) setupProcessingModule(reqDetails *reqctx.RequestDetails) {
}
}

// setupSubrequestStores will prepare stores for all required modules up to the current stage.
func (p *Pipeline) setupSubrequestStores(ctx context.Context) (storeMap store.Map, err error) {
ctx, span := reqctx.WithSpan(ctx, "substreams/pipeline/tier2/store_setup")
defer span.EndWithErr(&err)
@@ -193,6 +194,9 @@ func (p *Pipeline) setupSubrequestStores(ctx context.Context) (storeMap store.Map, err error) {

lastStage := len(p.executionStages) - 1
for stageIdx, stage := range p.executionStages {
if stageIdx > *p.highestStage {
break // skip stores for stages that we're not running
}
isLastStage := stageIdx == lastStage
layer := stage.LastLayer()
if !layer.IsStoreLayer() {
3 changes: 0 additions & 3 deletions pipeline/process_block.go
@@ -371,9 +371,6 @@ func (p *Pipeline) applyExecutionResult(ctx context.Context, executor exec.Modul
return fmt.Errorf("execute module: %w", runError)
}

if !hasValidOutput {
return nil
}
p.saveModuleOutput(moduleOutput, executor.Name(), reqctx.Details(ctx).ProductionMode)
if err := execOutput.Set(executorName, outputBytes); err != nil {
return fmt.Errorf("set output cache: %w", err)
5 changes: 5 additions & 0 deletions pipeline/stores.go
@@ -79,6 +79,11 @@ func (s *Stores) saveStoresSnapshots(ctx context.Context, lastLayer outputmodule
for _, mod := range lastLayer {
store := s.StoreMap[mod.Name]
s.logger.Info("flushing store at boundary", zap.Uint64("boundary", boundaryBlock), zap.String("store", mod.Name), zap.Int("stage", stage))
// TODO when partials are generic again, we can also check if PartialKV exists and skip if it does.
exists, _ := s.configs[mod.Name].ExistsFullKV(ctx, boundaryBlock)
if exists {
continue
}
if err := s.saveStoreSnapshot(ctx, store, boundaryBlock); err != nil {
return fmt.Errorf("save store snapshot %q: %w", mod.Name, err)
}
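The skip added to saveStoresSnapshots relies on the new Config.ExistsFullKV helper (see storage/store/config.go at the bottom of this diff). Here is a minimal sketch of the pattern, assuming a hypothetical in-memory object store and an illustrative file-naming scheme; the real naming and storage interface live in the store and dstore packages:

package main

import (
	"context"
	"fmt"
)

// objectStore is a hypothetical stand-in for the real object-storage interface.
type objectStore interface {
	FileExists(ctx context.Context, filename string) (bool, error)
}

type memStore map[string]bool

func (m memStore) FileExists(_ context.Context, name string) (bool, error) {
	return m[name], nil
}

// fullStateFileName is an illustrative naming scheme for a full-KV snapshot
// covering [initialBlock, upTo).
func fullStateFileName(initialBlock, upTo uint64) string {
	return fmt.Sprintf("%010d-%010d.kv", upTo, initialBlock)
}

// existsFullKV mirrors Config.ExistsFullKV: probe the object store for the
// snapshot file at the boundary block.
func existsFullKV(ctx context.Context, s objectStore, initialBlock, upTo uint64) (bool, error) {
	return s.FileExists(ctx, fullStateFileName(initialBlock, upTo))
}

func main() {
	ctx := context.Background()
	store := memStore{fullStateFileName(0, 1000): true} // boundary 1000 already snapshotted

	for _, boundary := range []uint64{1000, 2000} {
		exists, _ := existsFullKV(ctx, store, 0, boundary)
		if exists {
			fmt.Printf("boundary %d: full KV already exists, skipping snapshot\n", boundary)
			continue // same skip as in saveStoresSnapshots above
		}
		fmt.Printf("boundary %d: writing snapshot\n", boundary)
	}
}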
2 changes: 1 addition & 1 deletion service/tier1.go
@@ -406,7 +406,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ

stores := pipeline.NewStores(ctx, storeConfigs, s.runtimeConfig.StateBundleSize, requestDetails.LinearHandoffBlockNum, request.StopBlockNum, false)

execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, nil, s.blockType, nil) // we don't read existing ExecOuts on tier1
execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, nil, s.blockType, nil) // we don't read or write ExecOuts on tier1
if err != nil {
return fmt.Errorf("error building caching engine: %w", err)
}
137 changes: 92 additions & 45 deletions service/tier2.go
@@ -278,7 +278,7 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
return fmt.Errorf("internal error setting store: %w", err)
}

execOutputConfigs, err := execout.NewConfigs(cacheStore, outputGraph.UsedModules(), outputGraph.ModuleHashes(), s.runtimeConfig.StateBundleSize, logger)
execOutputConfigs, err := execout.NewConfigs(cacheStore, outputGraph.UsedModulesUpToStage(int(request.Stage)), outputGraph.ModuleHashes(), s.runtimeConfig.StateBundleSize, logger)
if err != nil {
return fmt.Errorf("new config map: %w", err)
}
@@ -289,38 +289,20 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
}
stores := pipeline.NewStores(ctx, storeConfigs, s.runtimeConfig.StateBundleSize, requestDetails.ResolvedStartBlockNum, request.StopBlockNum, true)

outputModule := outputGraph.OutputModule()

var execOutWriter *execout.Writer
isOutputMapperStage := !outputGraph.StagedUsedModules()[request.Stage].LastLayer().IsStoreLayer()
if isOutputMapperStage {
execOutWriter = execout.NewWriter(
requestDetails.ResolvedStartBlockNum,
requestDetails.StopBlockNum,
outputModule.Name,
execOutputConfigs,
)
// note: all modules that are not in 'modulesRequiredToRun' are still iterated in 'pipeline.executeModules', but they skip actual execution when the cache already provides their data.
// This way, stores get updated at each block from the cached execouts without actually executing the module.
modulesRequiredToRun, existingExecOuts, execOutWriters, err := evaluateModulesRequiredToRun(ctx, logger, outputGraph, request.Stage, request.StartBlockNum, request.StopBlockNum, request.OutputModule, execOutputConfigs, storeConfigs)
if err != nil {
return fmt.Errorf("evaluating required modules: %w", err)
}

existingExecOuts := make(map[string]*execout.File)
for name, c := range execOutputConfigs.ConfigMap {
if c.ModuleKind() == pbsubstreams.ModuleKindStore {
continue // TODO @stepd add support for store modules
}
file, err := c.ReadFile(ctx, &block.Range{StartBlock: request.StartBlockNum, ExclusiveEndBlock: request.StopBlockNum})
if err != nil {
continue
}
if isOutputMapperStage && name == request.OutputModule {
logger.Info("found existing exec output for output_module, skipping run", zap.String("output_module", name))
return nil
}
existingExecOuts[name] = file
if len(modulesRequiredToRun) == 0 {
logger.Info("no modules required to run, skipping")
return nil
}

skipBlocks, skipStores := checkSkipBlocksAndStores(existingExecOuts, outputGraph, s.blockType)

execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, execOutWriter, s.blockType, existingExecOuts)
// this engine will keep the existingExecOuts to optimize the execution (for inputs from modules that skip execution)
execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, execOutWriters, s.blockType, existingExecOuts)
if err != nil {
return fmt.Errorf("error building caching engine: %w", err)
}
@@ -353,14 +335,12 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
if err := pipe.Init(ctx); err != nil {
return fmt.Errorf("error during pipeline init: %w", err)
}
if !skipStores {
if err := pipe.InitTier2Stores(ctx); err != nil {
return fmt.Errorf("error building pipeline: %w", err)
}
if err := pipe.InitTier2Stores(ctx); err != nil {
return fmt.Errorf("error building pipeline: %w", err)
}

var streamErr error
if skipBlocks {
if canSkipBlocks(existingExecOuts, modulesRequiredToRun, s.blockType) {
var referenceMapper *execout.File
for k, v := range existingExecOuts {
referenceMapper = v
@@ -416,26 +396,93 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
return pipe.OnStreamTerminated(ctx, streamErr)
}

func checkSkipBlocksAndStores(existingExecOuts map[string]*execout.File, outputGraph *outputmodules.Graph, blockType string) (skipBlocks, skipStores bool) {
// evaluateModulesRequiredToRun also loads the existing execution outputs to be used as a cache.
// If it returns no modules at all, the whole run can be skipped.
func evaluateModulesRequiredToRun(
ctx context.Context,
logger *zap.Logger,
outputGraph *outputmodules.Graph,
stage uint32,
startBlock uint64,
stopBlock uint64,
outputModule string,
execoutConfigs *execout.Configs,
storeConfigs store.ConfigMap,
) (requiredModules map[string]*pbsubstreams.Module, existingExecOuts map[string]*execout.File, execoutWriters map[string]*execout.Writer, err error) {

existingExecOuts = make(map[string]*execout.File)
requiredModules = make(map[string]*pbsubstreams.Module)
execoutWriters = make(map[string]*execout.Writer)

usedModules := make(map[string]*pbsubstreams.Module)
for _, module := range outputGraph.UsedModulesUpToStage(int(stage)) {
usedModules[module.Name] = module
}

runningLastStage := outputGraph.StagedUsedModules()[stage].IsLastStage()

for name, c := range execoutConfigs.ConfigMap {
if _, found := usedModules[name]; !found { // skip modules that are only present in later stages
continue
}

file, readErr := c.ReadFile(ctx, &block.Range{StartBlock: startBlock, ExclusiveEndBlock: stopBlock})
if readErr != nil {
requiredModules[name] = usedModules[name]
continue
}
existingExecOuts[name] = file

if c.ModuleKind() == pbsubstreams.ModuleKindMap {
if runningLastStage && name == outputModule {
logger.Info("found existing exec output for output_module, skipping run", zap.String("output_module", name))
return nil, nil, nil, nil
}
continue
}

// TODO when the partial KV stores are back to being 'generic' (without traceID), we will also be able to skip the store if the partial exists
storeExists, err := storeConfigs[name].ExistsFullKV(ctx, stopBlock)
if err != nil {
return nil, nil, nil, fmt.Errorf("checking file existence: %w", err)
}
if !storeExists {
// some stores may already exist in full at this stage while others do not, so we keep going and skip only the complete ones
requiredModules[name] = usedModules[name]
}
}

for name := range requiredModules {
if _, exists := existingExecOuts[name]; exists {
continue // for stores that need to be run for the partials, but already have cached execution outputs
}
execoutWriters[name] = execout.NewWriter(
startBlock,
stopBlock,
name,
execoutConfigs,
)
}

return
}

func canSkipBlocks(existingExecOuts map[string]*execout.File, requiredModules map[string]*pbsubstreams.Module, blockType string) bool {
if len(existingExecOuts) == 0 {
return
return false
}
skipBlocks = true
skipStores = true
for _, module := range outputGraph.UsedModules() {
if existingExecOuts[module.Name] != nil && outputGraph.OutputModule().Name != module.Name { // we don't skip the store if that store is actually our output module
for name, module := range requiredModules {
if existingExecOuts[name] != nil {
continue
}
for _, input := range module.Inputs {
if src := input.GetSource(); src != nil && src.Type == blockType {
skipBlocks = false
}
if input.GetStore() != nil {
skipStores = false
return false // a required module reads the raw block source; blocks must be streamed
}
}
}
return
return true
}

func (s *Tier2Service) buildPipelineOptions(ctx context.Context, request *pbssinternal.ProcessRangeRequest) (opts []pipeline.Option) {
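Taken together, evaluateModulesRequiredToRun and canSkipBlocks implement one decision rule for tier2. The sketch below simulates it with simplified, hypothetical types (not the real pbsubstreams or execout signatures): a mapper is required unless its exec output is cached; a store is required unless its full KV snapshot exists at the stop block; and raw blocks may only be skipped when something is cached and no required, uncached module reads the block source:

package main

import "fmt"

// module is a hypothetical stand-in for a pbsubstreams.Module.
type module struct {
	name        string
	isStore     bool
	readsBlocks bool // has a source input of the raw block type
}

// requiredToRun mirrors evaluateModulesRequiredToRun at a high level.
func requiredToRun(mods []module, cachedExecOut, fullKVExists map[string]bool) map[string]module {
	required := map[string]module{}
	for _, m := range mods {
		if cachedExecOut[m.name] && !m.isStore {
			continue // a cached mapper output replaces execution entirely
		}
		if m.isStore && fullKVExists[m.name] {
			continue // store snapshot already complete at the stop block
		}
		required[m.name] = m // stores with cached execouts stay required: they still produce partials
	}
	return required
}

// canSkipBlocks mirrors the helper above: blocks may only be skipped when no
// required module without a cached output consumes the raw block source.
func canSkipBlocks(required map[string]module, cachedExecOut map[string]bool) bool {
	if len(cachedExecOut) == 0 {
		return false // nothing cached to drive the pipeline without blocks
	}
	for name, m := range required {
		if cachedExecOut[name] {
			continue // replays cached outputs instead of executing
		}
		if m.readsBlocks {
			return false // this module must see every raw block
		}
	}
	return true
}

func main() {
	mods := []module{
		{name: "map_a", readsBlocks: true},
		{name: "store_a", isStore: true}, // fed by map_a, not by raw blocks
	}
	cached := map[string]bool{"map_a": true}
	required := requiredToRun(mods, cached, map[string]bool{})
	fmt.Println(len(required), canSkipBlocks(required, cached)) // 1 true: store_a replays map_a's cache
}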
6 changes: 6 additions & 0 deletions storage/store/config.go
@@ -9,6 +9,7 @@ import (
"github.com/streamingfast/logging"
"go.uber.org/zap"

"github.com/streamingfast/substreams/block"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"github.com/streamingfast/substreams/storage/store/marshaller"
)
@@ -93,6 +94,11 @@ func (c *Config) NewFullKV(logger *zap.Logger) *FullKV {
return &FullKV{c.newBaseStore(logger), "N/A"}
}

func (c *Config) ExistsFullKV(ctx context.Context, upTo uint64) (bool, error) {
filename := FullStateFileName(block.NewRange(c.moduleInitialBlock, upTo))
return c.objStore.FileExists(ctx, filename)
}

func (c *Config) NewPartialKV(initialBlock uint64, logger *zap.Logger) *PartialKV {
return &PartialKV{
baseStore: c.newBaseStore(logger),
