partial (unsquashed) stores no longer contain traceID in filename, so they can be reused, squasher will handle this gracefully
sduchesneau committed Mar 18, 2024
1 parent c7dfc88 commit 678e9c8
Showing 29 changed files with 64 additions and 191 deletions.
6 changes: 5 additions & 1 deletion docs/release-notes/change-log.md
@@ -22,7 +22,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

* Tier2 will now read back mapper outputs (if they exist) to prevent running them again. Additionally, it will not read back the full blocks if its inputs can be satisfied from existing cached mapper outputs.

* Tier2 will skip processing completely if the `output_module` is a mapper that has already been processed (ex: when multiple requests are indexing the same data at the same time)
* Tier2 will skip processing completely if it's processing the last stage and the `output_module` is a mapper that has already been processed (ex: when multiple requests are indexing the same data at the same time)

* Tier2 will skip processing completely if it's processing a stage that is not the last, but all the stores and outputs have been processed and cached.

* The "partial" store outputs no longer contain the trace ID in the filename, allowing them to be reused. If many requests point to the same modules being squashed, the squasher will detect if another Tier1 has squashed its file and reload the store from the produced full KV.

* [Operator] Readiness metric for Substreams tier1 app is now named `substreams_tier1` (was mistakenly called `firehose` before).

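The changelog entry above hinges on a naming convention: full KV stores keep their `<end>-<start>.kv` name, while partial stores drop the trace-ID component and become `<end>-<start>.partial`, so two requests covering the same range now produce (and can reuse) the same file. Below is a minimal, hedged sketch of that convention; the helper names and the zero-padding format are assumptions taken from the updated fixtures in `storage/store/base_store_test.go` further down, not the library code itself.

```go
package main

import "fmt"

type blockRange struct {
	StartBlock        uint64
	ExclusiveEndBlock uint64
}

// fullStateFileName mirrors the expected "0000010000-0000000100.kv" fixture.
func fullStateFileName(r blockRange) string {
	return fmt.Sprintf("%010d-%010d.kv", r.ExclusiveEndBlock, r.StartBlock)
}

// partialFileName no longer carries a trace ID, so the name depends only on
// the block range and can be shared between requests.
func partialFileName(r blockRange) string {
	return fmt.Sprintf("%010d-%010d.partial", r.ExclusiveEndBlock, r.StartBlock)
}

func main() {
	fmt.Println(fullStateFileName(blockRange{StartBlock: 100, ExclusiveEndBlock: 10000})) // 0000010000-0000000100.kv
	fmt.Println(partialFileName(blockRange{StartBlock: 10000, ExclusiveEndBlock: 20000})) // 0000020000-0000010000.partial
}
```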
5 changes: 1 addition & 4 deletions orchestrator/parallelprocessor.go
@@ -33,13 +33,12 @@ func BuildParallelProcessor(
execoutStorage *execout.Configs,
respFunc func(resp substreams.ResponseFromAnyTier) error,
storeConfigs store.ConfigMap,
traceID string,
) (*ParallelProcessor, error) {

stream := response.New(respFunc)
sched := scheduler.New(ctx, stream)

stages := stage.NewStages(ctx, outputGraph, reqPlan, storeConfigs, traceID)
stages := stage.NewStages(ctx, outputGraph, reqPlan, storeConfigs)
sched.Stages = stages

// OPTIMIZATION: We should fetch the ExecOut files too, and see if they
@@ -91,7 +90,6 @@ func BuildParallelProcessor(
reqPlan.StoresSegmenter(),
storeConfigs,
execoutStorage,
traceID,
)
if err != nil {
return nil, fmt.Errorf("fetch stores storage state: %w", err)
@@ -102,7 +100,6 @@ func BuildParallelProcessor(
reqPlan.WriteOutSegmenter(),
storeConfigs,
execoutStorage,
traceID,
)
if err != nil {
return nil, fmt.Errorf("fetch stores storage state: %w", err)
4 changes: 0 additions & 4 deletions orchestrator/stage/fetchstorage.go
@@ -17,7 +17,6 @@ func (s *Stages) FetchStoresState(
segmenter *block.Segmenter,
storeConfigMap store.ConfigMap,
execoutConfigs *execout.Configs,
traceID string,
) error {
completes := make(unitMap)
partials := make(unitMap)
@@ -103,9 +102,6 @@
if !rng.Equals(partial.Range) {
continue
}
if traceID != partial.TraceID {
continue
}
unit := Unit{Stage: stageIdx, Segment: segmentIdx}

if s.getState(unit) == UnitCompleted {
9 changes: 8 additions & 1 deletion orchestrator/stage/squash.go
@@ -54,11 +54,12 @@ func (s *Stages) multiSquash(stage *Stage, mergeUnit Unit) error {
// We keep the cache of the latest FullKV store, to speed up things
// if they are linear
func (s *Stages) singleSquash(stage *Stage, modState *StoreModuleState, mergeUnit Unit) error {
fmt.Println("SQUASHING", modState.name)
metrics := mergeMetrics{}
metrics.start = time.Now()

rng := modState.segmenter.Range(mergeUnit.Segment)
partialFile := store.NewPartialFileInfo(modState.name, rng.StartBlock, rng.ExclusiveEndBlock, s.traceID)
partialFile := store.NewPartialFileInfo(modState.name, rng.StartBlock, rng.ExclusiveEndBlock)
partialKV := modState.derivePartialKV(rng.StartBlock)
segmentEndsOnInterval := modState.segmenter.EndsOnInterval(mergeUnit.Segment)

@@ -72,6 +73,12 @@ func (s *Stages) singleSquash(stage *Stage, modState *StoreModuleState, mergeUni
// Load
metrics.loadStart = time.Now()
if err := partialKV.Load(s.ctx, partialFile); err != nil {
if nextFull, err := modState.getStore(s.ctx, rng.ExclusiveEndBlock); err == nil { // try to load an already-merged file
s.logger.Info("got full store from cache instead of partial, reloading", zap.Stringer("store", nextFull))
modState.cachedStore = nextFull
modState.lastBlockInStore = rng.ExclusiveEndBlock
return nil
}
return fmt.Errorf("loading partial: %q: %w", partialFile.Filename, err)
}
metrics.loadEnd = time.Now()
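The hunk above adds the graceful fallback described in the changelog: if the partial file for a segment cannot be loaded, the squasher checks whether an already-merged full KV ending at the same block exists (another Tier1 may have squashed it) and reloads from that instead of failing. Here is a standalone sketch of that control flow; the `storage` type and its methods are hypothetical in-memory stand-ins for the real `PartialKV` / `StoreModuleState` machinery shown above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// fullKV is a stand-in for a merged ("full") store ending at lastBlock.
type fullKV struct{ lastBlock uint64 }

// storage fakes the object store: which partial ranges and full KVs exist.
type storage struct {
	partials map[[2]uint64]bool // {start, end} ranges with a ".partial" file
	fulls    map[uint64]bool    // exclusive end blocks with a merged ".kv" file
}

func (s *storage) loadPartial(start, end uint64) error {
	if s.partials[[2]uint64{start, end}] {
		return nil
	}
	return errors.New("partial file not found")
}

func (s *storage) loadFullKV(upTo uint64) (*fullKV, error) {
	if s.fulls[upTo] {
		return &fullKV{lastBlock: upTo}, nil
	}
	return nil, errors.New("full kv not found")
}

// squashSegment mirrors the load step of singleSquash: on a missing partial,
// fall back to a full KV that another tier1 may already have produced.
func squashSegment(ctx context.Context, s *storage, start, end uint64) (*fullKV, error) {
	_ = ctx // kept for shape parity with the real, context-aware code
	if err := s.loadPartial(start, end); err != nil {
		if next, ferr := s.loadFullKV(end); ferr == nil {
			return next, nil // reuse the already-merged store
		}
		return nil, fmt.Errorf("loading partial %d-%d: %w", start, end, err)
	}
	// ... merge the partial into the cached full store here ...
	return &fullKV{lastBlock: end}, nil
}

func main() {
	s := &storage{partials: map[[2]uint64]bool{}, fulls: map[uint64]bool{10000: true}}
	kv, err := squashSegment(context.Background(), s, 9000, 10000)
	fmt.Println(kv.lastBlock, err) // 10000 <nil>: reused the full KV instead of failing
}
```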
7 changes: 2 additions & 5 deletions orchestrator/stage/stages.go
@@ -32,9 +32,8 @@ import (
// that the Stage is completed, kicking off the next layer of jobs.

type Stages struct {
ctx context.Context
logger *zap.Logger
traceID string
ctx context.Context
logger *zap.Logger

globalSegmenter *block.Segmenter // This segmenter covers both the stores and the mapper
storeSegmenter *block.Segmenter // This segmenter covers only jobs needed to build up stores according to the RequestPlan.
@@ -58,7 +57,6 @@ func NewStages(
outputGraph *outputmodules.Graph,
reqPlan *plan.RequestPlan,
storeConfigs store.ConfigMap,
traceID string,
) (out *Stages) {

if !reqPlan.RequiresParallelProcessing() {
@@ -70,7 +68,6 @@
stagedModules := outputGraph.StagedUsedModules()
out = &Stages{
ctx: ctx,
traceID: traceID,
logger: reqctx.Logger(ctx),
globalSegmenter: reqPlan.BackprocessSegmenter(),
}
2 changes: 0 additions & 2 deletions orchestrator/stage/stages_test.go
@@ -23,7 +23,6 @@ func TestNewStages(t *testing.T) {
outputmodules.TestGraphStagedModules(5, 7, 12, 22, 25),
reqPlan,
nil,
"trace",
)

assert.Equal(t, 8, stages.globalSegmenter.Count()) // from 5 to 75
@@ -49,7 +48,6 @@ func TestNewStagesNextJobs(t *testing.T) {
outputmodules.TestGraphStagedModules(5, 5, 5, 5, 5),
reqPlan,
nil,
"trace",
)

stages.allocSegments(0)
2 changes: 1 addition & 1 deletion orchestrator/work/worker.go
@@ -288,7 +288,7 @@ func toRPCPartialFiles(completed *pbssinternal.Completed) (out store.FileInfos)
// stores to all having been processed.
out = make(store.FileInfos, len(completed.AllProcessedRanges))
for i, b := range completed.AllProcessedRanges {
out[i] = store.NewPartialFileInfo("TODO:CHANGE-ME", b.StartBlock, b.EndBlock, completed.TraceId)
out[i] = store.NewPartialFileInfo("TODO:CHANGE-ME", b.StartBlock, b.EndBlock)
}
return
}
4 changes: 0 additions & 4 deletions pipeline/pipeline.go
@@ -75,7 +75,6 @@ type Pipeline struct {
// (for chains with potential block skips)
lastFinalClock *pbsubstreams.Clock

traceID string
blockStepMap map[bstream.StepType]uint64
}

@@ -88,7 +87,6 @@ func New(
execOutputCache *cache.Engine,
runtimeConfig config.RuntimeConfig,
respFunc substreams.ResponseFunc,
traceID string,
opts ...Option,
) *Pipeline {
pipe := &Pipeline{
@@ -102,7 +100,6 @@
stores: stores,
execoutStorage: execoutStorage,
forkHandler: NewForkHandler(),
traceID: traceID,
blockStepMap: make(map[bstream.StepType]uint64),
startTime: time.Now(),
}
@@ -262,7 +259,6 @@ func (p *Pipeline) runParallelProcess(ctx context.Context, reqPlan *plan.Request
p.execoutStorage,
p.respFunc,
p.stores.configs,
p.traceID,
)
if err != nil {
return nil, fmt.Errorf("building parallel processor: %w", err)
2 changes: 1 addition & 1 deletion pipeline/pipeline_test.go
@@ -186,7 +186,7 @@ func testConfigMap(t *testing.T, configs []testStoreConfig) store2.ConfigMap {
objStore := dstore.NewMockStore(nil)

for _, conf := range configs {
newStore, err := store2.NewConfig(conf.name, conf.initBlock, conf.name, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET, "string", objStore, "")
newStore, err := store2.NewConfig(conf.name, conf.initBlock, conf.name, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET, "string", objStore)
require.NoError(t, err)
confMap[newStore.Name()] = newStore

18 changes: 2 additions & 16 deletions service/testing.go
@@ -11,18 +11,8 @@ import (
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
"github.com/streamingfast/substreams/pipeline/outputmodules"
"github.com/streamingfast/substreams/service/config"
"github.com/streamingfast/substreams/storage/store"
)

// TestTraceID must be used everywhere a TraceID is required. It must be the same
// between tier1 and tier2, otherwise tier1 will not find the file produced by
// tier2 correctly.
var TestTraceID = "00000000000000000000000000000000"

func TestTraceIDParam() store.TraceIDParam {
return store.TraceIDParam(TestTraceID)
}

func TestNewService(runtimeConfig config.RuntimeConfig, linearHandoffBlockNum uint64, streamFactoryFunc StreamFactoryFunc) *Tier1Service {
return &Tier1Service{
blockType: "sf.substreams.v1.test.Block",
@@ -58,10 +48,6 @@ func TestNewServiceTier2(runtimeConfig config.RuntimeConfig, streamFactoryFunc S
}
}

func (s *Tier2Service) TestProcessRange(ctx context.Context, request *pbssinternal.ProcessRangeRequest, respFunc substreams.ResponseFunc, traceID *string) error {
if traceID == nil {
traceID = &TestTraceID
}

return s.processRange(ctx, request, respFunc, *traceID)
func (s *Tier2Service) TestProcessRange(ctx context.Context, request *pbssinternal.ProcessRangeRequest, respFunc substreams.ResponseFunc) error {
return s.processRange(ctx, request, respFunc)
}
3 changes: 1 addition & 2 deletions service/tier1.go
@@ -399,7 +399,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
return fmt.Errorf("new config map: %w", err)
}

storeConfigs, err := store.NewConfigMap(cacheStore, outputGraph.Stores(), outputGraph.ModuleHashes(), tracing.GetTraceID(ctx).String())
storeConfigs, err := store.NewConfigMap(cacheStore, outputGraph.Stores(), outputGraph.ModuleHashes())
if err != nil {
return fmt.Errorf("configuring stores: %w", err)
}
@@ -433,7 +433,6 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
execOutputCacheEngine,
s.runtimeConfig,
respFunc,
tracing.GetTraceID(ctx).String(),
opts...,
)

23 changes: 15 additions & 8 deletions service/tier2.go
@@ -221,7 +221,7 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s
logger.Info("incoming substreams ProcessRange request", fields...)

respFunc := tier2ResponseHandler(ctx, logger, streamSrv)
err = s.processRange(ctx, request, respFunc, tracing.GetTraceID(ctx).String())
err = s.processRange(ctx, request, respFunc)
grpcError = toGRPCError(ctx, err)

switch status.Code(grpcError) {
Expand All @@ -232,7 +232,7 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s
return grpcError
}

func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.ProcessRangeRequest, respFunc substreams.ResponseFunc, traceID string) error {
func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.ProcessRangeRequest, respFunc substreams.ResponseFunc) error {
logger := reqctx.Logger(ctx)

if err := outputmodules.ValidateTier2Request(request, s.blockType); err != nil {
@@ -283,7 +283,7 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
return fmt.Errorf("new config map: %w", err)
}

storeConfigs, err := store.NewConfigMap(cacheStore, outputGraph.Stores(), outputGraph.ModuleHashes(), traceID)
storeConfigs, err := store.NewConfigMap(cacheStore, outputGraph.Stores(), outputGraph.ModuleHashes())
if err != nil {
return fmt.Errorf("configuring stores: %w", err)
}
@@ -322,7 +322,6 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
s.runtimeConfig,
respFunc,
// This must always be the parent/global trace id, the one that comes from tier1
traceID,
opts...,
)

@@ -437,20 +436,28 @@ func evaluateModulesRequiredToRun(

if c.ModuleKind() == pbsubstreams.ModuleKindMap {
if runningLastStage && name == outputModule {
// WARNING be careful, if we want to force producing module outputs/stores states for ALL STAGES on the first block range,
// this optimization will be in our way..
logger.Info("found existing exec output for output_module, skipping run", zap.String("output_module", name))
return nil, nil, nil, nil
}
continue
}

// TODO when the partial KV stores are back to being 'generic' (without traceID), we will also be able to skip the store if the partial exists
// if either full or partial kv exists, we can skip the module
storeExists, err := storeConfigs[name].ExistsFullKV(ctx, stopBlock)
if err != nil {
return nil, nil, nil, fmt.Errorf("checking file existence: %w", err)
return nil, nil, nil, fmt.Errorf("checking fullkv file existence: %w", err)
}
if !storeExists {
// some stores may already exist completely on this stage, but others do not, so we keep going but ignore those
requiredModules[name] = usedModules[name]
partialStoreExists, err := storeConfigs[name].ExistsPartialKV(ctx, startBlock, stopBlock)
if err != nil {
return nil, nil, nil, fmt.Errorf("checking partial file existence: %w", err)
}
if !partialStoreExists {
// some stores may already exist completely on this stage, but others do not, so we keep going but ignore those
requiredModules[name] = usedModules[name]
}
}
}

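The `evaluateModulesRequiredToRun` change above reduces, per store, to two existence checks: skip the module when either its full KV at the stop block or its partial KV over the requested range is already cached. A hedged sketch of that rule, written against an interface standing in for `*store.Config` (whose `ExistsFullKV` / `ExistsPartialKV` methods appear in the `storage/store/config.go` hunk below); the helper and fake types here are illustrative, not part of the codebase.

```go
package main

import (
	"context"
	"fmt"
)

// kvChecker stands in for *store.Config; the two methods match the ones
// shown in the storage/store/config.go hunk below.
type kvChecker interface {
	ExistsFullKV(ctx context.Context, upTo uint64) (bool, error)
	ExistsPartialKV(ctx context.Context, from, to uint64) (bool, error)
}

// storeNeedsRun returns true only when neither the full KV nor the partial KV
// for the requested range is already cached.
func storeNeedsRun(ctx context.Context, c kvChecker, startBlock, stopBlock uint64) (bool, error) {
	fullExists, err := c.ExistsFullKV(ctx, stopBlock)
	if err != nil {
		return false, fmt.Errorf("checking fullkv file existence: %w", err)
	}
	if fullExists {
		return false, nil
	}
	partialExists, err := c.ExistsPartialKV(ctx, startBlock, stopBlock)
	if err != nil {
		return false, fmt.Errorf("checking partial file existence: %w", err)
	}
	return !partialExists, nil
}

// fakeChecker makes the sketch runnable without an object store.
type fakeChecker struct{ full, partial bool }

func (f fakeChecker) ExistsFullKV(context.Context, uint64) (bool, error) { return f.full, nil }
func (f fakeChecker) ExistsPartialKV(context.Context, uint64, uint64) (bool, error) {
	return f.partial, nil
}

func main() {
	need, _ := storeNeedsRun(context.Background(), fakeChecker{partial: true}, 1000, 2000)
	fmt.Println(need) // false: the existing partial covers 1000-2000, no need to rerun
}
```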
6 changes: 3 additions & 3 deletions storage/store/base_store_test.go
@@ -81,10 +81,10 @@ func TestFileName(t *testing.T) {
stateFileName := FullStateFileName(&block.Range{StartBlock: 100, ExclusiveEndBlock: 10000})
require.Equal(t, "0000010000-0000000100.kv", stateFileName)

partialFileName := PartialFileName(&block.Range{StartBlock: 10000, ExclusiveEndBlock: 20000}, "abc")
require.Equal(t, "0000020000-0000010000.abc.partial", partialFileName)
partialFileName := PartialFileName(&block.Range{StartBlock: 10000, ExclusiveEndBlock: 20000})
require.Equal(t, "0000020000-0000010000.partial", partialFileName)

partialFileNameLegacy := PartialFileName(&block.Range{StartBlock: 10000, ExclusiveEndBlock: 20000}, "")
partialFileNameLegacy := PartialFileName(&block.Range{StartBlock: 10000, ExclusiveEndBlock: 20000})
require.Equal(t, "0000020000-0000010000.partial", partialFileNameLegacy)
}

12 changes: 5 additions & 7 deletions storage/store/config.go
@@ -26,11 +26,6 @@ type Config struct {
appendLimit uint64
totalSizeLimit uint64
itemSizeLimit uint64

// traceID uniquely identifies the connection ID so that store can be
// written to unique filename preventing some races when multiple Substreams
// request works on the same range.
traceID string
}

func NewConfig(
@@ -40,7 +35,6 @@
updatePolicy pbsubstreams.Module_KindStore_UpdatePolicy,
valueType string,
store dstore.Store,
traceID string,
) (*Config, error) {
subStore, err := store.SubStore(fmt.Sprintf("%s/states", moduleHash))
if err != nil {
@@ -57,7 +51,6 @@
appendLimit: 8_388_608, // 8MiB = 8 * 1024 * 1024,
totalSizeLimit: 1_073_741_824, // 1GiB
itemSizeLimit: 10_485_760, // 10MiB
traceID: traceID,
}, nil
}

@@ -99,6 +92,11 @@ func (c *Config) ExistsFullKV(ctx context.Context, upTo uint64) (bool, error) {
return c.objStore.FileExists(ctx, filename)
}

func (c *Config) ExistsPartialKV(ctx context.Context, from, to uint64) (bool, error) {
filename := PartialFileName(block.NewRange(from, to))
return c.objStore.FileExists(ctx, filename)
}

func (c *Config) NewPartialKV(initialBlock uint64, logger *zap.Logger) *PartialKV {
return &PartialKV{
baseStore: c.newBaseStore(logger),
3 changes: 1 addition & 2 deletions storage/store/configmap.go
@@ -10,7 +10,7 @@ import (

type ConfigMap map[string]*Config

func NewConfigMap(baseObjectStore dstore.Store, storeModules []*pbsubstreams.Module, moduleHashes *manifest.ModuleHashes, traceID string) (out ConfigMap, err error) {
func NewConfigMap(baseObjectStore dstore.Store, storeModules []*pbsubstreams.Module, moduleHashes *manifest.ModuleHashes) (out ConfigMap, err error) {
out = make(ConfigMap)
for _, storeModule := range storeModules {
c, err := NewConfig(
@@ -20,7 +20,6 @@ func NewConfigMap(baseObjectStore dstore.Store, storeModules []*pbsubstreams.Mod
storeModule.GetKindStore().UpdatePolicy,
storeModule.GetKindStore().ValueType,
baseObjectStore,
traceID,
)
if err != nil {
return nil, fmt.Errorf("new store config for %q: %w", storeModule.Name, err)