partial (unsquashed) stores no longer contain traceID in filename, so they can be reused, squasher will handle this gracefully

the execouts for stores are now only written on a PartialKV, containing
a list of operations to re-apply. They are only output as deltas when
applied to a fullKV.
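
The commit message above is the core of the change: a partial store is persisted as a list of operations, and deltas only exist once those operations are replayed onto a full KV. Below is a minimal, hypothetical Go sketch of that idea; none of these types are the actual Substreams API.

package main

import "fmt"

// OpKind, Operation and Delta are illustrative stand-ins, not Substreams types.
type OpKind int

const (
    OpSet OpKind = iota
    OpDelete
)

type Operation struct {
    Kind  OpKind
    Key   string
    Value string
}

type Delta struct {
    Key      string
    OldValue string
    NewValue string
}

// applyToFullKV replays the recorded operations on a full KV and only then
// materializes deltas, mirroring "output as deltas when applied to a fullKV".
func applyToFullKV(full map[string]string, ops []Operation) []Delta {
    deltas := make([]Delta, 0, len(ops))
    for _, op := range ops {
        old := full[op.Key]
        switch op.Kind {
        case OpSet:
            full[op.Key] = op.Value
            deltas = append(deltas, Delta{Key: op.Key, OldValue: old, NewValue: op.Value})
        case OpDelete:
            delete(full, op.Key)
            deltas = append(deltas, Delta{Key: op.Key, OldValue: old})
        }
    }
    return deltas
}

func main() {
    full := map[string]string{"counter": "1"}
    ops := []Operation{
        {Kind: OpSet, Key: "counter", Value: "2"},
        {Kind: OpDelete, Key: "obsolete"},
    }
    fmt.Printf("%+v\n", applyToFullKV(full, ops))
}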
sduchesneau committed Mar 19, 2024
1 parent 48abe7f commit 0077dd1
Showing 36 changed files with 791 additions and 239 deletions.
6 changes: 5 additions & 1 deletion docs/release-notes/change-log.md
@@ -22,7 +22,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 * Tier2 will now read back mapper outputs (if they exist) to prevent running them again. Additionally, it will not read back the full blocks if its inputs can be satisfied from existing cached mapper outputs.
 
-* Tier2 will skip processing completely if the `output_module` is a mapper that has already been processed (ex: when multiple requests are indexing the same data at the same time)
+* Tier2 will skip processing completely if it's processing the last stage and the `output_module` is a mapper that has already been processed (ex: when multiple requests are indexing the same data at the same time)
+
+* Tier2 will skip processing completely if it's processing a stage that is not the last, but all the stores and outputs have been processed and cached.
+
+* The "partial" store outputs no longer contain the trace ID in the filename, allowing them to be reused. If many requests point to the same modules being squashed, the squasher will detect if another Tier1 has squashed its file and reload the store from the produced full KV.
 
 * [Operator] Readiness metric for Substreams tier1 app is now named `substreams_tier1` (was mistakenly called `firehose` before).
 
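The change-log entry above is the reason the trace ID is dropped: without it, the partial file name is deterministic for a given block range, so a partial produced by one request can be picked up by another. A minimal, hypothetical sketch follows; the exact filename layout is assumed for illustration, not taken from the repository.

package main

import "fmt"

// Old behaviour (assumed layout): the trace ID made every request produce a
// unique partial file, so partials from one request could never be reused.
func oldPartialFilename(startBlock, endBlock uint64, traceID string) string {
    return fmt.Sprintf("%010d-%010d.%s.partial", endBlock, startBlock, traceID)
}

// New behaviour (assumed layout): the name depends only on the block range,
// so two requests covering the same segment converge on the same file.
func newPartialFilename(startBlock, endBlock uint64) string {
    return fmt.Sprintf("%010d-%010d.partial", endBlock, startBlock)
}

func main() {
    fmt.Println(oldPartialFilename(1000, 2000, "abc123")) // unique per request
    fmt.Println(newPartialFilename(1000, 2000))           // deterministic, shareable
}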
5 changes: 1 addition & 4 deletions orchestrator/parallelprocessor.go
@@ -33,13 +33,12 @@ func BuildParallelProcessor(
     execoutStorage *execout.Configs,
     respFunc func(resp substreams.ResponseFromAnyTier) error,
     storeConfigs store.ConfigMap,
-    traceID string,
 ) (*ParallelProcessor, error) {
 
     stream := response.New(respFunc)
     sched := scheduler.New(ctx, stream)
 
-    stages := stage.NewStages(ctx, outputGraph, reqPlan, storeConfigs, traceID)
+    stages := stage.NewStages(ctx, outputGraph, reqPlan, storeConfigs)
     sched.Stages = stages
 
     // OPTIMIZATION: We should fetch the ExecOut files too, and see if they
@@ -91,7 +90,6 @@ func BuildParallelProcessor(
         reqPlan.StoresSegmenter(),
         storeConfigs,
         execoutStorage,
-        traceID,
     )
     if err != nil {
         return nil, fmt.Errorf("fetch stores storage state: %w", err)
@@ -102,7 +100,6 @@
         reqPlan.WriteOutSegmenter(),
         storeConfigs,
         execoutStorage,
-        traceID,
     )
     if err != nil {
         return nil, fmt.Errorf("fetch stores storage state: %w", err)
4 changes: 0 additions & 4 deletions orchestrator/stage/fetchstorage.go
@@ -17,7 +17,6 @@ func (s *Stages) FetchStoresState(
     segmenter *block.Segmenter,
     storeConfigMap store.ConfigMap,
     execoutConfigs *execout.Configs,
-    traceID string,
 ) error {
     completes := make(unitMap)
     partials := make(unitMap)
@@ -103,9 +102,6 @@
             if !rng.Equals(partial.Range) {
                 continue
             }
-            if traceID != partial.TraceID {
-                continue
-            }
             unit := Unit{Stage: stageIdx, Segment: segmentIdx}
 
             if s.getState(unit) == UnitCompleted {
8 changes: 7 additions & 1 deletion orchestrator/stage/squash.go
@@ -58,7 +58,7 @@ func (s *Stages) singleSquash(stage *Stage, modState *StoreModuleState, mergeUni
     metrics.start = time.Now()
 
     rng := modState.segmenter.Range(mergeUnit.Segment)
-    partialFile := store.NewPartialFileInfo(modState.name, rng.StartBlock, rng.ExclusiveEndBlock, s.traceID)
+    partialFile := store.NewPartialFileInfo(modState.name, rng.StartBlock, rng.ExclusiveEndBlock)
     partialKV := modState.derivePartialKV(rng.StartBlock)
     segmentEndsOnInterval := modState.segmenter.EndsOnInterval(mergeUnit.Segment)
 
@@ -72,6 +72,12 @@
     // Load
     metrics.loadStart = time.Now()
     if err := partialKV.Load(s.ctx, partialFile); err != nil {
+        if nextFull, err := modState.getStore(s.ctx, rng.ExclusiveEndBlock); err == nil { // try to load an already-merged file
+            s.logger.Info("got full store from cache instead of partial, reloading", zap.Stringer("store", nextFull))
+            modState.cachedStore = nextFull
+            modState.lastBlockInStore = rng.ExclusiveEndBlock
+            return nil
+        }
         return fmt.Errorf("loading partial: %q: %w", partialFile.Filename, err)
     }
     metrics.loadEnd = time.Now()
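The hunk above adds a fallback when loading a partial file fails. A condensed, hypothetical restatement of that logic (illustrative types, not the repository's API): if the partial for a segment is gone, assume another Tier1 already squashed it and load the full KV that ends at the same block.

package main

import "fmt"

// segmentStore is a stand-in for the module's state; not the repository's API.
type segmentStore struct {
    partialExists bool // pretend another tier1 already squashed and pruned the partial
    fullExists    bool
}

func (s *segmentStore) loadPartial(endBlock uint64) error {
    if !s.partialExists {
        return fmt.Errorf("partial ending at %d not found", endBlock)
    }
    return nil
}

func (s *segmentStore) loadFull(endBlock uint64) error {
    if !s.fullExists {
        return fmt.Errorf("full store ending at %d not found", endBlock)
    }
    return nil
}

// loadSegment mirrors the fallback: prefer the partial, but if it is gone,
// accept the already-merged full KV that ends at the same block.
func loadSegment(s *segmentStore, endBlock uint64) error {
    if err := s.loadPartial(endBlock); err != nil {
        if fullErr := s.loadFull(endBlock); fullErr == nil {
            return nil // another request already produced the merged store
        }
        return fmt.Errorf("no partial and no full store at %d: %w", endBlock, err)
    }
    return nil
}

func main() {
    s := &segmentStore{partialExists: false, fullExists: true}
    fmt.Println(loadSegment(s, 2000)) // <nil>: fell back to the full KV
}

This fallback is what makes dropping the trace ID safe: two requests racing on the same segment converge on the same files instead of failing when one of them merges first.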
7 changes: 2 additions & 5 deletions orchestrator/stage/stages.go
@@ -32,9 +32,8 @@ import (
 // that the Stage is completed, kicking off the next layer of jobs.
 
 type Stages struct {
-    ctx     context.Context
-    logger  *zap.Logger
-    traceID string
+    ctx    context.Context
+    logger *zap.Logger
 
     globalSegmenter *block.Segmenter // This segmenter covers both the stores and the mapper
     storeSegmenter  *block.Segmenter // This segmenter covers only jobs needed to build up stores according to the RequestPlan.
@@ -58,7 +57,6 @@ func NewStages(
     outputGraph *outputmodules.Graph,
     reqPlan *plan.RequestPlan,
     storeConfigs store.ConfigMap,
-    traceID string,
 ) (out *Stages) {
 
     if !reqPlan.RequiresParallelProcessing() {
@@ -70,7 +68,6 @@
     stagedModules := outputGraph.StagedUsedModules()
     out = &Stages{
         ctx:             ctx,
-        traceID:         traceID,
         logger:          reqctx.Logger(ctx),
         globalSegmenter: reqPlan.BackprocessSegmenter(),
     }
2 changes: 0 additions & 2 deletions orchestrator/stage/stages_test.go
@@ -23,7 +23,6 @@ func TestNewStages(t *testing.T) {
         outputmodules.TestGraphStagedModules(5, 7, 12, 22, 25),
         reqPlan,
         nil,
-        "trace",
     )
 
     assert.Equal(t, 8, stages.globalSegmenter.Count()) // from 5 to 75
@@ -49,7 +48,6 @@ func TestNewStagesNextJobs(t *testing.T) {
         outputmodules.TestGraphStagedModules(5, 5, 5, 5, 5),
         reqPlan,
         nil,
-        "trace",
     )
 
     stages.allocSegments(0)
2 changes: 1 addition & 1 deletion orchestrator/work/worker.go
@@ -288,7 +288,7 @@ func toRPCPartialFiles(completed *pbssinternal.Completed) (out store.FileInfos)
     // stores to all having been processed.
     out = make(store.FileInfos, len(completed.AllProcessedRanges))
     for i, b := range completed.AllProcessedRanges {
-        out[i] = store.NewPartialFileInfo("TODO:CHANGE-ME", b.StartBlock, b.EndBlock, completed.TraceId)
+        out[i] = store.NewPartialFileInfo("TODO:CHANGE-ME", b.StartBlock, b.EndBlock)
     }
     return
 }
4 changes: 2 additions & 2 deletions pb/last_generate.txt
@@ -1,2 +1,2 @@
-generate.sh - Thu 14 Mar 2024 10:02:19 EDT - arnaudberger
-streamingfast/proto revision: 2999cd42d71a82c4adf739557f9520f0609f7b10
+generate.sh - Tue Mar 19 09:09:54 EDT 2024 - stepd
+streamingfast/proto revision: fde3637cef38103a68301d0ca19b3f2af9b6079b
