Skip to content

Commit

Permalink
small fix to speed up some development-mode requests
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Sep 20, 2024
1 parent 72b41a4 commit b094008
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 27 deletions.
4 changes: 4 additions & 0 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

* Fix small bug making some requests in development-mode slow to start (when starting close to the module initialBlock with a store that doesn't start on a boundary)

## v1.10.6

* Fixed `substreams gui` panic (regression appeared in v1.10.3)
Expand Down
44 changes: 29 additions & 15 deletions pipeline/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,26 @@ import (

type getBlockFunc func() (uint64, error)

func reprocStateRequired(startBlock uint64, outputModule string, modules []*pbsubstreams.Module) (bool, error) {
// if some stores need to gather data from a block below startBlock,
// we return the lowest block number required, else nil
func reprocStateRequired(startBlock uint64, outputModule string, modules []*pbsubstreams.Module) (*uint64, error) {
graph, err := manifest.NewModuleGraph(modules)
if err != nil {
return false, err
return nil, err
}
requiredStores, err := graph.StoresDownTo(outputModule)
if err != nil {
return false, err
return nil, err
}

var lowest *uint64
for _, store := range requiredStores {
if store.InitialBlock < startBlock {
return true, nil
lowest = &store.InitialBlock
}
}
return false, nil

return lowest, nil
}

func BuildRequestDetails(
Expand All @@ -60,17 +64,18 @@ func BuildRequestDetails(
return nil, nil, err
}

var moduleHasStatefulDependencies bool
var stateRequiredAt *uint64
if request.Modules == nil {
moduleHasStatefulDependencies = true // FIXME this is for test compatibility, it never happens in real life
x := uint64(0)
stateRequiredAt = &x // FIXME this is for test compatibility, it never happens in real life
} else {
moduleHasStatefulDependencies, err = reprocStateRequired(req.ResolvedStartBlockNum, request.OutputModule, request.Modules.Modules)
stateRequiredAt, err = reprocStateRequired(req.ResolvedStartBlockNum, request.OutputModule, request.Modules.Modules)
if err != nil {
return nil, nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid modules: %w", err))
}
}

linearHandoff, err := computeLinearHandoffBlockNum(request.ProductionMode, req.ResolvedStartBlockNum, request.StopBlockNum, getRecentFinalBlock, moduleHasStatefulDependencies, segmentSize)
linearHandoff, err := computeLinearHandoffBlockNum(request.ProductionMode, req.ResolvedStartBlockNum, request.StopBlockNum, getRecentFinalBlock, stateRequiredAt, segmentSize)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -112,7 +117,9 @@ func nextUniqueID() uint64 {
return uniqueRequestIDCounter.Add(1)
}

func computeLinearHandoffBlockNum(productionMode bool, startBlock, stopBlock uint64, getRecentFinalBlockFunc func() (uint64, error), stateRequired bool, segmentSize uint64) (uint64, error) {
func computeLinearHandoffBlockNum(productionMode bool, startBlock, stopBlock uint64, getRecentFinalBlockFunc func() (uint64, error), stateRequiredAt *uint64, segmentSize uint64) (uint64, error) {
stateRequired := stateRequiredAt != nil && *stateRequiredAt <= startBlock

//get value of of next boundary after stopBlock
if productionMode {
nextBoundary := stopBlock
Expand Down Expand Up @@ -145,14 +152,21 @@ func computeLinearHandoffBlockNum(productionMode bool, startBlock, stopBlock uin
}

prevBoundary := startBlock - (startBlock % segmentSize)
linearHandoff := prevBoundary
if *stateRequiredAt > prevBoundary {
// ex: first store is at block 1010 and we start at 1020
// we'll need to start the linear processing at block 1010
// if we have even a single store that starts at block 1000 (prevBoundary),
// then we need to start at 1000
return *stateRequiredAt, nil
}

libHandoff, err := getRecentFinalBlockFunc()
if err != nil {
return prevBoundary, nil
lib, err := getRecentFinalBlockFunc()
if err != nil || linearHandoff <= lib { // no linear handoff above the lib because tier2s cannot read blocks that are not final yet
return linearHandoff, nil
}
libHandoffBoundary := libHandoff - (libHandoff % segmentSize)

return min(prevBoundary, libHandoffBoundary), nil
return lib - (lib % segmentSize), nil
}

// resolveStartBlockNum will occasionally modify or remove the cursor inside the request
Expand Down
30 changes: 18 additions & 12 deletions pipeline/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ func Test_resolveStartBlockNum(t *testing.T) {
}
}

func ref(in uint64) *uint64 {
return &in
}

func Test_computeLinaerHandoffBlockNum(t *testing.T) {
tests := []struct {
name string
Expand All @@ -182,21 +186,23 @@ func Test_computeLinaerHandoffBlockNum(t *testing.T) {
stopBlockNum uint64
expectHandoffNum uint64
expectError bool
stateRequired bool
stateRequiredAt *uint64
}{
// development mode
{"g1_start_stop_same_boundary", true, 500, false, 138, 142, 100, false, true},
{"g1_start_stop_same_boundary_livehub_fails", false, 500, false, 138, 142, 100, false, true},
{"g2_start_stop_across_boundary", true, 500, false, 138, 242, 100, false, true},
{"g2_start_stop_across_boundary_livehub_fails", true, 500, false, 138, 242, 100, false, true},
{"g1_start_stop_same_boundary", true, 500, false, 138, 142, 100, false, ref(0)},
{"g1_start_stop_same_boundary_livehub_fails", false, 500, false, 138, 142, 100, false, ref(0)},
{"g2_start_stop_across_boundary", true, 500, false, 138, 242, 100, false, ref(0)},
{"g2_start_stop_across_boundary_livehub_fails", true, 500, false, 138, 242, 100, false, ref(0)},
{"start_with_state_near", true, 500, false, 138, 242, 135, false, ref(135)},
{"start_with_state_near_livehub_fails", true, 500, false, 138, 242, 135, false, ref(135)},

// production mode
{"g4_start_stop_same_boundary", true, 500, true, 138, 142, 200, false, true},
{"g5_start_stop_across_boundary", true, 500, true, 138, 242, 300, false, true},
{"g6_lib_between_start_and_stop", true, 342, true, 121, 498, 300, false, true},
{"g6_lib_between_start_and_stop_livehub_fails", false, 342, true, 121, 498, 500, false, true},
{"g7_stop_block_infinity", true, 342, true, 121, 0, 300, false, true},
{"g7_stop_block_infinity_livehub_fails", false, 342, true, 121, 0, 300, true, true},
{"g4_start_stop_same_boundary", true, 500, true, 138, 142, 200, false, ref(0)},
{"g5_start_stop_across_boundary", true, 500, true, 138, 242, 300, false, ref(0)},
{"g6_lib_between_start_and_stop", true, 342, true, 121, 498, 300, false, ref(0)},
{"g6_lib_between_start_and_stop_livehub_fails", false, 342, true, 121, 498, 500, false, ref(0)},
{"g7_stop_block_infinity", true, 342, true, 121, 0, 300, false, ref(0)},
{"g7_stop_block_infinity_livehub_fails", false, 342, true, 121, 0, 300, true, ref(0)},
}

for _, test := range tests {
Expand All @@ -210,7 +216,7 @@ func Test_computeLinaerHandoffBlockNum(t *testing.T) {
return 0, fmt.Errorf("live not available")
}
return test.recentBlockNum, nil
}, test.stateRequired, 100)
}, test.stateRequiredAt, 100)
if test.expectError {
assert.Error(t, err)
} else {
Expand Down

0 comments on commit b094008

Please sign in to comment.