Skip to content

Commit

Permalink
adjust scheduler to reduce reprocessing of same modules with new cach…
Browse files Browse the repository at this point in the history
…ed outputs
  • Loading branch information
sduchesneau committed Mar 20, 2024
1 parent 077f980 commit e377e61
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 59 deletions.
2 changes: 2 additions & 0 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

* The "partial" store outputs no longer contain the trace ID in the filename, allowing them to be reused. If many requests point to the same modules being squashed, the squasher will detect if another Tier1 has squashed its file and reload the store from the produced full KV.

* Scheduler modification: a stage now waits for the previous stage to have completed the same segment before running, to take advantage of the cached intermediate layers.

* [Operator] Readiness metric for Substreams tier1 app is now named `substreams_tier1` (was mistakenly called `firehose` before).

* [Operator] Added back readiness metric for Substreams tier2 app (named `substreams_tier2`).
Expand Down
2 changes: 1 addition & 1 deletion orchestrator/plan/requestplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func BuildTier1RequestPlan(productionMode bool, segmentInterval uint64, graphIni
if endStoreBoundRange == nil {
return nil, fmt.Errorf("store bound range: invalid start block %d for segment interval %d", linearHandoffBlock, segmentInterval)
}
endStoreBound = endStoreBoundRange.StartBlock
endStoreBound = endStoreBoundRange.ExclusiveEndBlock
}
if scheduleStores {
plan.BuildStores = block.NewRange(graphInitBlock, endStoreBound)
Expand Down
8 changes: 4 additions & 4 deletions orchestrator/plan/requestplan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestBuildConfig(t *testing.T) {
resolvedStartBlock: 738,
linearHandoffBlock: 742,
exclusiveEndBlock: 742,
expectStoresRange: "621-700",
expectStoresRange: "621-742",
expectWriteExecOutRange: "700-742",
expectReadExecOutRange: "738-742",
expectLinearPipelineRange: "nil",
Expand All @@ -106,7 +106,7 @@ func TestBuildConfig(t *testing.T) {
resolvedStartBlock: 738,
linearHandoffBlock: 800,
exclusiveEndBlock: 800,
expectStoresRange: "621-700",
expectStoresRange: "621-800",
expectWriteExecOutRange: "700-800",
expectReadExecOutRange: "738-800",
expectLinearPipelineRange: "nil",
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestBuildConfig(t *testing.T) {
resolvedStartBlock: 738,
linearHandoffBlock: 842,
exclusiveEndBlock: 842,
expectStoresRange: "621-800",
expectStoresRange: "621-842",
expectWriteExecOutRange: "700-842",
expectReadExecOutRange: "738-842",
expectLinearPipelineRange: "nil",
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestBuildConfig(t *testing.T) {
resolvedStartBlock: 10,
linearHandoffBlock: 20,
exclusiveEndBlock: 20,
expectStoresRange: "5-5", // this 'empty' store range only exists to keep proper stage idx on execout mapper
expectStoresRange: "5-20",
expectWriteExecOutRange: "5-20",
expectReadExecOutRange: "10-20",
expectLinearPipelineRange: "nil",
Expand Down
2 changes: 1 addition & 1 deletion orchestrator/stage/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func (s *Stages) dependenciesCompleted(u Unit) bool {
return true
}
for i := u.Stage - 1; i >= 0; i-- {
state := s.getState(Unit{Segment: u.Segment - 1, Stage: i})
state := s.getState(Unit{Segment: u.Segment, Stage: i})
if !(state == UnitCompleted || state == UnitNoOp) {
return false
}
Expand Down
93 changes: 40 additions & 53 deletions orchestrator/stage/stages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ func TestNewStages(t *testing.T) {

assert.Equal(t, block.ParseRange("5-10"), stages.storeSegmenter.Range(0))
assert.Equal(t, block.ParseRange("10-20"), stages.storeSegmenter.Range(1))
assert.Nil(t, stages.storeSegmenter.Range(7))
assert.Equal(t, block.ParseRange("70-75"), stages.storeSegmenter.Range(7))
assert.Equal(t, block.ParseRange("70-75"), stages.globalSegmenter.Range(7))
}

func TestNewStagesNextJobs(t *testing.T) {
//seg := block.NewSegmenter(10, 5, 50)
reqPlan, err := plan.BuildTier1RequestPlan(true, 10, 5, 5, 50, 50, true)
assert.NoError(t, err)
assert.Equal(t, "interval=10, stores=[5, 40), map_write=[5, 50), map_read=[5, 50), linear=[nil)", reqPlan.String())
assert.Equal(t, "interval=10, stores=[5, 50), map_write=[5, 50), map_read=[5, 50), linear=[nil)", reqPlan.String())
stages := NewStages(
context.Background(),
outputmodules.TestGraphStagedModules(5, 5, 5, 5, 5),
Expand Down Expand Up @@ -92,115 +92,102 @@ M:N.`)
stages.NextJob()

segmentStateEquals(t, stages, `
S:CS
S:C.
M:NS`)
S:CSS
S:C..
M:N..`)

stages.forceTransition(1, 0, UnitCompleted)
stages.NextJob()

segmentStateEquals(t, stages, `
S:CC
S:CS
M:NS`)
S:CCS
S:CS.
M:N..`)

stages.forceTransition(2, 0, UnitCompleted)
stages.NextJob()

segmentStateEquals(t, stages, `
S:CC.
S:CCC
S:CSS
M:NS.`)
M:N..`)

stages.MarkSegmentPartialPresent(id(1, 2))

segmentStateEquals(t, stages, `
S:CC.
S:CCC
S:CSS
M:NP.`)

stages.MarkSegmentMerging(id(1, 2))

segmentStateEquals(t, stages, `
S:CC.
S:CCC
S:CSS
M:NM.`)

stages.markSegmentCompleted(id(1, 2))
stages.NextJob()

segmentStateEquals(t, stages, `
S:CCS
S:CSS
M:NC.`)
S:CCCS
S:CSS.
M:NC..`)

stages.NextJob()

segmentStateEquals(t, stages, `
S:CCSS
S:CSS.
M:NC..`)
S:CCCSS
S:CSS..
M:NC...`)

_, r := stages.NextJob()
assert.Nil(t, r)

segmentStateEquals(t, stages, `
S:CCSS
S:CSS.
M:NC..`)
S:CCCSS
S:CSS..
M:NC...`)

// segmentStateEquals(t, stages, `
//S:CCSSS...
//S:CSS.....
//M:NC......`)
//
// stages.NextJob()
//
// segmentStateEquals(t, stages, `
//S:CCSSSS..
//S:CSS.....
//M:NC......`)
//
// _, r := stages.NextJob()
// assert.Nil(t, r)
stages.MarkSegmentPartialPresent(id(2, 0))
stages.MarkSegmentPartialPresent(id(3, 0))

segmentStateEquals(t, stages, `
S:CCPS
S:CSS.
M:NC..`)
S:CCCPS
S:CSS..
M:NC...`)

_, r = stages.NextJob()
assert.Nil(t, r)
stages.MarkSegmentMerging(id(2, 0))
stages.MarkSegmentMerging(id(3, 0))

segmentStateEquals(t, stages, `
S:CCMS
S:CSS.
M:NC..`)
S:CCCMS
S:CSS..
M:NC...`)

_, r = stages.NextJob()
assert.Nil(t, r)
stages.markSegmentCompleted(id(2, 0))
stages.markSegmentCompleted(id(3, 0))

segmentStateEquals(t, stages, `
S:CCCS
S:CSS.
M:NC..`)
S:CCCCS
S:CSS..
M:NC...`)

stages.NextJob()

segmentStateEquals(t, stages, `
S:CCCS
S:CSSS
M:NC..`)
S:CCCCS
S:CSSS.
M:NC...`)

stages.forceTransition(1, 1, UnitCompleted)
stages.NextJob()

segmentStateEquals(t, stages, `
S:CCCS
S:CCSS
M:NCS.`)
S:CCCCS
S:CCSS.
M:NC...`)

}

Expand Down

0 comments on commit e377e61

Please sign in to comment.