From 678e9c81b2eb7d60099ec7703a580aea7b4e026c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?=
Date: Mon, 18 Mar 2024 16:21:17 -0400
Subject: [PATCH] partial (unsquashed) stores no longer contain the trace ID
 in their filenames, so they can be reused; the squasher handles this
 gracefully

---
 docs/release-notes/change-log.md   |  6 +-
 orchestrator/parallelprocessor.go  |  5 +-
 orchestrator/stage/fetchstorage.go |  4 --
 orchestrator/stage/squash.go       |  8 ++-
 orchestrator/stage/stages.go       |  7 +--
 orchestrator/stage/stages_test.go  |  2 -
 orchestrator/work/worker.go        |  2 +-
 pipeline/pipeline.go               |  4 --
 pipeline/pipeline_test.go          |  2 +-
 service/testing.go                 | 18 +-----
 service/tier1.go                   |  3 +-
 service/tier2.go                   | 24 +++++---
 storage/store/base_store_test.go   |  6 +-
 storage/store/config.go            | 12 ++--
 storage/store/configmap.go         |  3 +-
 storage/store/filename.go          | 16 ++----
 storage/store/filename_test.go     | 12 +---
 storage/store/init_test.go         |  2 +-
 storage/store/partial_kv.go        |  2 +-
 storage/store/state/snapshot.go    |  6 --
 storage/store/testing.go           |  8 +--
 test/integration_test.go           | 89 ++----------------------------
 test/runnable_test.go              |  3 +-
 test/worker_test.go                |  3 +-
 tools/analytics_store_stats.go     |  1 -
 tools/check.go                     |  2 +-
 tools/decode.go                    |  2 +-
 tools/module.go                    |  1 -
 wasm/call_test.go                  |  2 +-
 29 files changed, 63 insertions(+), 192 deletions(-)

diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md
index 682a06be0..3223aaad3 100644
--- a/docs/release-notes/change-log.md
+++ b/docs/release-notes/change-log.md
@@ -22,7 +22,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 * Tier2 will now read back mapper outputs (if they exist) to prevent running them again. Additionally, it will not read back the full blocks if its inputs can be satisfied from existing cached mapper outputs.
 
-* Tier2 will skip processing completely if the `output_module` is a mapper that has already been processed (ex: when multiple requests are indexing the same data at the same time)
+* Tier2 will skip processing completely if it is running the last stage and the `output_module` is a mapper that has already been processed (e.g., when multiple requests are indexing the same data at the same time).
+
+* Tier2 will skip processing completely if it is running a stage that is not the last, but all the stores and outputs for that stage have already been processed and cached.
+
+* The "partial" store outputs no longer contain the trace ID in their filenames, allowing them to be reused. If multiple requests are squashing the same modules, the squasher detects that another tier1 instance has already squashed a given file and reloads the store from the produced full KV.
 
 * [Operator] Readiness metric for Substreams tier1 app is now named `substreams_tier1` (was mistakenly called `firehose` before).
 
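Editorial note: the filename scheme described in the last bullet is the crux of this patch, since partial stores are now addressed purely by block range. As a quick illustration (not part of the patch itself), the sketch below mirrors the patched `PartialFileName` helper from `storage/store/filename.go` further down; the 10000-20000 block range is a made-up example.

```go
package main

import "fmt"

// partialFileName mirrors the patched helper in storage/store/filename.go:
// with the trace-ID segment gone, every producer writes the same filename
// for a given block range, so partial stores can be reused across requests.
func partialFileName(startBlock, exclusiveEndBlock uint64) string {
	return fmt.Sprintf("%010d-%010d.partial", exclusiveEndBlock, startBlock)
}

func main() {
	// before this patch: 0000020000-0000010000.<traceID>.partial (unique per request)
	// after this patch:
	fmt.Println(partialFileName(10000, 20000)) // 0000020000-0000010000.partial
}
```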
diff --git a/orchestrator/parallelprocessor.go b/orchestrator/parallelprocessor.go
index d8ba12180..06ef6d427 100644
--- a/orchestrator/parallelprocessor.go
+++ b/orchestrator/parallelprocessor.go
@@ -33,13 +33,12 @@ func BuildParallelProcessor(
 	execoutStorage *execout.Configs,
 	respFunc func(resp substreams.ResponseFromAnyTier) error,
 	storeConfigs store.ConfigMap,
-	traceID string,
 ) (*ParallelProcessor, error) {
 	stream := response.New(respFunc)
 	sched := scheduler.New(ctx, stream)
 
-	stages := stage.NewStages(ctx, outputGraph, reqPlan, storeConfigs, traceID)
+	stages := stage.NewStages(ctx, outputGraph, reqPlan, storeConfigs)
 	sched.Stages = stages
 
 	// OPTIMIZATION: We should fetch the ExecOut files too, and see if they
@@ -91,7 +90,6 @@ func BuildParallelProcessor(
 			reqPlan.StoresSegmenter(),
 			storeConfigs,
 			execoutStorage,
-			traceID,
 		)
 		if err != nil {
 			return nil, fmt.Errorf("fetch stores storage state: %w", err)
@@ -102,7 +100,6 @@ func BuildParallelProcessor(
 			reqPlan.WriteOutSegmenter(),
 			storeConfigs,
 			execoutStorage,
-			traceID,
 		)
 		if err != nil {
 			return nil, fmt.Errorf("fetch stores storage state: %w", err)
diff --git a/orchestrator/stage/fetchstorage.go b/orchestrator/stage/fetchstorage.go
index a3ccd5696..61fdba561 100644
--- a/orchestrator/stage/fetchstorage.go
+++ b/orchestrator/stage/fetchstorage.go
@@ -17,7 +17,6 @@ func (s *Stages) FetchStoresState(
 	segmenter *block.Segmenter,
 	storeConfigMap store.ConfigMap,
 	execoutConfigs *execout.Configs,
-	traceID string,
 ) error {
 	completes := make(unitMap)
 	partials := make(unitMap)
@@ -103,9 +102,6 @@ func (s *Stages) FetchStoresState(
 			if !rng.Equals(partial.Range) {
 				continue
 			}
-			if traceID != partial.TraceID {
-				continue
-			}
 
 			unit := Unit{Stage: stageIdx, Segment: segmentIdx}
 			if s.getState(unit) == UnitCompleted {
diff --git a/orchestrator/stage/squash.go b/orchestrator/stage/squash.go
index bc22c3047..650597b77 100644
--- a/orchestrator/stage/squash.go
+++ b/orchestrator/stage/squash.go
@@ -54,11 +54,11 @@ func (s *Stages) multiSquash(stage *Stage, mergeUnit Unit) error {
 // We keep the cache of the latest FullKV store, to speed up things
 // if they are linear
 func (s *Stages) singleSquash(stage *Stage, modState *StoreModuleState, mergeUnit Unit) error {
 	metrics := mergeMetrics{}
 	metrics.start = time.Now()
 
 	rng := modState.segmenter.Range(mergeUnit.Segment)
-	partialFile := store.NewPartialFileInfo(modState.name, rng.StartBlock, rng.ExclusiveEndBlock, s.traceID)
+	partialFile := store.NewPartialFileInfo(modState.name, rng.StartBlock, rng.ExclusiveEndBlock)
 	partialKV := modState.derivePartialKV(rng.StartBlock)
 	segmentEndsOnInterval := modState.segmenter.EndsOnInterval(mergeUnit.Segment)
 
@@ -72,6 +72,12 @@ func (s *Stages) singleSquash(stage *Stage, modState *StoreModuleState, mergeUni
 	// Load
 	metrics.loadStart = time.Now()
 	if err := partialKV.Load(s.ctx, partialFile); err != nil {
+		if nextFull, err := modState.getStore(s.ctx, rng.ExclusiveEndBlock); err == nil { // try to load an already-merged file
+			s.logger.Info("got full store from cache instead of partial, reloading", zap.Stringer("store", nextFull))
+			modState.cachedStore = nextFull
+			modState.lastBlockInStore = rng.ExclusiveEndBlock
+			return nil
+		}
 		return fmt.Errorf("loading partial: %q: %w", partialFile.Filename, err)
 	}
 	metrics.loadEnd = time.Now()
diff --git a/orchestrator/stage/stages.go b/orchestrator/stage/stages.go
index 3c0bce35d..2e5af06c0 100644
--- a/orchestrator/stage/stages.go
+++ b/orchestrator/stage/stages.go
@@ -32,9 +32,8 @@ import (
 // that the Stage is completed, kicking off the next layer of jobs.
 type Stages struct {
-	ctx     context.Context
-	logger  *zap.Logger
-	traceID string
+	ctx    context.Context
+	logger *zap.Logger
 
 	globalSegmenter *block.Segmenter // This segmenter covers both the stores and the mapper
 	storeSegmenter  *block.Segmenter // This segmenter covers only jobs needed to build up stores according to the RequestPlan.
 
@@ -58,7 +57,6 @@ func NewStages(
 	outputGraph *outputmodules.Graph,
 	reqPlan *plan.RequestPlan,
 	storeConfigs store.ConfigMap,
-	traceID string,
 ) (out *Stages) {
 
 	if !reqPlan.RequiresParallelProcessing() {
@@ -70,7 +68,6 @@ func NewStages(
 	stagedModules := outputGraph.StagedUsedModules()
 	out = &Stages{
 		ctx:             ctx,
-		traceID:         traceID,
 		logger:          reqctx.Logger(ctx),
 		globalSegmenter: reqPlan.BackprocessSegmenter(),
 	}
diff --git a/orchestrator/stage/stages_test.go b/orchestrator/stage/stages_test.go
index 9167d86fc..9adad77f0 100644
--- a/orchestrator/stage/stages_test.go
+++ b/orchestrator/stage/stages_test.go
@@ -23,7 +23,6 @@ func TestNewStages(t *testing.T) {
 		outputmodules.TestGraphStagedModules(5, 7, 12, 22, 25),
 		reqPlan,
 		nil,
-		"trace",
 	)
 
 	assert.Equal(t, 8, stages.globalSegmenter.Count()) // from 5 to 75
@@ -49,7 +48,6 @@ func TestNewStagesNextJobs(t *testing.T) {
 		outputmodules.TestGraphStagedModules(5, 5, 5, 5, 5),
 		reqPlan,
 		nil,
-		"trace",
 	)
 
 	stages.allocSegments(0)
diff --git a/orchestrator/work/worker.go b/orchestrator/work/worker.go
index 7bbc36880..c11d7efbd 100644
--- a/orchestrator/work/worker.go
+++ b/orchestrator/work/worker.go
@@ -288,7 +288,7 @@ func toRPCPartialFiles(completed *pbssinternal.Completed) (out store.FileInfos)
 	// stores to all having been processed.
 	out = make(store.FileInfos, len(completed.AllProcessedRanges))
 	for i, b := range completed.AllProcessedRanges {
-		out[i] = store.NewPartialFileInfo("TODO:CHANGE-ME", b.StartBlock, b.EndBlock, completed.TraceId)
+		out[i] = store.NewPartialFileInfo("TODO:CHANGE-ME", b.StartBlock, b.EndBlock)
 	}
 	return
 }
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index 15e0675ff..ca592200f 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -75,7 +75,6 @@ type Pipeline struct {
 	// (for chains with potential block skips)
 	lastFinalClock *pbsubstreams.Clock
 
-	traceID      string
 	blockStepMap map[bstream.StepType]uint64
 }
 
@@ -88,7 +87,6 @@ func New(
 	execOutputCache *cache.Engine,
 	runtimeConfig config.RuntimeConfig,
 	respFunc substreams.ResponseFunc,
-	traceID string,
 	opts ...Option,
 ) *Pipeline {
 	pipe := &Pipeline{
@@ -102,7 +100,6 @@ func New(
 		stores:         stores,
 		execoutStorage: execoutStorage,
 		forkHandler:    NewForkHandler(),
-		traceID:        traceID,
 		blockStepMap:   make(map[bstream.StepType]uint64),
 		startTime:      time.Now(),
 	}
@@ -262,7 +259,6 @@ func (p *Pipeline) runParallelProcess(ctx context.Context, reqPlan *plan.Request
 		p.execoutStorage,
 		p.respFunc,
 		p.stores.configs,
-		p.traceID,
 	)
 	if err != nil {
 		return nil, fmt.Errorf("building parallel processor: %w", err)
diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go
index c05c7ddd0..12f5d5342 100644
--- a/pipeline/pipeline_test.go
+++ b/pipeline/pipeline_test.go
@@ -186,7 +186,7 @@ func testConfigMap(t *testing.T, configs []testStoreConfig) store2.ConfigMap {
 	objStore := dstore.NewMockStore(nil)
 
 	for _, conf := range configs {
-		newStore, err := store2.NewConfig(conf.name, conf.initBlock, conf.name, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET, "string", objStore, "")
+		newStore, err := store2.NewConfig(conf.name, conf.initBlock, conf.name, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET, "string", objStore)
 		require.NoError(t, err)
 		confMap[newStore.Name()] = newStore
diff --git a/service/testing.go b/service/testing.go
index 30a935011..1766479a2 100644
--- a/service/testing.go
+++ b/service/testing.go
@@ -11,18 +11,8 @@ import (
 	pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
 	"github.com/streamingfast/substreams/pipeline/outputmodules"
 	"github.com/streamingfast/substreams/service/config"
-	"github.com/streamingfast/substreams/storage/store"
 )
 
-// TestTraceID must be used everywhere a TraceID is required. It must be the same
-// between tier1 and tier2, otherwise tier1 will not find the file produced by
-// tier2 correctly.
-var TestTraceID = "00000000000000000000000000000000"
-
-func TestTraceIDParam() store.TraceIDParam {
-	return store.TraceIDParam(TestTraceID)
-}
-
 func TestNewService(runtimeConfig config.RuntimeConfig, linearHandoffBlockNum uint64, streamFactoryFunc StreamFactoryFunc) *Tier1Service {
 	return &Tier1Service{
 		blockType: "sf.substreams.v1.test.Block",
@@ -58,10 +48,6 @@ func TestNewServiceTier2(runtimeConfig config.RuntimeConfig, streamFactoryFunc S
 	}
 }
 
-func (s *Tier2Service) TestProcessRange(ctx context.Context, request *pbssinternal.ProcessRangeRequest, respFunc substreams.ResponseFunc, traceID *string) error {
-	if traceID == nil {
-		traceID = &TestTraceID
-	}
-
-	return s.processRange(ctx, request, respFunc, *traceID)
+func (s *Tier2Service) TestProcessRange(ctx context.Context, request *pbssinternal.ProcessRangeRequest, respFunc substreams.ResponseFunc) error {
+	return s.processRange(ctx, request, respFunc)
 }
diff --git a/service/tier1.go b/service/tier1.go
index 7f6a86a7b..95708bf35 100644
--- a/service/tier1.go
+++ b/service/tier1.go
@@ -399,7 +399,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
 		return fmt.Errorf("new config map: %w", err)
 	}
 
-	storeConfigs, err := store.NewConfigMap(cacheStore, outputGraph.Stores(), outputGraph.ModuleHashes(), tracing.GetTraceID(ctx).String())
+	storeConfigs, err := store.NewConfigMap(cacheStore, outputGraph.Stores(), outputGraph.ModuleHashes())
 	if err != nil {
 		return fmt.Errorf("configuring stores: %w", err)
 	}
@@ -433,7 +433,6 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
 		execOutputCacheEngine,
 		s.runtimeConfig,
 		respFunc,
-		tracing.GetTraceID(ctx).String(),
 		opts...,
 	)
diff --git a/service/tier2.go b/service/tier2.go
index 077cd1103..545238f74 100644
--- a/service/tier2.go
+++ b/service/tier2.go
@@ -221,7 +221,7 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s
 	logger.Info("incoming substreams ProcessRange request", fields...)
 
 	respFunc := tier2ResponseHandler(ctx, logger, streamSrv)
-	err = s.processRange(ctx, request, respFunc, tracing.GetTraceID(ctx).String())
+	err = s.processRange(ctx, request, respFunc)
 	grpcError = toGRPCError(ctx, err)
 
 	switch status.Code(grpcError) {
@@ -232,7 +232,7 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s
 	return grpcError
 }
 
-func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.ProcessRangeRequest, respFunc substreams.ResponseFunc, traceID string) error {
+func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.ProcessRangeRequest, respFunc substreams.ResponseFunc) error {
 	logger := reqctx.Logger(ctx)
 
 	if err := outputmodules.ValidateTier2Request(request, s.blockType); err != nil {
@@ -283,7 +283,7 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
 		return fmt.Errorf("new config map: %w", err)
 	}
 
-	storeConfigs, err := store.NewConfigMap(cacheStore, outputGraph.Stores(), outputGraph.ModuleHashes(), traceID)
+	storeConfigs, err := store.NewConfigMap(cacheStore, outputGraph.Stores(), outputGraph.ModuleHashes())
 	if err != nil {
 		return fmt.Errorf("configuring stores: %w", err)
 	}
@@ -322,7 +322,5 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
 		s.runtimeConfig,
 		respFunc,
-		// This must always be the parent/global trace id, the one that comes from tier1
-		traceID,
 		opts...,
 	)
 
@@ -437,20 +435,28 @@ func evaluateModulesRequiredToRun(
 		if c.ModuleKind() == pbsubstreams.ModuleKindMap {
 			if runningLastStage && name == outputModule {
+				// WARNING: if we ever want to force producing module outputs/store states for ALL stages
+				// on the first block range, this optimization will get in the way.
 				logger.Info("found existing exec output for output_module, skipping run", zap.String("output_module", name))
 				return nil, nil, nil, nil
 			}
 			continue
 		}
 
-		// TODO when the partial KV stores are back to being 'generic' (without traceID), we will also be able to skip the store if the partial exists
+		// if either the full or the partial KV store exists, we can skip the module
 		storeExists, err := storeConfigs[name].ExistsFullKV(ctx, stopBlock)
 		if err != nil {
-			return nil, nil, nil, fmt.Errorf("checking file existence: %w", err)
+			return nil, nil, nil, fmt.Errorf("checking fullkv file existence: %w", err)
 		}
 		if !storeExists {
-			// some stores may already exist completely on this stage, but others do not, so we keep going but ignore those
-			requiredModules[name] = usedModules[name]
+			partialStoreExists, err := storeConfigs[name].ExistsPartialKV(ctx, startBlock, stopBlock)
+			if err != nil {
+				return nil, nil, nil, fmt.Errorf("checking partial file existence: %w", err)
+			}
+			if !partialStoreExists {
+				// some stores may already exist completely on this stage, but others do not, so we keep going but ignore those
+				requiredModules[name] = usedModules[name]
+			}
 		}
 	}
diff --git a/storage/store/base_store_test.go b/storage/store/base_store_test.go
index 5e55ae9da..8b8187e87 100644
--- a/storage/store/base_store_test.go
+++ b/storage/store/base_store_test.go
@@ -81,10 +81,10 @@ func TestFileName(t *testing.T) {
 	stateFileName := FullStateFileName(&block.Range{StartBlock: 100, ExclusiveEndBlock: 10000})
 	require.Equal(t, "0000010000-0000000100.kv", stateFileName)
 
-	partialFileName := PartialFileName(&block.Range{StartBlock: 10000, ExclusiveEndBlock: 20000}, "abc")
-	require.Equal(t, "0000020000-0000010000.abc.partial", partialFileName)
+	partialFileName := PartialFileName(&block.Range{StartBlock: 10000, ExclusiveEndBlock: 20000})
+	require.Equal(t, "0000020000-0000010000.partial", partialFileName)
 
-	partialFileNameLegacy := PartialFileName(&block.Range{StartBlock: 10000, ExclusiveEndBlock: 20000}, "")
+	partialFileNameLegacy := PartialFileName(&block.Range{StartBlock: 10000, ExclusiveEndBlock: 20000})
 	require.Equal(t, "0000020000-0000010000.partial", partialFileNameLegacy)
 }
diff --git a/storage/store/config.go b/storage/store/config.go
index 869d3fa2e..6a5909992 100644
--- a/storage/store/config.go
+++ b/storage/store/config.go
@@ -26,11 +26,6 @@ type Config struct {
 	appendLimit    uint64
 	totalSizeLimit uint64
 	itemSizeLimit  uint64
-
-	// traceID uniquely identifies the connection ID so that store can be
-	// written to unique filename preventing some races when multiple Substreams
-	// request works on the same range.
-	traceID string
 }
 
 func NewConfig(
@@ -40,7 +35,6 @@ func NewConfig(
 	updatePolicy pbsubstreams.Module_KindStore_UpdatePolicy,
 	valueType string,
 	store dstore.Store,
-	traceID string,
 ) (*Config, error) {
 	subStore, err := store.SubStore(fmt.Sprintf("%s/states", moduleHash))
 	if err != nil {
@@ -57,7 +51,6 @@ func NewConfig(
 		appendLimit:    8_388_608,     // 8MiB = 8 * 1024 * 1024,
 		totalSizeLimit: 1_073_741_824, // 1GiB
 		itemSizeLimit:  10_485_760,    // 10MiB
-		traceID:        traceID,
 	}, nil
 }
 
@@ -99,6 +92,11 @@ func (c *Config) ExistsFullKV(ctx context.Context, upTo uint64) (bool, error) {
 	return c.objStore.FileExists(ctx, filename)
 }
 
+func (c *Config) ExistsPartialKV(ctx context.Context, from, to uint64) (bool, error) {
+	filename := PartialFileName(block.NewRange(from, to))
+	return c.objStore.FileExists(ctx, filename)
+}
+
 func (c *Config) NewPartialKV(initialBlock uint64, logger *zap.Logger) *PartialKV {
 	return &PartialKV{
 		baseStore: c.newBaseStore(logger),
diff --git a/storage/store/configmap.go b/storage/store/configmap.go
index 64d225806..e0a9d1801 100644
--- a/storage/store/configmap.go
+++ b/storage/store/configmap.go
@@ -10,7 +10,7 @@ import (
 
 type ConfigMap map[string]*Config
 
-func NewConfigMap(baseObjectStore dstore.Store, storeModules []*pbsubstreams.Module, moduleHashes *manifest.ModuleHashes, traceID string) (out ConfigMap, err error) {
+func NewConfigMap(baseObjectStore dstore.Store, storeModules []*pbsubstreams.Module, moduleHashes *manifest.ModuleHashes) (out ConfigMap, err error) {
 	out = make(ConfigMap)
 	for _, storeModule := range storeModules {
 		c, err := NewConfig(
@@ -20,7 +20,6 @@ func NewConfigMap(baseObjectStore dstore.Store, storeModules []*pbsubstreams.Mod
 			storeModule.GetKindStore().UpdatePolicy,
 			storeModule.GetKindStore().ValueType,
 			baseObjectStore,
-			traceID,
 		)
 		if err != nil {
 			return nil, fmt.Errorf("new store config for %q: %w", storeModule.Name, err)
diff --git a/storage/store/filename.go b/storage/store/filename.go
index 1018c72ff..b4294f782 100644
--- a/storage/store/filename.go
+++ b/storage/store/filename.go
@@ -38,7 +38,6 @@ type FileInfo struct {
 	ModuleName string
 	Filename   string
 	Range      *block.Range
-	TraceID    string
 	Partial    bool
 }
 
@@ -53,14 +52,13 @@ func NewCompleteFileInfo(moduleName string, moduleInitialBlock uint64, exclusive
 	}
 }
 
-func NewPartialFileInfo(moduleName string, start uint64, exclusiveEndBlock uint64, traceID string) *FileInfo {
+func NewPartialFileInfo(moduleName string, start uint64, exclusiveEndBlock uint64) *FileInfo {
 	bRange := block.NewRange(start, exclusiveEndBlock)
 
 	return &FileInfo{
 		ModuleName: moduleName,
-		Filename:   PartialFileName(bRange, traceID),
+		Filename:   PartialFileName(bRange),
 		Range:      bRange,
-		TraceID:    traceID,
 		Partial:    true,
 	}
 }
@@ -75,18 +73,12 @@ func parseFileName(moduleName, filename string) (*FileInfo, bool) {
 		ModuleName: moduleName,
 		Filename:   filename,
 		Range:      block.NewRange(uint64(mustAtoi(res[0][2])), uint64(mustAtoi(res[0][1]))),
-		TraceID:    res[0][3],
 		Partial:    res[0][4] == "partial",
 	}, true
 }
 
-func PartialFileName(r *block.Range, traceID string) string {
-	if traceID == "" {
-		// Generate legacy partial filename
-		return fmt.Sprintf("%010d-%010d.partial", r.ExclusiveEndBlock, r.StartBlock)
-	}
-
-	return fmt.Sprintf("%010d-%010d.%s.partial", r.ExclusiveEndBlock, r.StartBlock, traceID)
+func PartialFileName(r *block.Range) string {
+	return fmt.Sprintf("%010d-%010d.partial", r.ExclusiveEndBlock, r.StartBlock)
 }
 
 func FullStateFileName(r *block.Range) string {
diff --git a/storage/store/filename_test.go b/storage/store/filename_test.go
index 546c8bdbc..896d3192c 100644
--- a/storage/store/filename_test.go
+++ b/storage/store/filename_test.go
@@ -15,22 +15,16 @@ func Test_parseFileName(t *testing.T) {
 		want  *FileInfo
 		want1 bool
 	}{
-		{
-			"partial legacy",
-			fmt.Sprintf("%010d-%010d.partial", 100, 0),
-			&FileInfo{ModuleName: "test", Filename: "0000000100-0000000000.partial", Range: block.NewRange(0, 100), TraceID: "", Partial: true},
-			true,
-		},
 		{
 			"partial",
-			fmt.Sprintf("%010d-%010d.abcdef.partial", 100, 0),
-			&FileInfo{ModuleName: "test", Filename: "0000000100-0000000000.abcdef.partial", Range: block.NewRange(0, 100), TraceID: "abcdef", Partial: true},
+			fmt.Sprintf("%010d-%010d.partial", 100, 0),
+			&FileInfo{ModuleName: "test", Filename: "0000000100-0000000000.partial", Range: block.NewRange(0, 100), Partial: true},
 			true,
 		},
 		{
 			"full",
 			fmt.Sprintf("%010d-%010d.kv", 100, 0),
-			&FileInfo{ModuleName: "test", Filename: "0000000100-0000000000.kv", Range: block.NewRange(0, 100), TraceID: "", Partial: false},
+			&FileInfo{ModuleName: "test", Filename: "0000000100-0000000000.kv", Range: block.NewRange(0, 100), Partial: false},
 			true,
 		},
 	}
diff --git a/storage/store/init_test.go b/storage/store/init_test.go
index e6fdacc5e..672d02312 100644
--- a/storage/store/init_test.go
+++ b/storage/store/init_test.go
@@ -24,7 +24,7 @@ func newTestBaseStore(
 		appendLimit = 10
 	}
 
-	config, err := NewConfig("test", 0, "test.module.hash", updatePolicy, valueType, store, "")
+	config, err := NewConfig("test", 0, "test.module.hash", updatePolicy, valueType, store)
 	config.appendLimit = appendLimit
 	config.totalSizeLimit = 9999
 	config.itemSizeLimit = 10_485_760
diff --git a/storage/store/partial_kv.go b/storage/store/partial_kv.go
index 758ae3433..89e51a9d9 100644
--- a/storage/store/partial_kv.go
+++ b/storage/store/partial_kv.go
@@ -65,7 +65,7 @@ func (p *PartialKV) Save(endBoundaryBlock uint64) (*FileInfo, *fileWriter, error
 		return nil, nil, fmt.Errorf("marshal partial data: %w", err)
 	}
 
-	file := NewPartialFileInfo(p.name, p.initialBlock, endBoundaryBlock, p.traceID)
+	file := NewPartialFileInfo(p.name, p.initialBlock, endBoundaryBlock)
 	p.logger.Debug("partial store save written", zap.String("file_name", file.Filename), zap.Stringer("block_range", file.Range))
 
 	fw := &fileWriter{
diff --git a/storage/store/state/snapshot.go b/storage/store/state/snapshot.go
index 00155ec5c..e5cf76047 100644
--- a/storage/store/state/snapshot.go
+++ b/storage/store/state/snapshot.go
@@ -40,12 +40,6 @@ func (s *storeSnapshots) Sort() {
 		left := s.Partials[i]
 		right := s.Partials[j]
 
-		// Sort by start block first, then by trace ID so at least we
-		// take partials all from the same producer.
-		if left.Range.StartBlock == right.Range.StartBlock {
-			return left.TraceID < right.TraceID
-		}
-
 		return left.Range.StartBlock < right.Range.StartBlock
 	})
 }
diff --git a/storage/store/testing.go b/storage/store/testing.go
index e87f118b7..33392a762 100644
--- a/storage/store/testing.go
+++ b/storage/store/testing.go
@@ -53,15 +53,9 @@ func fileFromRanges(kind string, in string, params ...FileInfoParam) FileInfos {
 			param.apply(file)
 		}
 
-		file.Filename = PartialFileName(blockRange, file.TraceID)
+		file.Filename = PartialFileName(blockRange)
 		files[i] = file
 	}
 
 	return files
 }
-
-type TraceIDParam string
-
-func (t TraceIDParam) apply(file *FileInfo) {
-	file.TraceID = string(t)
-}
diff --git a/test/integration_test.go b/test/integration_test.go
index 7bb54ba1b..11267c8cf 100644
--- a/test/integration_test.go
+++ b/test/integration_test.go
@@ -203,7 +203,7 @@ func TestOneStoreOneMap(t *testing.T) {
 				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/outputs/0000000010-0000000020.output",
 				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.kv", // store states
 				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000020-0000000001.kv",
-				// "states/0000000025-0000000020.00000000000000000000000000000000.partial", // produced, then deleted
+				// "states/0000000025-0000000020.partial", // produced, then deleted
 			},
 		},
 		{
@@ -218,7 +218,7 @@ func TestOneStoreOneMap(t *testing.T) {
 				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/outputs/0000000010-0000000020.output",
 				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.kv", // store states
 				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000020-0000000001.kv",
-				// "states/0000000025-0000000020.00000000000000000000000000000000.partial", // produced, then deleted
+				// "states/0000000025-0000000020.partial", // produced, then deleted
 				//"states/0000000030-0000000001.kv", // Again, backprocess wouldn't save this one, nor does it need to.
 			},
 		},
@@ -300,7 +300,7 @@ func TestOneStoreOneMap(t *testing.T) {
 			stopBlock:   29,
 			production:  true,
 			preWork: func(t *testing.T, run *testRun, workerFactory work.WorkerFactory) {
-				partialPreWork(t, 1, 10, 0, run, workerFactory, "00000000000000000000000000000000")
+				partialPreWork(t, 1, 10, 0, run, workerFactory)
 			},
 			expectedResponseCount: 28,
 			expectFiles: []string{
@@ -311,86 +311,6 @@ func TestOneStoreOneMap(t *testing.T) {
 				"3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000001-0000000010.output",
 				"3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000010-0000000020.output",
 				"3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000020-0000000029.output",
-
-				// Existing partial files are not re-used
-				//"states/0000000010-0000000001.00000000000000000000000000000000.partial", // FIXME: perhaps wasn't deleted before?
-			},
-		},
-		{
-			name:        "prod_mode_multiple_partial_different_trace_id",
-			startBlock:  1,
-			linearBlock: 29,
-			stopBlock:   29,
-			production:  true,
-			preWork: func(t *testing.T, run *testRun, workerFactory work.WorkerFactory) {
-				partialPreWork(t, 1, 10, 0, run, workerFactory, "11111111111111111111")
-				partialPreWork(t, 1, 10, 0, run, workerFactory, "22222222222222222222")
-			},
-			expectedResponseCount: 28,
-			expectFiles: []string{
-				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.kv",
-				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000020-0000000001.kv",
-				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/outputs/0000000001-0000000010.output",
-				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/outputs/0000000010-0000000020.output",
-
-				"3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000001-0000000010.output",
-				"3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000010-0000000020.output",
-				"3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000020-0000000029.output",
-
-				// Existing partial files are not re-used
-				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.11111111111111111111.partial",
-				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.22222222222222222222.partial",
-			},
-		},
-		{
-			name:        "prod_mode_partial_legacy_generated",
-			startBlock:  1,
-			linearBlock: 29,
-			stopBlock:   29,
-			production:  true,
-			preWork: func(t *testing.T, run *testRun, workerFactory work.WorkerFactory) {
-				// Using an empty trace id brings up the old behavior where files are not suffixed with a trace id
-				partialPreWork(t, 1, 10, 0, run, workerFactory, "")
-			},
-			expectedResponseCount: 28,
-			expectFiles: []string{
-				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.kv",
-				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000020-0000000001.kv",
-				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/outputs/0000000001-0000000010.output",
-				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/outputs/0000000010-0000000020.output",
-				"3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000001-0000000010.output",
-				"3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000010-0000000020.output",
-				"3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000020-0000000029.output",
-
-				// Existing partial files are not re-used
-				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.partial",
-			},
-		},
-		{
-			name:        "prod_mode_multiple_partial_mixed_legacy_and_new",
-			startBlock:  1,
-			linearBlock: 29,
-			stopBlock:   29,
-			production:  true,
-			preWork: func(t *testing.T, run *testRun, workerFactory work.WorkerFactory) {
-				// Using an empty trace id brings up the old behavior where files are not suffixed with a trace id
-				partialPreWork(t, 1, 10, 0, run, workerFactory, "")
-				partialPreWork(t, 1, 10, 0, run, workerFactory, "11111111111111111111")
-			},
-			expectedResponseCount: 28,
-			expectFiles: []string{
-				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.kv",
-				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000020-0000000001.kv",
-				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/outputs/0000000001-0000000010.output",
-				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/outputs/0000000010-0000000020.output",
-
-				"3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000001-0000000010.output",
-				"3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000010-0000000020.output",
-				"3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000020-0000000029.output",
-
-				// Existing partial files are not re-used
-				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.partial",
-				"ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.11111111111111111111.partial",
-			},
-		},
 	}
 }
@@ -543,9 +463,8 @@ func assertFiles(t *testing.T, tempDir string, wantedFiles ...string) {
 	assert.ElementsMatch(t, wantedFiles, actualFiles)
 }
 
-func partialPreWork(t *testing.T, start, end uint64, stageIdx int, run *testRun, workerFactory work.WorkerFactory, traceID string) {
+func partialPreWork(t *testing.T, start, end uint64, stageIdx int, run *testRun, workerFactory work.WorkerFactory) {
 	worker := workerFactory(zlog)
-	worker.(*TestWorker).traceID = &traceID
 
 	// FIXME: use the new `Work` interface here, and validate that the
 	// caller to `partialPreWork` doesn't need to be changed too much? :)
diff --git a/test/runnable_test.go b/test/runnable_test.go
index 4c1bc5a0c..72b9d75c9 100644
--- a/test/runnable_test.go
+++ b/test/runnable_test.go
@@ -237,7 +237,6 @@ func processInternalRequest(
 	responseCollector *responseCollector,
 	blockProcessedCallBack blockProcessedCallBack,
 	testTempDir string,
-	traceID *string,
 ) error {
 	t.Helper()
 
@@ -263,7 +262,7 @@ func processInternalRequest(
 	)
 
 	svc := service.TestNewServiceTier2(runtimeConfig, tr.StreamFactory)
-	return svc.TestProcessRange(ctx, request, responseCollector.Collect, traceID)
+	return svc.TestProcessRange(ctx, request, responseCollector.Collect)
 }
 
 func processRequest(
diff --git a/test/worker_test.go b/test/worker_test.go
index 7f4e048c0..8de68d1ee 100644
--- a/test/worker_test.go
+++ b/test/worker_test.go
@@ -23,7 +23,6 @@ type TestWorker struct {
 	blockProcessedCallBack blockProcessedCallBack
 	testTempDir            string
 	id                     uint64
-	traceID                *string
 }
 
 var workerID atomic.Uint64
@@ -49,7 +48,7 @@ func (w *TestWorker) Work(ctx context.Context, unit stage.Unit, workRange *block
 	)
 
 	return func() loop.Msg {
-		if err := processInternalRequest(w.t, ctx, request, nil, w.newBlockGenerator, w.responseCollector, w.blockProcessedCallBack, w.testTempDir, w.traceID); err != nil {
+		if err := processInternalRequest(w.t, ctx, request, nil, w.newBlockGenerator, w.responseCollector, w.blockProcessedCallBack, w.testTempDir); err != nil {
 			return work.MsgJobFailed{Unit: unit, Error: fmt.Errorf("processing test tier2 request: %w", err)}
 		}
 		logger.Info("worker done running job",
diff --git a/tools/analytics_store_stats.go b/tools/analytics_store_stats.go
index 90f0f2293..0d7993a19 100644
--- a/tools/analytics_store_stats.go
+++ b/tools/analytics_store_stats.go
@@ -109,7 +109,6 @@ func StoreStatsE(cmd *cobra.Command, args []string) error {
 			module.GetKind().(*pbsubstreams.Module_KindStore_).KindStore.UpdatePolicy,
 			module.GetKind().(*pbsubstreams.Module_KindStore_).KindStore.ValueType,
 			baseDStore,
-			"",
 		)
 		if err != nil {
 			zlog.Error("creating store config", zap.Error(err))
diff --git a/tools/check.go b/tools/check.go
index 4a9e7f7b3..e4ce0fbb9 100644
--- a/tools/check.go
+++ b/tools/check.go
@@ -64,7 +64,7 @@ func newStore(storeURL string) (*store2.FullKV, dstore.Store, error) {
 		return nil, nil, fmt.Errorf("could not create store from %s: %w", storeURL, err)
 	}
 
-	config, err := store2.NewConfig("", 0, "", pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_IF_NOT_EXISTS, "", remoteStore, "")
+	config, err := store2.NewConfig("", 0, "", pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_IF_NOT_EXISTS, "", remoteStore)
 	if err != nil {
 		return nil, nil, err
 	}
diff --git a/tools/decode.go b/tools/decode.go
index c132bf49b..a0f2cbeaa 100644
--- a/tools/decode.go
+++ b/tools/decode.go
@@ -284,7 +284,7 @@ func searchStateModule(
 	stateStore dstore.Store,
 	protoFiles []*descriptorpb.FileDescriptorProto,
 ) error {
-	config, err := store.NewConfig(module.Name, module.InitialBlock, moduleHash, module.GetKindStore().GetUpdatePolicy(), module.GetKindStore().GetValueType(), stateStore, "")
+	config, err := store.NewConfig(module.Name, module.InitialBlock, moduleHash, module.GetKindStore().GetUpdatePolicy(), module.GetKindStore().GetValueType(), stateStore)
 	if err != nil {
 		return fmt.Errorf("initializing store config module %q: %w", module.Name, err)
 	}
diff --git a/tools/module.go b/tools/module.go
index cb06fd1c2..8e22bbceb 100644
--- a/tools/module.go
+++ b/tools/module.go
@@ -114,7 +114,6 @@ func moduleRunE(cmd *cobra.Command, args []string) error {
 		module.GetKindStore().UpdatePolicy,
 		module.GetKindStore().ValueType,
 		stateStore,
-		"",
 	)
 	cli.NoError(err, "unable to create store config")
diff --git a/wasm/call_test.go b/wasm/call_test.go
index c425f5da6..8cc81c158 100644
--- a/wasm/call_test.go
+++ b/wasm/call_test.go
@@ -482,7 +482,7 @@ func Test_CallStoreOps(t *testing.T) {
 func newTestCall(updatePolicy pbsubstreams.Module_KindStore_UpdatePolicy, valueType string) *Call {
 	myStore := dstore.NewMockStore(nil)
 
-	storeConf, err := store.NewConfig("test", 0, "", updatePolicy, valueType, myStore, "test")
+	storeConf, err := store.NewConfig("test", 0, "", updatePolicy, valueType, myStore)
 	if err != nil {
 		panic("failed")
 	}
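A closing editorial note: the behavioral heart of this patch is the fallback added to `singleSquash` in `orchestrator/stage/squash.go`. With trace-ID-free filenames, two concurrent requests can consume the same partial file, so a failed partial load is no longer fatal. The sketch below restates that flow under simplified, hypothetical types (`moduleState`, `loadPartial`, and `loadFullKV` are illustrative stand-ins, not the repository's real API):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type fullKV struct{ lastBlock uint64 }

// moduleState is a hypothetical stand-in for the real StoreModuleState.
type moduleState struct {
	cachedStore      *fullKV
	lastBlockInStore uint64
}

var errNotFound = errors.New("partial file not found")

// loadPartial pretends another tier1 already squashed (and deleted) the partial.
func (m *moduleState) loadPartial(ctx context.Context, start, end uint64) error {
	return errNotFound
}

// loadFullKV pretends the merged full KV store exists at the segment boundary.
func (m *moduleState) loadFullKV(ctx context.Context, upTo uint64) (*fullKV, error) {
	return &fullKV{lastBlock: upTo}, nil
}

// squashSegment mirrors the new fallback: when the partial is missing, try to
// reload the already-merged full KV instead of failing the whole squash.
func (m *moduleState) squashSegment(ctx context.Context, start, end uint64) error {
	if err := m.loadPartial(ctx, start, end); err != nil {
		if full, ferr := m.loadFullKV(ctx, end); ferr == nil {
			m.cachedStore = full
			m.lastBlockInStore = end
			return nil // another request squashed this segment for us
		}
		return fmt.Errorf("loading partial %010d-%010d.partial: %w", end, start, err)
	}
	// ... merge the partial into the cached full KV and save it (elided) ...
	return nil
}

func main() {
	m := &moduleState{}
	if err := m.squashSegment(context.Background(), 10000, 20000); err != nil {
		panic(err)
	}
	fmt.Println("store reloaded up to block", m.lastBlockInStore)
}
```

This design trades the per-request isolation the trace ID used to provide for reusability: races on the same filename are now benign, because whichever request wins produces exactly the bytes every other request expects.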