Allow tier2 to run from existing mapper outputs only or to use them to skip some module re-execution #433

Merged · 4 commits · Mar 14, 2024
2 changes: 2 additions & 0 deletions docs/release-notes/change-log.md
@@ -14,6 +14,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 * Pick up docs from the README.md or README in the same directory as the manifest, when top-level package.doc is empty
 * Tier2 service now supports a maximum concurrent requests limit. Default set to 0 (unlimited).
 * Improved file listing performance for Google Storage backends by 25%
+* Tier2 will now read back existing mapper outputs (if any) instead of running those modules again. Additionally, it will not read back full blocks if all of its inputs can be satisfied from existing cached mapper outputs.
+* Tier2 will skip processing completely if the output_module is a mapper that has already been processed (e.g. when multiple requests are indexing the same data at the same time)

 ## v1.3.7
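The two new changelog entries describe a decision tier2 now makes before doing any work: skip execution entirely when the requested output is an already-cached mapper, and skip reading full blocks when every module input is already cached. A minimal sketch of that decision logic, assuming a map of cached mapper output files keyed by module name (mirroring the `existingExecOuts` field added to the cache engine below); the helper functions are illustrative, not the PR's actual tier2 code:

```go
package example

import "github.com/streamingfast/substreams/storage/execout"

// outputFullyCached reports whether tier2 can skip processing entirely:
// the requested output module is a mapper whose outputs already exist,
// e.g. because a concurrent request indexed the same range.
func outputFullyCached(existingExecOuts map[string]*execout.File, outputModule string, isMapper bool) bool {
	_, found := existingExecOuts[outputModule]
	return isMapper && found
}

// needsFullBlocks reports whether full blocks must still be read back: they
// can be skipped when every module input is satisfiable from an existing
// cached mapper output.
func needsFullBlocks(moduleInputs []string, existingExecOuts map[string]*execout.File) bool {
	for _, input := range moduleInputs {
		if _, cached := existingExecOuts[input]; !cached {
			return true // at least one input still requires full-block execution
		}
	}
	return false
}
```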
19 changes: 13 additions & 6 deletions pipeline/cache/engine.go
@@ -27,31 +27,38 @@ type Engine struct {
 	blockType         string
 	reversibleBuffers map[uint64]*execout.Buffer // block num to modules' outputs for that given block
 	execOutputWriter  *execout.Writer            // moduleName => irreversible File
-	runtimeConfig     config.RuntimeConfig       // TODO(abourget): Deprecated: remove this as it's not used
+	existingExecOuts  map[string]*execout.File
+	runtimeConfig     config.RuntimeConfig // TODO(abourget): Deprecated: remove this as it's not used
 	logger            *zap.Logger
 }

-func NewEngine(ctx context.Context, runtimeConfig config.RuntimeConfig, execOutWriter *execout.Writer, blockType string) (*Engine, error) {
+func NewEngine(ctx context.Context, runtimeConfig config.RuntimeConfig, execOutWriter *execout.Writer, blockType string, existingExecOuts map[string]*execout.File) (*Engine, error) {
 	e := &Engine{
 		ctx:               ctx,
 		runtimeConfig:     runtimeConfig,
 		reversibleBuffers: map[uint64]*execout.Buffer{},
 		execOutputWriter:  execOutWriter,
 		logger:            reqctx.Logger(ctx),
 		blockType:         blockType,
+		existingExecOuts:  existingExecOuts,
 	}
 	return e, nil
 }

-func (e *Engine) NewBuffer(block *pbbstream.Block, clock *pbsubstreams.Clock, cursor *bstream.Cursor) (execout.ExecutionOutput, error) {
-	execOutBuf, err := execout.NewBuffer(e.blockType, block, clock)
+func (e *Engine) NewBuffer(optionalBlock *pbbstream.Block, clock *pbsubstreams.Clock, cursor *bstream.Cursor) (execout.ExecutionOutput, error) {
+	out, err := execout.NewBuffer(e.blockType, optionalBlock, clock)
 	if err != nil {
 		return nil, fmt.Errorf("setting up map: %w", err)
 	}

-	e.reversibleBuffers[clock.Number] = execOutBuf
+	e.reversibleBuffers[clock.Number] = out
+	for moduleName, existingExecOut := range e.existingExecOuts {
+		if val, ok := existingExecOut.Get(clock); ok {
+			out.Set(moduleName, val)
+		}
+	}

-	return execOutBuf, nil
+	return out, nil
 }

 func (e *Engine) HandleUndo(clock *pbsubstreams.Clock) {
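The reworked engine API above is easiest to read from the caller's side. A hedged usage sketch (the wrapping function, variable names, and the `config`/`pbsubstreams` import paths are assumptions; the nil block mirrors what `ProcessFromExecOutput` passes later in this diff, and the nil cursor is likewise assumed acceptable for a cache-only buffer):

```go
package example

import (
	"context"
	"fmt"

	pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" // path assumed
	"github.com/streamingfast/substreams/pipeline/cache"
	"github.com/streamingfast/substreams/service/config" // path assumed
	"github.com/streamingfast/substreams/storage/execout"
)

// buildCachedBuffer shows the new call shape: tier2 hands the engine the
// mapper output files it found on disk; tier1 passes nil (see the
// service/tier1.go hunk below). The nil execout.Writer mirrors the tier1 call.
func buildCachedBuffer(ctx context.Context, cfg config.RuntimeConfig, blockType string, existingExecOuts map[string]*execout.File, clock *pbsubstreams.Clock) (execout.ExecutionOutput, error) {
	engine, err := cache.NewEngine(ctx, cfg, nil, blockType, existingExecOuts)
	if err != nil {
		return nil, fmt.Errorf("building caching engine: %w", err)
	}

	// With a nil block, everything the buffer serves must come from the
	// cached files copied in by NewBuffer's read-back loop above.
	return engine.NewBuffer(nil, clock, nil)
}
```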
4 changes: 4 additions & 0 deletions pipeline/exec/module_executor_test.go
@@ -25,6 +25,10 @@ func (t *MockExecOutput) Clock() *pbsubstreams.Clock {
 	return t.clockFunc()
 }

+func (t *MockExecOutput) Len() int {
+	return 0
+}
+
 func (t *MockExecOutput) Get(name string) ([]byte, bool, error) {
 	v, ok := t.cacheMap[name]
 	if !ok {
7 changes: 7 additions & 0 deletions pipeline/init_test.go
@@ -64,6 +64,13 @@ func NewExecOutputTesting(t *testing.T, block *pbbstream.Block, clock *pbsubstre
 	}
 }

+func (i *ExecOutputTesting) Len() (out int) {
+	for _, v := range i.Values {
+		out += len(v)
+	}
+	return
+}
+
 func (i *ExecOutputTesting) Get(moduleName string) (value []byte, cached bool, err error) {
 	val, found := i.Values[moduleName]
 	if !found {
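Both test doubles above gain a `Len()` method, which suggests the `execout.ExecutionOutput` interface now carries it (the interface definition itself is outside this excerpt). A sketch of what the interface plausibly looks like after this PR, reconstructed from usage in this diff; the `Set` signature in particular is an assumption inferred from `engine.go`:

```go
package example

import pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" // path assumed

// Reconstructed for illustration; not copied from the repository.
type ExecutionOutput interface {
	Get(name string) (value []byte, cached bool, err error)
	Set(name string, value []byte) error // signature assumed from out.Set(moduleName, val) in engine.go
	Clock() *pbsubstreams.Clock
	Len() int // new in this PR: total bytes held, used for byte metering in process_block.go
}
```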
8 changes: 1 addition & 7 deletions pipeline/pipeline.go
@@ -112,7 +112,7 @@ func New(
 	return pipe
 }

-func (p *Pipeline) init(ctx context.Context) (err error) {
+func (p *Pipeline) Init(ctx context.Context) (err error) {
 	reqDetails := reqctx.Details(ctx)

 	p.forkHandler.registerUndoHandler(func(clock *pbsubstreams.Clock, moduleOutputs []*pbssinternal.ModuleOutput) {
@@ -138,9 +138,6 @@ func (p *Pipeline) init(ctx context.Context) (err error) {
 }

 func (p *Pipeline) InitTier2Stores(ctx context.Context) (err error) {
-	if err := p.init(ctx); err != nil {
-		return err
-	}

 	storeMap, err := p.setupSubrequestStores(ctx)
 	if err != nil {
@@ -156,9 +153,6 @@ func (p *Pipeline) InitTier2Stores(ctx context.Context) (err error) {
 }

 func (p *Pipeline) InitTier1StoresAndBackprocess(ctx context.Context, reqPlan *plan.RequestPlan) (err error) {
-	if err := p.init(ctx); err != nil {
-		return err
-	}

 	if reqPlan.RequiresParallelProcessing() {
 		storeMap, err := p.runParallelProcess(ctx, reqPlan)
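With `init` promoted to the exported `Init` and no longer called implicitly by the two store-initialization entry points, callers must invoke it themselves. This is exactly what `service/tier1.go` does later in this diff; a condensed sketch of the new calling sequence (error wrapping copied from that file, import paths assumed from the repository layout):

```go
package example

import (
	"context"
	"fmt"

	"github.com/streamingfast/substreams/orchestrator/plan" // path assumed
	"github.com/streamingfast/substreams/pipeline"
)

// startTier1 mirrors the call sequence service/tier1.go adopts further down
// in this diff: Init runs explicitly, then the tier-specific store setup.
func startTier1(ctx context.Context, pipe *pipeline.Pipeline, reqPlan *plan.RequestPlan) error {
	if err := pipe.Init(ctx); err != nil {
		return fmt.Errorf("error during pipeline init: %w", err)
	}
	if err := pipe.InitTier1StoresAndBackprocess(ctx, reqPlan); err != nil {
		return fmt.Errorf("error during init_stores_and_backprocess: %w", err)
	}
	return nil
}
```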
54 changes: 34 additions & 20 deletions pipeline/process_block.go
@@ -9,12 +9,12 @@ import (
 	"sync"

 	pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
-	"github.com/streamingfast/dmetering"

 	"github.com/streamingfast/bstream"
 	"go.uber.org/zap"
 	"google.golang.org/protobuf/types/known/timestamppb"

+	"github.com/streamingfast/dmetering"
 	"github.com/streamingfast/substreams/metrics"
 	pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
 	pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
@@ -24,6 +24,24 @@ import (
 	"github.com/streamingfast/substreams/storage/execout"
 )

+func (p *Pipeline) ProcessFromExecOutput(
+	ctx context.Context,
+	clock *pbsubstreams.Clock,
+	cursor *bstream.Cursor,
+) (err error) {
+	p.gate.processBlock(clock.Number, bstream.StepNewIrreversible)
+	execOutput, err := p.execOutputCache.NewBuffer(nil, clock, cursor)
+	if err != nil {
+		return fmt.Errorf("setting up exec output: %w", err)
+	}
+
+	if err = p.processBlock(ctx, execOutput, clock, cursor, bstream.StepNewIrreversible, nil); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func (p *Pipeline) ProcessBlock(block *pbbstream.Block, obj interface{}) (err error) {
 	ctx := p.ctx

@@ -51,12 +69,16 @@ func (p *Pipeline) ProcessBlock(block *pbbstream.Block, obj interface{}) (err error) {
 		step = bstream.StepNewIrreversible // with finalBlocksOnly, we never get NEW signals so we fake any 'irreversible' signal as both
 	}

-	finalBlockHeight := obj.(bstream.Stepable).FinalBlockHeight()
 	reorgJunctionBlock := obj.(bstream.Stepable).ReorgJunctionBlock()

 	reqctx.ReqStats(ctx).RecordBlock(block.AsRef())
 	p.gate.processBlock(block.Number, step)
-	if err = p.processBlock(ctx, block, clock, cursor, step, finalBlockHeight, reorgJunctionBlock); err != nil {
+	execOutput, err := p.execOutputCache.NewBuffer(block, clock, cursor)
+	if err != nil {
+		return fmt.Errorf("setting up exec output: %w", err)
+	}
+
+	if err = p.processBlock(ctx, execOutput, clock, cursor, step, reorgJunctionBlock); err != nil {
 		return err // watch out, io.EOF needs to go through undecorated
 	}
 	return
@@ -79,11 +101,10 @@ func blockRefToPB(block bstream.BlockRef) *pbsubstreams.BlockRef {

 func (p *Pipeline) processBlock(
 	ctx context.Context,
-	block *pbbstream.Block,
+	execOutput execout.ExecutionOutput,
 	clock *pbsubstreams.Clock,
 	cursor *bstream.Cursor,
 	step bstream.StepType,
-	finalBlockHeight uint64,
 	reorgJunctionBlock bstream.BlockRef,
 ) (err error) {
 	var eof bool
@@ -101,11 +122,9 @@
 		}
 	case bstream.StepNew:
 		p.blockStepMap[bstream.StepNew]++
-		// metering of live blocks
-		payload := block.Payload.Value
-		dmetering.GetBytesMeter(ctx).AddBytesRead(len(payload))

-		err = p.handleStepNew(ctx, block, clock, cursor)
+		dmetering.GetBytesMeter(ctx).AddBytesRead(execOutput.Len())
+		err = p.handleStepNew(ctx, clock, cursor, execOutput)
 		if err != nil && err != io.EOF {
 			return fmt.Errorf("step new: handler step new: %w", err)
 		}
@@ -114,7 +133,7 @@
 		}
 	case bstream.StepNewIrreversible:
 		p.blockStepMap[bstream.StepNewIrreversible]++
-		err := p.handleStepNew(ctx, block, clock, cursor)
+		err = p.handleStepNew(ctx, clock, cursor, execOutput)
 		if err != nil && err != io.EOF {
 			return fmt.Errorf("step new irr: handler step new: %w", err)
 		}
@@ -133,11 +152,11 @@
 		}
 	}

-	if block.Number%500 == 0 {
+	if clock.Number%500 == 0 {
 		logger := reqctx.Logger(ctx)
 		// log the total number of StepNew and StepNewIrreversible blocks, and the ratio of the two
 		logger.Debug("block stats",
-			zap.Uint64("block_num", block.Number),
+			zap.Uint64("block_num", clock.Number),
 			zap.Uint64("step_new", p.blockStepMap[bstream.StepNew]),
 			zap.Uint64("step_new_irreversible", p.blockStepMap[bstream.StepNewIrreversible]),
 			zap.Float64("ratio", float64(p.blockStepMap[bstream.StepNewIrreversible])/float64(p.blockStepMap[bstream.StepNew])),
@@ -197,7 +216,7 @@ func (p *Pipeline) handleStepFinal(clock *pbsubstreams.Clock) error {
 	return nil
 }

-func (p *Pipeline) handleStepNew(ctx context.Context, block *pbbstream.Block, clock *pbsubstreams.Clock, cursor *bstream.Cursor) (err error) {
+func (p *Pipeline) handleStepNew(ctx context.Context, clock *pbsubstreams.Clock, cursor *bstream.Cursor, execOutput execout.ExecutionOutput) (err error) {
 	p.insideReorgUpTo = nil
 	reqDetails := reqctx.Details(ctx)

@@ -241,17 +260,12 @@ func (p *Pipeline) handleStepNew(ctx context.Context, block *pbbstream.Block, clock *pbsubstreams.Clock, cursor *bstream.Cursor) (err error) {
 	}

 	logger := reqctx.Logger(ctx)
-	execOutput, err := p.execOutputCache.NewBuffer(block, clock, cursor)
-	if err != nil {
-		return fmt.Errorf("setting up exec output: %w", err)
-	}

 	if err := p.runPreBlockHooks(ctx, clock); err != nil {
 		return fmt.Errorf("pre block hook: %w", err)
 	}

-	dmetering.GetBytesMeter(ctx).CountInc("wasm_input_bytes", len(block.Payload.Value))
-
+	dmetering.GetBytesMeter(ctx).CountInc("wasm_input_bytes", execOutput.Len())
 	if err := p.executeModules(ctx, execOutput); err != nil {
 		return fmt.Errorf("execute modules: %w", err)
 	}
@@ -270,7 +284,7 @@ func (p *Pipeline) handleStepNew(ctx context.Context, block *pbbstream.Block, clock *pbsubstreams.Clock, cursor *bstream.Cursor) (err error) {
 	}

 	p.stores.resetStores()
-	logger.Debug("block processed", zap.Uint64("block_num", block.Number))
+	logger.Debug("block processed", zap.Uint64("block_num", clock.Number))
 	return nil
 }
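`ProcessFromExecOutput` is the new entry point that lets tier2 drive the pipeline without any block at all: the nil block flows into `NewBuffer`, which fills the buffer from cached mapper outputs, and the clock is forced through as `StepNewIrreversible`. A hypothetical driver loop (the tier2 service changes are not part of this excerpt; the nil cursor and the surrounding function are assumptions for illustration):

```go
package example

import (
	"context"

	pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" // path assumed
	"github.com/streamingfast/substreams/pipeline"
)

// replayFromCache is a hypothetical tier2-side driver: replay every clock
// recovered from existing cached mapper outputs through the pipeline,
// without ever fetching a full block.
func replayFromCache(ctx context.Context, pipe *pipeline.Pipeline, cachedClocks []*pbsubstreams.Clock) error {
	for _, clock := range cachedClocks {
		// nil cursor: assumed acceptable for a purely cache-driven, final-only replay.
		if err := pipe.ProcessFromExecOutput(ctx, clock, nil); err != nil {
			return err
		}
	}
	return nil
}
```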
12 changes: 7 additions & 5 deletions service/tier1.go
@@ -17,7 +17,6 @@ import (

 	"github.com/streamingfast/bstream/hub"
 	pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
-	"github.com/streamingfast/bstream/stream"
 	bsstream "github.com/streamingfast/bstream/stream"
 	"github.com/streamingfast/dauth"
 	"github.com/streamingfast/dmetering"
@@ -325,7 +324,7 @@ var IsValidCacheTag = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`).MatchString
 func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Request, outputGraph *outputmodules.Graph, respFunc substreams.ResponseFunc) error {
 	chainFirstStreamableBlock := bstream.GetProtocolFirstStreamableBlock
 	if request.StartBlockNum > 0 && request.StartBlockNum < int64(chainFirstStreamableBlock) {
-		return stream.NewErrInvalidArg("invalid start block %d, must be >= %d (the first streamable block of the chain)", request.StartBlockNum, chainFirstStreamableBlock)
+		return bsstream.NewErrInvalidArg("invalid start block %d, must be >= %d (the first streamable block of the chain)", request.StartBlockNum, chainFirstStreamableBlock)
 	} else if request.StartBlockNum < 0 && request.StopBlockNum > 0 {
 		if int64(request.StopBlockNum)+int64(request.StartBlockNum) < int64(chainFirstStreamableBlock) {
 			request.StartBlockNum = int64(chainFirstStreamableBlock)
@@ -385,7 +384,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Request, outputGraph *outputmodules.Graph, respFunc substreams.ResponseFunc) error {
 	}

 	if err := outputGraph.ValidateRequestStartBlock(requestDetails.ResolvedStartBlockNum); err != nil {
-		return stream.NewErrInvalidArg(err.Error())
+		return bsstream.NewErrInvalidArg(err.Error())
 	}

 	wasmRuntime := wasm.NewRegistry(s.wasmExtensions, s.runtimeConfig.MaxWasmFuel)
@@ -407,7 +406,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Request, outputGraph *outputmodules.Graph, respFunc substreams.ResponseFunc) error {

 	stores := pipeline.NewStores(ctx, storeConfigs, s.runtimeConfig.StateBundleSize, requestDetails.LinearHandoffBlockNum, request.StopBlockNum, false)

-	execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, nil, s.blockType)
+	execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, nil, s.blockType, nil) // we don't read existing ExecOuts on tier1
 	if err != nil {
 		return fmt.Errorf("error building caching engine: %w", err)
 	}
@@ -469,6 +468,9 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Request, outputGraph *outputmodules.Graph, respFunc substreams.ResponseFunc) error {
 		zap.String("output_module", request.OutputModule),
 	)

+	if err := pipe.Init(ctx); err != nil {
+		return fmt.Errorf("error during pipeline init: %w", err)
+	}
 	if err := pipe.InitTier1StoresAndBackprocess(ctx, reqPlan); err != nil {
 		return fmt.Errorf("error during init_stores_and_backprocess: %w", err)
 	}
@@ -620,7 +622,7 @@ func toConnectError(ctx context.Context, err error) error {
 		return connect.NewError(connect.CodeInvalidArgument, err)
 	}

-	var errInvalidArg *stream.ErrInvalidArg
+	var errInvalidArg *bsstream.ErrInvalidArg
 	if errors.As(err, &errInvalidArg) {
 		return connect.NewError(connect.CodeInvalidArgument, errInvalidArg)
 	}