diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index fbe099cec..7c7b881de 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -22,7 +22,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), * Tier2 will now read back mapper outputs (if they exist) to prevent running them again. Additionally, it will not read back the full blocks if its inputs can be satisfied from existing cached mapper outputs. -* Tier2 will skip processing completely if the `output_module` is a mapper that has already been processed (ex: when multiple requests are indexing the same data at the same time) +* Tier2 will skip processing completely if it's processing the last stage and the `output_module` is a mapper that has already been processed (ex: when multiple requests are indexing the same data at the same time) + +* Tier2 will skip processing completely if it's processing a stage that is not the last, but all the stores and outputs have been processed and cached. + +* The "partial" store outputs no longer contain the trace ID in the filename, allowing them to be reused. If many requests point to the same modules being squashed, the squasher will detect if another Tier1 has squashed its file and reload the store from the produced full KV. * [Operator] Readiness metric for Substreams tier1 app is now named `substreams_tier1` (was mistakenly called `firehose` before). diff --git a/orchestrator/parallelprocessor.go b/orchestrator/parallelprocessor.go index d8ba12180..06ef6d427 100644 --- a/orchestrator/parallelprocessor.go +++ b/orchestrator/parallelprocessor.go @@ -33,13 +33,12 @@ func BuildParallelProcessor( execoutStorage *execout.Configs, respFunc func(resp substreams.ResponseFromAnyTier) error, storeConfigs store.ConfigMap, - traceID string, ) (*ParallelProcessor, error) { stream := response.New(respFunc) sched := scheduler.New(ctx, stream) - stages := stage.NewStages(ctx, outputGraph, reqPlan, storeConfigs, traceID) + stages := stage.NewStages(ctx, outputGraph, reqPlan, storeConfigs) sched.Stages = stages // OPTIMIZATION: We should fetch the ExecOut files too, and see if they @@ -91,7 +90,6 @@ func BuildParallelProcessor( reqPlan.StoresSegmenter(), storeConfigs, execoutStorage, - traceID, ) if err != nil { return nil, fmt.Errorf("fetch stores storage state: %w", err) @@ -102,7 +100,6 @@ func BuildParallelProcessor( reqPlan.WriteOutSegmenter(), storeConfigs, execoutStorage, - traceID, ) if err != nil { return nil, fmt.Errorf("fetch stores storage state: %w", err) diff --git a/orchestrator/stage/fetchstorage.go b/orchestrator/stage/fetchstorage.go index a3ccd5696..61fdba561 100644 --- a/orchestrator/stage/fetchstorage.go +++ b/orchestrator/stage/fetchstorage.go @@ -17,7 +17,6 @@ func (s *Stages) FetchStoresState( segmenter *block.Segmenter, storeConfigMap store.ConfigMap, execoutConfigs *execout.Configs, - traceID string, ) error { completes := make(unitMap) partials := make(unitMap) @@ -103,9 +102,6 @@ func (s *Stages) FetchStoresState( if !rng.Equals(partial.Range) { continue } - if traceID != partial.TraceID { - continue - } unit := Unit{Stage: stageIdx, Segment: segmentIdx} if s.getState(unit) == UnitCompleted { diff --git a/orchestrator/stage/squash.go b/orchestrator/stage/squash.go index bc22c3047..1876c7471 100644 --- a/orchestrator/stage/squash.go +++ b/orchestrator/stage/squash.go @@ -58,7 +58,7 @@ func (s *Stages) singleSquash(stage *Stage, modState *StoreModuleState, mergeUni metrics.start = time.Now() rng := modState.segmenter.Range(mergeUnit.Segment) - partialFile := store.NewPartialFileInfo(modState.name, rng.StartBlock, rng.ExclusiveEndBlock, s.traceID) + partialFile := store.NewPartialFileInfo(modState.name, rng.StartBlock, rng.ExclusiveEndBlock) partialKV := modState.derivePartialKV(rng.StartBlock) segmentEndsOnInterval := modState.segmenter.EndsOnInterval(mergeUnit.Segment) @@ -72,6 +72,12 @@ func (s *Stages) singleSquash(stage *Stage, modState *StoreModuleState, mergeUni // Load metrics.loadStart = time.Now() if err := partialKV.Load(s.ctx, partialFile); err != nil { + if nextFull, err := modState.getStore(s.ctx, rng.ExclusiveEndBlock); err == nil { // try to load an already-merged file + s.logger.Info("got full store from cache instead of partial, reloading", zap.Stringer("store", nextFull)) + modState.cachedStore = nextFull + modState.lastBlockInStore = rng.ExclusiveEndBlock + return nil + } return fmt.Errorf("loading partial: %q: %w", partialFile.Filename, err) } metrics.loadEnd = time.Now() diff --git a/orchestrator/stage/stages.go b/orchestrator/stage/stages.go index 3c0bce35d..2e5af06c0 100644 --- a/orchestrator/stage/stages.go +++ b/orchestrator/stage/stages.go @@ -32,9 +32,8 @@ import ( // that the Stage is completed, kicking off the next layer of jobs. type Stages struct { - ctx context.Context - logger *zap.Logger - traceID string + ctx context.Context + logger *zap.Logger globalSegmenter *block.Segmenter // This segmenter covers both the stores and the mapper storeSegmenter *block.Segmenter // This segmenter covers only jobs needed to build up stores according to the RequestPlan. @@ -58,7 +57,6 @@ func NewStages( outputGraph *outputmodules.Graph, reqPlan *plan.RequestPlan, storeConfigs store.ConfigMap, - traceID string, ) (out *Stages) { if !reqPlan.RequiresParallelProcessing() { @@ -70,7 +68,6 @@ func NewStages( stagedModules := outputGraph.StagedUsedModules() out = &Stages{ ctx: ctx, - traceID: traceID, logger: reqctx.Logger(ctx), globalSegmenter: reqPlan.BackprocessSegmenter(), } diff --git a/orchestrator/stage/stages_test.go b/orchestrator/stage/stages_test.go index 9167d86fc..9adad77f0 100644 --- a/orchestrator/stage/stages_test.go +++ b/orchestrator/stage/stages_test.go @@ -23,7 +23,6 @@ func TestNewStages(t *testing.T) { outputmodules.TestGraphStagedModules(5, 7, 12, 22, 25), reqPlan, nil, - "trace", ) assert.Equal(t, 8, stages.globalSegmenter.Count()) // from 5 to 75 @@ -49,7 +48,6 @@ func TestNewStagesNextJobs(t *testing.T) { outputmodules.TestGraphStagedModules(5, 5, 5, 5, 5), reqPlan, nil, - "trace", ) stages.allocSegments(0) diff --git a/orchestrator/work/worker.go b/orchestrator/work/worker.go index 7bbc36880..c11d7efbd 100644 --- a/orchestrator/work/worker.go +++ b/orchestrator/work/worker.go @@ -288,7 +288,7 @@ func toRPCPartialFiles(completed *pbssinternal.Completed) (out store.FileInfos) // stores to all having been processed. out = make(store.FileInfos, len(completed.AllProcessedRanges)) for i, b := range completed.AllProcessedRanges { - out[i] = store.NewPartialFileInfo("TODO:CHANGE-ME", b.StartBlock, b.EndBlock, completed.TraceId) + out[i] = store.NewPartialFileInfo("TODO:CHANGE-ME", b.StartBlock, b.EndBlock) } return } diff --git a/pb/last_generate.txt b/pb/last_generate.txt index ca8ca49df..1dfee1170 100644 --- a/pb/last_generate.txt +++ b/pb/last_generate.txt @@ -1,2 +1,2 @@ -generate.sh - Thu 14 Mar 2024 10:02:19 EDT - arnaudberger -streamingfast/proto revision: 2999cd42d71a82c4adf739557f9520f0609f7b10 +generate.sh - Tue Mar 19 09:09:54 EDT 2024 - stepd +streamingfast/proto revision: fde3637cef38103a68301d0ca19b3f2af9b6079b diff --git a/pb/sf/substreams/intern/v2/deltas.pb.go b/pb/sf/substreams/intern/v2/deltas.pb.go index 4ee11ae4f..eedd5ed6f 100644 --- a/pb/sf/substreams/intern/v2/deltas.pb.go +++ b/pb/sf/substreams/intern/v2/deltas.pb.go @@ -22,6 +22,100 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type Operation_Type int32 + +const ( + Operation_SET Operation_Type = 0 + Operation_SET_BYTES Operation_Type = 1 + Operation_SET_IF_NOT_EXISTS Operation_Type = 2 + Operation_SET_BYTES_IF_NOT_EXISTS Operation_Type = 3 + Operation_APPEND Operation_Type = 4 + Operation_DELETE_PREFIX Operation_Type = 5 + Operation_SET_MAX_BIG_INT Operation_Type = 6 + Operation_SET_MAX_INT64 Operation_Type = 7 + Operation_SET_MAX_FLOAT64 Operation_Type = 8 + Operation_SET_MAX_BIG_DECIMAL Operation_Type = 9 + Operation_SET_MIN_BIG_INT Operation_Type = 10 + Operation_SET_MIN_INT64 Operation_Type = 11 + Operation_SET_MIN_FLOAT64 Operation_Type = 12 + Operation_SET_MIN_BIG_DECIMAL Operation_Type = 13 + Operation_SUM_BIG_INT Operation_Type = 14 + Operation_SUM_INT64 Operation_Type = 15 + Operation_SUM_FLOAT64 Operation_Type = 16 + Operation_SUM_BIG_DECIMAL Operation_Type = 17 +) + +// Enum value maps for Operation_Type. +var ( + Operation_Type_name = map[int32]string{ + 0: "SET", + 1: "SET_BYTES", + 2: "SET_IF_NOT_EXISTS", + 3: "SET_BYTES_IF_NOT_EXISTS", + 4: "APPEND", + 5: "DELETE_PREFIX", + 6: "SET_MAX_BIG_INT", + 7: "SET_MAX_INT64", + 8: "SET_MAX_FLOAT64", + 9: "SET_MAX_BIG_DECIMAL", + 10: "SET_MIN_BIG_INT", + 11: "SET_MIN_INT64", + 12: "SET_MIN_FLOAT64", + 13: "SET_MIN_BIG_DECIMAL", + 14: "SUM_BIG_INT", + 15: "SUM_INT64", + 16: "SUM_FLOAT64", + 17: "SUM_BIG_DECIMAL", + } + Operation_Type_value = map[string]int32{ + "SET": 0, + "SET_BYTES": 1, + "SET_IF_NOT_EXISTS": 2, + "SET_BYTES_IF_NOT_EXISTS": 3, + "APPEND": 4, + "DELETE_PREFIX": 5, + "SET_MAX_BIG_INT": 6, + "SET_MAX_INT64": 7, + "SET_MAX_FLOAT64": 8, + "SET_MAX_BIG_DECIMAL": 9, + "SET_MIN_BIG_INT": 10, + "SET_MIN_INT64": 11, + "SET_MIN_FLOAT64": 12, + "SET_MIN_BIG_DECIMAL": 13, + "SUM_BIG_INT": 14, + "SUM_INT64": 15, + "SUM_FLOAT64": 16, + "SUM_BIG_DECIMAL": 17, + } +) + +func (x Operation_Type) Enum() *Operation_Type { + p := new(Operation_Type) + *p = x + return p +} + +func (x Operation_Type) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (Operation_Type) Descriptor() protoreflect.EnumDescriptor { + return file_sf_substreams_intern_v2_deltas_proto_enumTypes[0].Descriptor() +} + +func (Operation_Type) Type() protoreflect.EnumType { + return &file_sf_substreams_intern_v2_deltas_proto_enumTypes[0] +} + +func (x Operation_Type) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use Operation_Type.Descriptor instead. +func (Operation_Type) EnumDescriptor() ([]byte, []int) { + return file_sf_substreams_intern_v2_deltas_proto_rawDescGZIP(), []int{2, 0} +} + type ModuleOutput struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -135,6 +229,124 @@ func (*ModuleOutput_MapOutput) isModuleOutput_Data() {} func (*ModuleOutput_StoreDeltas) isModuleOutput_Data() {} +type Operations struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Operations []*Operation `protobuf:"bytes,1,rep,name=operations,proto3" json:"operations,omitempty"` +} + +func (x *Operations) Reset() { + *x = Operations{} + if protoimpl.UnsafeEnabled { + mi := &file_sf_substreams_intern_v2_deltas_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Operations) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Operations) ProtoMessage() {} + +func (x *Operations) ProtoReflect() protoreflect.Message { + mi := &file_sf_substreams_intern_v2_deltas_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Operations.ProtoReflect.Descriptor instead. +func (*Operations) Descriptor() ([]byte, []int) { + return file_sf_substreams_intern_v2_deltas_proto_rawDescGZIP(), []int{1} +} + +func (x *Operations) GetOperations() []*Operation { + if x != nil { + return x.Operations + } + return nil +} + +type Operation struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Type Operation_Type `protobuf:"varint,1,opt,name=type,proto3,enum=sf.substreams.internal.v2.Operation_Type" json:"type,omitempty"` + Ord uint64 `protobuf:"varint,2,opt,name=ord,proto3" json:"ord,omitempty"` + Key string `protobuf:"bytes,3,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"` +} + +func (x *Operation) Reset() { + *x = Operation{} + if protoimpl.UnsafeEnabled { + mi := &file_sf_substreams_intern_v2_deltas_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Operation) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Operation) ProtoMessage() {} + +func (x *Operation) ProtoReflect() protoreflect.Message { + mi := &file_sf_substreams_intern_v2_deltas_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Operation.ProtoReflect.Descriptor instead. +func (*Operation) Descriptor() ([]byte, []int) { + return file_sf_substreams_intern_v2_deltas_proto_rawDescGZIP(), []int{2} +} + +func (x *Operation) GetType() Operation_Type { + if x != nil { + return x.Type + } + return Operation_SET +} + +func (x *Operation) GetOrd() uint64 { + if x != nil { + return x.Ord + } + return 0 +} + +func (x *Operation) GetKey() string { + if x != nil { + return x.Key + } + return "" +} + +func (x *Operation) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + var File_sf_substreams_intern_v2_deltas_proto protoreflect.FileDescriptor var file_sf_substreams_intern_v2_deltas_proto_rawDesc = []byte{ @@ -162,13 +374,49 @@ var file_sf_substreams_intern_v2_deltas_proto_rawDesc = []byte{ 0x61, 0x74, 0x65, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x12, 0x64, 0x65, 0x62, 0x75, 0x67, 0x4c, 0x6f, 0x67, 0x73, 0x54, 0x72, 0x75, 0x6e, 0x63, 0x61, 0x74, 0x65, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x61, 0x63, 0x68, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, - 0x63, 0x61, 0x63, 0x68, 0x65, 0x64, 0x42, 0x06, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x42, 0x4d, - 0x5a, 0x4b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x66, 0x61, 0x73, 0x74, 0x2f, 0x73, 0x75, 0x62, 0x73, 0x74, - 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2f, 0x70, 0x62, 0x2f, 0x73, 0x66, 0x2f, 0x73, 0x75, 0x62, 0x73, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x2f, 0x76, 0x32, - 0x3b, 0x70, 0x62, 0x73, 0x73, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x63, 0x61, 0x63, 0x68, 0x65, 0x64, 0x42, 0x06, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x52, + 0x0a, 0x0a, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x44, 0x0a, 0x0a, + 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x24, 0x2e, 0x73, 0x66, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, + 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x4f, 0x70, 0x65, + 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x22, 0xea, 0x03, 0x0a, 0x09, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x3d, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x29, + 0x2e, 0x73, 0x66, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x69, + 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, + 0x10, 0x0a, 0x03, 0x6f, 0x72, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x6f, 0x72, + 0x64, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, + 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0xe3, 0x02, 0x0a, 0x04, 0x54, 0x79, + 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x53, 0x45, 0x54, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x53, + 0x45, 0x54, 0x5f, 0x42, 0x59, 0x54, 0x45, 0x53, 0x10, 0x01, 0x12, 0x15, 0x0a, 0x11, 0x53, 0x45, + 0x54, 0x5f, 0x49, 0x46, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x45, 0x58, 0x49, 0x53, 0x54, 0x53, 0x10, + 0x02, 0x12, 0x1b, 0x0a, 0x17, 0x53, 0x45, 0x54, 0x5f, 0x42, 0x59, 0x54, 0x45, 0x53, 0x5f, 0x49, + 0x46, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x45, 0x58, 0x49, 0x53, 0x54, 0x53, 0x10, 0x03, 0x12, 0x0a, + 0x0a, 0x06, 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, 0x10, 0x04, 0x12, 0x11, 0x0a, 0x0d, 0x44, 0x45, + 0x4c, 0x45, 0x54, 0x45, 0x5f, 0x50, 0x52, 0x45, 0x46, 0x49, 0x58, 0x10, 0x05, 0x12, 0x13, 0x0a, + 0x0f, 0x53, 0x45, 0x54, 0x5f, 0x4d, 0x41, 0x58, 0x5f, 0x42, 0x49, 0x47, 0x5f, 0x49, 0x4e, 0x54, + 0x10, 0x06, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x45, 0x54, 0x5f, 0x4d, 0x41, 0x58, 0x5f, 0x49, 0x4e, + 0x54, 0x36, 0x34, 0x10, 0x07, 0x12, 0x13, 0x0a, 0x0f, 0x53, 0x45, 0x54, 0x5f, 0x4d, 0x41, 0x58, + 0x5f, 0x46, 0x4c, 0x4f, 0x41, 0x54, 0x36, 0x34, 0x10, 0x08, 0x12, 0x17, 0x0a, 0x13, 0x53, 0x45, + 0x54, 0x5f, 0x4d, 0x41, 0x58, 0x5f, 0x42, 0x49, 0x47, 0x5f, 0x44, 0x45, 0x43, 0x49, 0x4d, 0x41, + 0x4c, 0x10, 0x09, 0x12, 0x13, 0x0a, 0x0f, 0x53, 0x45, 0x54, 0x5f, 0x4d, 0x49, 0x4e, 0x5f, 0x42, + 0x49, 0x47, 0x5f, 0x49, 0x4e, 0x54, 0x10, 0x0a, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x45, 0x54, 0x5f, + 0x4d, 0x49, 0x4e, 0x5f, 0x49, 0x4e, 0x54, 0x36, 0x34, 0x10, 0x0b, 0x12, 0x13, 0x0a, 0x0f, 0x53, + 0x45, 0x54, 0x5f, 0x4d, 0x49, 0x4e, 0x5f, 0x46, 0x4c, 0x4f, 0x41, 0x54, 0x36, 0x34, 0x10, 0x0c, + 0x12, 0x17, 0x0a, 0x13, 0x53, 0x45, 0x54, 0x5f, 0x4d, 0x49, 0x4e, 0x5f, 0x42, 0x49, 0x47, 0x5f, + 0x44, 0x45, 0x43, 0x49, 0x4d, 0x41, 0x4c, 0x10, 0x0d, 0x12, 0x0f, 0x0a, 0x0b, 0x53, 0x55, 0x4d, + 0x5f, 0x42, 0x49, 0x47, 0x5f, 0x49, 0x4e, 0x54, 0x10, 0x0e, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x55, + 0x4d, 0x5f, 0x49, 0x4e, 0x54, 0x36, 0x34, 0x10, 0x0f, 0x12, 0x0f, 0x0a, 0x0b, 0x53, 0x55, 0x4d, + 0x5f, 0x46, 0x4c, 0x4f, 0x41, 0x54, 0x36, 0x34, 0x10, 0x10, 0x12, 0x13, 0x0a, 0x0f, 0x53, 0x55, + 0x4d, 0x5f, 0x42, 0x49, 0x47, 0x5f, 0x44, 0x45, 0x43, 0x49, 0x4d, 0x41, 0x4c, 0x10, 0x11, 0x42, + 0x4d, 0x5a, 0x4b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x66, 0x61, 0x73, 0x74, 0x2f, 0x73, 0x75, 0x62, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2f, 0x70, 0x62, 0x2f, 0x73, 0x66, 0x2f, 0x73, 0x75, 0x62, + 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x2f, 0x76, + 0x32, 0x3b, 0x70, 0x62, 0x73, 0x73, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -183,20 +431,26 @@ func file_sf_substreams_intern_v2_deltas_proto_rawDescGZIP() []byte { return file_sf_substreams_intern_v2_deltas_proto_rawDescData } -var file_sf_substreams_intern_v2_deltas_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_sf_substreams_intern_v2_deltas_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_sf_substreams_intern_v2_deltas_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_sf_substreams_intern_v2_deltas_proto_goTypes = []interface{}{ - (*ModuleOutput)(nil), // 0: sf.substreams.internal.v2.ModuleOutput - (*anypb.Any)(nil), // 1: google.protobuf.Any - (*v1.StoreDeltas)(nil), // 2: sf.substreams.v1.StoreDeltas + (Operation_Type)(0), // 0: sf.substreams.internal.v2.Operation.Type + (*ModuleOutput)(nil), // 1: sf.substreams.internal.v2.ModuleOutput + (*Operations)(nil), // 2: sf.substreams.internal.v2.Operations + (*Operation)(nil), // 3: sf.substreams.internal.v2.Operation + (*anypb.Any)(nil), // 4: google.protobuf.Any + (*v1.StoreDeltas)(nil), // 5: sf.substreams.v1.StoreDeltas } var file_sf_substreams_intern_v2_deltas_proto_depIdxs = []int32{ - 1, // 0: sf.substreams.internal.v2.ModuleOutput.map_output:type_name -> google.protobuf.Any - 2, // 1: sf.substreams.internal.v2.ModuleOutput.store_deltas:type_name -> sf.substreams.v1.StoreDeltas - 2, // [2:2] is the sub-list for method output_type - 2, // [2:2] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 4, // 0: sf.substreams.internal.v2.ModuleOutput.map_output:type_name -> google.protobuf.Any + 5, // 1: sf.substreams.internal.v2.ModuleOutput.store_deltas:type_name -> sf.substreams.v1.StoreDeltas + 3, // 2: sf.substreams.internal.v2.Operations.operations:type_name -> sf.substreams.internal.v2.Operation + 0, // 3: sf.substreams.internal.v2.Operation.type:type_name -> sf.substreams.internal.v2.Operation.Type + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_sf_substreams_intern_v2_deltas_proto_init() } @@ -217,6 +471,30 @@ func file_sf_substreams_intern_v2_deltas_proto_init() { return nil } } + file_sf_substreams_intern_v2_deltas_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Operations); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_sf_substreams_intern_v2_deltas_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Operation); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_sf_substreams_intern_v2_deltas_proto_msgTypes[0].OneofWrappers = []interface{}{ (*ModuleOutput_MapOutput)(nil), @@ -227,13 +505,14 @@ func file_sf_substreams_intern_v2_deltas_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_sf_substreams_intern_v2_deltas_proto_rawDesc, - NumEnums: 0, - NumMessages: 1, + NumEnums: 1, + NumMessages: 3, NumExtensions: 0, NumServices: 0, }, GoTypes: file_sf_substreams_intern_v2_deltas_proto_goTypes, DependencyIndexes: file_sf_substreams_intern_v2_deltas_proto_depIdxs, + EnumInfos: file_sf_substreams_intern_v2_deltas_proto_enumTypes, MessageInfos: file_sf_substreams_intern_v2_deltas_proto_msgTypes, }.Build() File_sf_substreams_intern_v2_deltas_proto = out.File diff --git a/pipeline/exec/module_executor.go b/pipeline/exec/module_executor.go index 26e3d069f..345b5720f 100644 --- a/pipeline/exec/module_executor.go +++ b/pipeline/exec/module_executor.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/streamingfast/substreams/storage/execout" + "google.golang.org/protobuf/proto" "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" @@ -44,6 +45,18 @@ func RunModule(ctx context.Context, executor ModuleExecutor, execOutput execout. return moduleOutput, outputBytes, fmt.Errorf("converting output to module output: %w", err) } + if moduleOutput == nil { + return nil, nil, nil // output from PartialKV is always nil, we cannot use it + } + + // For store modules, the output in cache is in "operations", but we get the proper store deltas in "toModuleOutput", so we need to transform back those deltas into outputBytes + if storeDeltas := moduleOutput.GetStoreDeltas(); storeDeltas != nil { + outputBytes, err = proto.Marshal(moduleOutput.GetStoreDeltas()) + if err != nil { + return nil, nil, err + } + } + fillModuleOutputMetadata(executor, moduleOutput) moduleOutput.Cached = true return moduleOutput, outputBytes, nil @@ -75,5 +88,4 @@ func fillModuleOutputMetadata(executor ModuleExecutor, in *pbssinternal.ModuleOu in.ModuleName = executor.Name() in.Logs = logs in.DebugLogsTruncated = truncated - return } diff --git a/pipeline/exec/storeexec.go b/pipeline/exec/storeexec.go index d89bd0877..a43604ab5 100644 --- a/pipeline/exec/storeexec.go +++ b/pipeline/exec/storeexec.go @@ -3,6 +3,7 @@ package exec import ( "context" "fmt" + pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2" "google.golang.org/protobuf/proto" @@ -28,13 +29,12 @@ func (e *StoreModuleExecutor) Name() string { return e.moduleName } func (e *StoreModuleExecutor) String() string { return e.Name() } func (e *StoreModuleExecutor) applyCachedOutput(value []byte) error { - deltas := &pbsubstreams.StoreDeltas{} - err := proto.Unmarshal(value, deltas) - if err != nil { - return fmt.Errorf("unmarshalling output deltas: %w", err) + if pkvs, ok := e.outputStore.(*store.PartialKV); ok { + return pkvs.ApplyOps(value) } - e.outputStore.SetDeltas(deltas.StoreDeltas) - return nil + + fullkvs := e.outputStore.(*store.FullKV) + return fullkvs.ApplyOps(value) } func (e *StoreModuleExecutor) run(ctx context.Context, reader execout.ExecutionOutputGetter) (out []byte, moduleOutputData *pbssinternal.ModuleOutput, err error) { @@ -71,16 +71,19 @@ func (e *StoreModuleExecutor) wrapDeltas() ([]byte, *pbssinternal.ModuleOutput, return data, moduleOutput, nil } +// toModuleOutput returns nil,nil on partialKV, because we never use the outputs of a partial store directly func (e *StoreModuleExecutor) toModuleOutput(data []byte) (*pbssinternal.ModuleOutput, error) { - deltas := &pbsubstreams.StoreDeltas{} - err := proto.Unmarshal(data, deltas) - if err != nil { - return nil, fmt.Errorf("unmarshalling output deltas: %w", err) + if fullkvs, ok := e.outputStore.(*store.FullKV); ok { + deltas := fullkvs.GetDeltas() + fullkvs.Reset() + + return &pbssinternal.ModuleOutput{ + Data: &pbssinternal.ModuleOutput_StoreDeltas{ + StoreDeltas: &pbsubstreams.StoreDeltas{ + StoreDeltas: deltas, + }, + }, + }, nil } - - return &pbssinternal.ModuleOutput{ - Data: &pbssinternal.ModuleOutput_StoreDeltas{ - StoreDeltas: deltas, - }, - }, nil + return nil, nil } diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 15e0675ff..ca592200f 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -75,7 +75,6 @@ type Pipeline struct { // (for chains with potential block skips) lastFinalClock *pbsubstreams.Clock - traceID string blockStepMap map[bstream.StepType]uint64 } @@ -88,7 +87,6 @@ func New( execOutputCache *cache.Engine, runtimeConfig config.RuntimeConfig, respFunc substreams.ResponseFunc, - traceID string, opts ...Option, ) *Pipeline { pipe := &Pipeline{ @@ -102,7 +100,6 @@ func New( stores: stores, execoutStorage: execoutStorage, forkHandler: NewForkHandler(), - traceID: traceID, blockStepMap: make(map[bstream.StepType]uint64), startTime: time.Now(), } @@ -262,7 +259,6 @@ func (p *Pipeline) runParallelProcess(ctx context.Context, reqPlan *plan.Request p.execoutStorage, p.respFunc, p.stores.configs, - p.traceID, ) if err != nil { return nil, fmt.Errorf("building parallel processor: %w", err) diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go index c05c7ddd0..12f5d5342 100644 --- a/pipeline/pipeline_test.go +++ b/pipeline/pipeline_test.go @@ -186,7 +186,7 @@ func testConfigMap(t *testing.T, configs []testStoreConfig) store2.ConfigMap { objStore := dstore.NewMockStore(nil) for _, conf := range configs { - newStore, err := store2.NewConfig(conf.name, conf.initBlock, conf.name, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET, "string", objStore, "") + newStore, err := store2.NewConfig(conf.name, conf.initBlock, conf.name, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET, "string", objStore) require.NoError(t, err) confMap[newStore.Name()] = newStore diff --git a/pipeline/process_block.go b/pipeline/process_block.go index f63403579..6b97ccf50 100644 --- a/pipeline/process_block.go +++ b/pipeline/process_block.go @@ -22,6 +22,7 @@ import ( "github.com/streamingfast/substreams/pipeline/exec" "github.com/streamingfast/substreams/reqctx" "github.com/streamingfast/substreams/storage/execout" + "github.com/streamingfast/substreams/storage/store" ) func (p *Pipeline) ProcessFromExecOutput( @@ -371,12 +372,23 @@ func (p *Pipeline) applyExecutionResult(ctx context.Context, executor exec.Modul return fmt.Errorf("execute module: %w", runError) } - p.saveModuleOutput(moduleOutput, executor.Name(), reqctx.Details(ctx).ProductionMode) - if err := execOutput.Set(executorName, outputBytes); err != nil { - return fmt.Errorf("set output cache: %w", err) - } - if moduleOutput != nil { - p.forkHandler.addReversibleOutput(moduleOutput, execOutput.Clock().Id) + if hasValidOutput { + p.saveModuleOutput(moduleOutput, executor.Name(), reqctx.Details(ctx).ProductionMode) + if err := execOutput.Set(executorName, outputBytes); err != nil { + return fmt.Errorf("set output cache: %w", err) + } + if moduleOutput != nil { + p.forkHandler.addReversibleOutput(moduleOutput, execOutput.Clock().Id) + } + } else { // we are in a partial store + if stor, ok := p.GetStoreMap().Get(executorName); ok { + if pkvs, ok := stor.(*store.PartialKV); ok { + if err := execOutput.Set(executorName, pkvs.ReadOps()); err != nil { + return fmt.Errorf("set output cache: %w", err) + } + } + + } } return nil } diff --git a/proto/sf/substreams/intern/v2/deltas.proto b/proto/sf/substreams/intern/v2/deltas.proto index 14a314ea3..bef041ea1 100644 --- a/proto/sf/substreams/intern/v2/deltas.proto +++ b/proto/sf/substreams/intern/v2/deltas.proto @@ -17,3 +17,36 @@ message ModuleOutput { bool debug_logs_truncated = 5; bool cached = 6; } + +message Operations { + repeated Operation operations = 1; +} + +message Operation { + enum Type { + SET = 0; + SET_BYTES = 1; + SET_IF_NOT_EXISTS = 2; + SET_BYTES_IF_NOT_EXISTS = 3; + APPEND = 4; + DELETE_PREFIX = 5; + SET_MAX_BIG_INT = 6; + SET_MAX_INT64 = 7; + SET_MAX_FLOAT64 = 8; + SET_MAX_BIG_DECIMAL = 9; + SET_MIN_BIG_INT = 10; + SET_MIN_INT64 = 11; + SET_MIN_FLOAT64 = 12; + SET_MIN_BIG_DECIMAL = 13; + SUM_BIG_INT = 14; + SUM_INT64 = 15; + SUM_FLOAT64 = 16; + SUM_BIG_DECIMAL = 17; + } + + Type type = 1; + uint64 ord = 2; + string key = 3; + bytes value = 4; + +} diff --git a/service/testing.go b/service/testing.go index 30a935011..1766479a2 100644 --- a/service/testing.go +++ b/service/testing.go @@ -11,18 +11,8 @@ import ( pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" "github.com/streamingfast/substreams/pipeline/outputmodules" "github.com/streamingfast/substreams/service/config" - "github.com/streamingfast/substreams/storage/store" ) -// TestTraceID must be used everywhere a TraceID is required. It must be the same -// between tier1 and tier2, otherwise tier1 will not find the file produced by -// tier2 correctly. -var TestTraceID = "00000000000000000000000000000000" - -func TestTraceIDParam() store.TraceIDParam { - return store.TraceIDParam(TestTraceID) -} - func TestNewService(runtimeConfig config.RuntimeConfig, linearHandoffBlockNum uint64, streamFactoryFunc StreamFactoryFunc) *Tier1Service { return &Tier1Service{ blockType: "sf.substreams.v1.test.Block", @@ -58,10 +48,6 @@ func TestNewServiceTier2(runtimeConfig config.RuntimeConfig, streamFactoryFunc S } } -func (s *Tier2Service) TestProcessRange(ctx context.Context, request *pbssinternal.ProcessRangeRequest, respFunc substreams.ResponseFunc, traceID *string) error { - if traceID == nil { - traceID = &TestTraceID - } - - return s.processRange(ctx, request, respFunc, *traceID) +func (s *Tier2Service) TestProcessRange(ctx context.Context, request *pbssinternal.ProcessRangeRequest, respFunc substreams.ResponseFunc) error { + return s.processRange(ctx, request, respFunc) } diff --git a/service/tier1.go b/service/tier1.go index 7f6a86a7b..95708bf35 100644 --- a/service/tier1.go +++ b/service/tier1.go @@ -399,7 +399,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ return fmt.Errorf("new config map: %w", err) } - storeConfigs, err := store.NewConfigMap(cacheStore, outputGraph.Stores(), outputGraph.ModuleHashes(), tracing.GetTraceID(ctx).String()) + storeConfigs, err := store.NewConfigMap(cacheStore, outputGraph.Stores(), outputGraph.ModuleHashes()) if err != nil { return fmt.Errorf("configuring stores: %w", err) } @@ -433,7 +433,6 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ execOutputCacheEngine, s.runtimeConfig, respFunc, - tracing.GetTraceID(ctx).String(), opts..., ) diff --git a/service/tier2.go b/service/tier2.go index 077cd1103..00a391a3b 100644 --- a/service/tier2.go +++ b/service/tier2.go @@ -221,7 +221,7 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s logger.Info("incoming substreams ProcessRange request", fields...) respFunc := tier2ResponseHandler(ctx, logger, streamSrv) - err = s.processRange(ctx, request, respFunc, tracing.GetTraceID(ctx).String()) + err = s.processRange(ctx, request, respFunc) grpcError = toGRPCError(ctx, err) switch status.Code(grpcError) { @@ -232,7 +232,7 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s return grpcError } -func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.ProcessRangeRequest, respFunc substreams.ResponseFunc, traceID string) error { +func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.ProcessRangeRequest, respFunc substreams.ResponseFunc) error { logger := reqctx.Logger(ctx) if err := outputmodules.ValidateTier2Request(request, s.blockType); err != nil { @@ -278,12 +278,23 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P return fmt.Errorf("internal error setting store: %w", err) } + if clonableStore, ok := cacheStore.(dstore.Clonable); ok { + cloned, err := clonableStore.Clone(ctx) + if err != nil { + return fmt.Errorf("cloning store: %w", err) + } + cloned.SetMeter(dmetering.GetBytesMeter(ctx)) + } + // gs://substreams.streamingfast.store.google/cache/v2/{modulehash}/outputs/00001000-00002000.outputs.zstd + + // gs://substreams.streamingfast.store.google/cache/v2/ {modulehash}/state + execOutputConfigs, err := execout.NewConfigs(cacheStore, outputGraph.UsedModulesUpToStage(int(request.Stage)), outputGraph.ModuleHashes(), s.runtimeConfig.StateBundleSize, logger) if err != nil { return fmt.Errorf("new config map: %w", err) } - storeConfigs, err := store.NewConfigMap(cacheStore, outputGraph.Stores(), outputGraph.ModuleHashes(), traceID) + storeConfigs, err := store.NewConfigMap(cacheStore, outputGraph.Stores(), outputGraph.ModuleHashes()) if err != nil { return fmt.Errorf("configuring stores: %w", err) } @@ -322,7 +333,6 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P s.runtimeConfig, respFunc, // This must always be the parent/global trace id, the one that comes from tier1 - traceID, opts..., ) @@ -437,20 +447,28 @@ func evaluateModulesRequiredToRun( if c.ModuleKind() == pbsubstreams.ModuleKindMap { if runningLastStage && name == outputModule { + // WARNING be careful, if we want to force producing module outputs/stores states for ALL STAGES on the first block range, + // this optimization will be in our way.. logger.Info("found existing exec output for output_module, skipping run", zap.String("output_module", name)) return nil, nil, nil, nil } continue } - // TODO when the partial KV stores are back to being 'generic' (without traceID), we will also be able to skip the store if the partial exists + // if either full or partial kv exists, we can skip the module storeExists, err := storeConfigs[name].ExistsFullKV(ctx, stopBlock) if err != nil { - return nil, nil, nil, fmt.Errorf("checking file existence: %w", err) + return nil, nil, nil, fmt.Errorf("checking fullkv file existence: %w", err) } if !storeExists { - // some stores may already exist completely on this stage, but others do not, so we keep going but ignore those - requiredModules[name] = usedModules[name] + partialStoreExists, err := storeConfigs[name].ExistsPartialKV(ctx, startBlock, stopBlock) + if err != nil { + return nil, nil, nil, fmt.Errorf("checking partial file existence: %w", err) + } + if !partialStoreExists { + // some stores may already exist completely on this stage, but others do not, so we keep going but ignore those + requiredModules[name] = usedModules[name] + } } } diff --git a/storage/store/base_store_test.go b/storage/store/base_store_test.go index 5e55ae9da..8b8187e87 100644 --- a/storage/store/base_store_test.go +++ b/storage/store/base_store_test.go @@ -81,10 +81,10 @@ func TestFileName(t *testing.T) { stateFileName := FullStateFileName(&block.Range{StartBlock: 100, ExclusiveEndBlock: 10000}) require.Equal(t, "0000010000-0000000100.kv", stateFileName) - partialFileName := PartialFileName(&block.Range{StartBlock: 10000, ExclusiveEndBlock: 20000}, "abc") - require.Equal(t, "0000020000-0000010000.abc.partial", partialFileName) + partialFileName := PartialFileName(&block.Range{StartBlock: 10000, ExclusiveEndBlock: 20000}) + require.Equal(t, "0000020000-0000010000.partial", partialFileName) - partialFileNameLegacy := PartialFileName(&block.Range{StartBlock: 10000, ExclusiveEndBlock: 20000}, "") + partialFileNameLegacy := PartialFileName(&block.Range{StartBlock: 10000, ExclusiveEndBlock: 20000}) require.Equal(t, "0000020000-0000010000.partial", partialFileNameLegacy) } diff --git a/storage/store/config.go b/storage/store/config.go index 869d3fa2e..5cb3a699e 100644 --- a/storage/store/config.go +++ b/storage/store/config.go @@ -10,14 +10,16 @@ import ( "go.uber.org/zap" "github.com/streamingfast/substreams/block" + pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2" pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" "github.com/streamingfast/substreams/storage/store/marshaller" ) type Config struct { - name string - moduleHash string - objStore dstore.Store + name string + moduleHash string + objStore dstore.Store + outputsStore dstore.Store moduleInitialBlock uint64 updatePolicy pbsubstreams.Module_KindStore_UpdatePolicy @@ -26,11 +28,6 @@ type Config struct { appendLimit uint64 totalSizeLimit uint64 itemSizeLimit uint64 - - // traceID uniquely identifies the connection ID so that store can be - // written to unique filename preventing some races when multiple Substreams - // request works on the same range. - traceID string } func NewConfig( @@ -40,24 +37,27 @@ func NewConfig( updatePolicy pbsubstreams.Module_KindStore_UpdatePolicy, valueType string, store dstore.Store, - traceID string, ) (*Config, error) { subStore, err := store.SubStore(fmt.Sprintf("%s/states", moduleHash)) if err != nil { return nil, fmt.Errorf("creating sub store: %w", err) } + outputsStore, err := store.SubStore(fmt.Sprintf("%s/outputs", moduleHash)) + if err != nil { + return nil, fmt.Errorf("creating sub store: %w", err) + } return &Config{ name: name, updatePolicy: updatePolicy, valueType: valueType, objStore: subStore, + outputsStore: outputsStore, moduleInitialBlock: moduleInitialBlock, moduleHash: moduleHash, appendLimit: 8_388_608, // 8MiB = 8 * 1024 * 1024, totalSizeLimit: 1_073_741_824, // 1GiB itemSizeLimit: 10_485_760, // 10MiB - traceID: traceID, }, nil } @@ -99,9 +99,15 @@ func (c *Config) ExistsFullKV(ctx context.Context, upTo uint64) (bool, error) { return c.objStore.FileExists(ctx, filename) } +func (c *Config) ExistsPartialKV(ctx context.Context, from, to uint64) (bool, error) { + filename := PartialFileName(block.NewRange(from, to)) + return c.objStore.FileExists(ctx, filename) +} + func (c *Config) NewPartialKV(initialBlock uint64, logger *zap.Logger) *PartialKV { return &PartialKV{ baseStore: c.newBaseStore(logger), + operations: &pbssinternal.Operations{}, initialBlock: initialBlock, seen: make(map[string]bool), } diff --git a/storage/store/configmap.go b/storage/store/configmap.go index 64d225806..e0a9d1801 100644 --- a/storage/store/configmap.go +++ b/storage/store/configmap.go @@ -10,7 +10,7 @@ import ( type ConfigMap map[string]*Config -func NewConfigMap(baseObjectStore dstore.Store, storeModules []*pbsubstreams.Module, moduleHashes *manifest.ModuleHashes, traceID string) (out ConfigMap, err error) { +func NewConfigMap(baseObjectStore dstore.Store, storeModules []*pbsubstreams.Module, moduleHashes *manifest.ModuleHashes) (out ConfigMap, err error) { out = make(ConfigMap) for _, storeModule := range storeModules { c, err := NewConfig( @@ -20,7 +20,6 @@ func NewConfigMap(baseObjectStore dstore.Store, storeModules []*pbsubstreams.Mod storeModule.GetKindStore().UpdatePolicy, storeModule.GetKindStore().ValueType, baseObjectStore, - traceID, ) if err != nil { return nil, fmt.Errorf("new store config for %q: %w", storeModule.Name, err) diff --git a/storage/store/filename.go b/storage/store/filename.go index 1018c72ff..b4294f782 100644 --- a/storage/store/filename.go +++ b/storage/store/filename.go @@ -38,7 +38,6 @@ type FileInfo struct { ModuleName string Filename string Range *block.Range - TraceID string Partial bool } @@ -53,14 +52,13 @@ func NewCompleteFileInfo(moduleName string, moduleInitialBlock uint64, exclusive } } -func NewPartialFileInfo(moduleName string, start uint64, exclusiveEndBlock uint64, traceID string) *FileInfo { +func NewPartialFileInfo(moduleName string, start uint64, exclusiveEndBlock uint64) *FileInfo { bRange := block.NewRange(start, exclusiveEndBlock) return &FileInfo{ ModuleName: moduleName, - Filename: PartialFileName(bRange, traceID), + Filename: PartialFileName(bRange), Range: bRange, - TraceID: traceID, Partial: true, } } @@ -75,18 +73,12 @@ func parseFileName(moduleName, filename string) (*FileInfo, bool) { ModuleName: moduleName, Filename: filename, Range: block.NewRange(uint64(mustAtoi(res[0][2])), uint64(mustAtoi(res[0][1]))), - TraceID: res[0][3], Partial: res[0][4] == "partial", }, true } -func PartialFileName(r *block.Range, traceID string) string { - if traceID == "" { - // Generate legacy partial filename - return fmt.Sprintf("%010d-%010d.partial", r.ExclusiveEndBlock, r.StartBlock) - } - - return fmt.Sprintf("%010d-%010d.%s.partial", r.ExclusiveEndBlock, r.StartBlock, traceID) +func PartialFileName(r *block.Range) string { + return fmt.Sprintf("%010d-%010d.partial", r.ExclusiveEndBlock, r.StartBlock) } func FullStateFileName(r *block.Range) string { diff --git a/storage/store/filename_test.go b/storage/store/filename_test.go index 546c8bdbc..896d3192c 100644 --- a/storage/store/filename_test.go +++ b/storage/store/filename_test.go @@ -15,22 +15,16 @@ func Test_parseFileName(t *testing.T) { want *FileInfo want1 bool }{ - { - "partial legacy", - fmt.Sprintf("%010d-%010d.partial", 100, 0), - &FileInfo{ModuleName: "test", Filename: "0000000100-0000000000.partial", Range: block.NewRange(0, 100), TraceID: "", Partial: true}, - true, - }, { "partial", - fmt.Sprintf("%010d-%010d.abcdef.partial", 100, 0), - &FileInfo{ModuleName: "test", Filename: "0000000100-0000000000.abcdef.partial", Range: block.NewRange(0, 100), TraceID: "abcdef", Partial: true}, + fmt.Sprintf("%010d-%010d.partial", 100, 0), + &FileInfo{ModuleName: "test", Filename: "0000000100-0000000000.partial", Range: block.NewRange(0, 100), Partial: true}, true, }, { "full", fmt.Sprintf("%010d-%010d.kv", 100, 0), - &FileInfo{ModuleName: "test", Filename: "0000000100-0000000000.kv", Range: block.NewRange(0, 100), TraceID: "", Partial: false}, + &FileInfo{ModuleName: "test", Filename: "0000000100-0000000000.kv", Range: block.NewRange(0, 100), Partial: false}, true, }, } diff --git a/storage/store/full_kv.go b/storage/store/full_kv.go index ce0014b80..11c9d2443 100644 --- a/storage/store/full_kv.go +++ b/storage/store/full_kv.go @@ -98,6 +98,10 @@ func (s *FullKV) Reset() { s.lastOrdinal = 0 } +func (p *FullKV) ApplyOps(in []byte) error { + return applyOps(in, p.baseStore) +} + func (s *FullKV) String() string { return fmt.Sprintf("fullKV name %s moduleInitialBlock %d keyCount %d loadedFrom %s deltasCount %d", s.Name(), s.moduleInitialBlock, len(s.kv), s.loadedFrom, len(s.deltas)) } diff --git a/storage/store/init_test.go b/storage/store/init_test.go index e6fdacc5e..672d02312 100644 --- a/storage/store/init_test.go +++ b/storage/store/init_test.go @@ -24,7 +24,7 @@ func newTestBaseStore( appendLimit = 10 } - config, err := NewConfig("test", 0, "test.module.hash", updatePolicy, valueType, store, "") + config, err := NewConfig("test", 0, "test.module.hash", updatePolicy, valueType, store) config.appendLimit = appendLimit config.totalSizeLimit = 9999 config.itemSizeLimit = 10_485_760 diff --git a/storage/store/partial_kv.go b/storage/store/partial_kv.go index 758ae3433..5e85e776a 100644 --- a/storage/store/partial_kv.go +++ b/storage/store/partial_kv.go @@ -2,10 +2,17 @@ package store import ( "context" + "encoding/binary" "fmt" + "math" + "math/big" + "github.com/shopspring/decimal" + pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2" + pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" "github.com/streamingfast/substreams/storage/store/marshaller" "go.uber.org/zap" + "google.golang.org/protobuf/proto" ) var _ Store = (*PartialKV)(nil) @@ -13,6 +20,7 @@ var _ Store = (*PartialKV)(nil) type PartialKV struct { *baseStore + operations *pbssinternal.Operations initialBlock uint64 // block at which we initialized this store DeletedPrefixes []string @@ -65,7 +73,7 @@ func (p *PartialKV) Save(endBoundaryBlock uint64) (*FileInfo, *fileWriter, error return nil, nil, fmt.Errorf("marshal partial data: %w", err) } - file := NewPartialFileInfo(p.name, p.initialBlock, endBoundaryBlock, p.traceID) + file := NewPartialFileInfo(p.name, p.initialBlock, endBoundaryBlock) p.logger.Debug("partial store save written", zap.String("file_name", file.Filename), zap.Stringer("block_range", file.Range)) fw := &fileWriter{ @@ -78,6 +86,12 @@ func (p *PartialKV) Save(endBoundaryBlock uint64) (*FileInfo, *fileWriter, error } func (p *PartialKV) DeletePrefix(ord uint64, prefix string) { + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_DELETE_PREFIX, + Ord: ord, + Key: prefix, + }) + p.baseStore.DeletePrefix(ord, prefix) if !p.seen[prefix] { @@ -98,3 +112,307 @@ func (p *PartialKV) DeleteStore(ctx context.Context, file *FileInfo) (err error) func (p *PartialKV) String() string { return fmt.Sprintf("partialKV name %s moduleInitialBlock %d keyCount %d deltasCount %d loadFrom %s", p.Name(), p.moduleInitialBlock, len(p.kv), len(p.deltas), p.loadedFrom) } + +func (p *PartialKV) Reset() { + p.operations = &pbssinternal.Operations{} + p.baseStore.Reset() +} + +func (p *PartialKV) ApplyOps(in []byte) error { + return applyOps(in, p.baseStore) +} + +func (p *PartialKV) ReadOps() []byte { + data, err := proto.Marshal(p.operations) + if err != nil { + panic(err) + } + return data +} + +func applyOps(in []byte, store *baseStore) error { + + ops := &pbssinternal.Operations{} + if err := proto.Unmarshal(in, ops); err != nil { + return err + } + + for _, op := range ops.Operations { + switch op.Type { + case pbssinternal.Operation_SET: + store.Set(op.Ord, op.Key, string(op.Value)) + case pbssinternal.Operation_SET_BYTES: + store.SetBytes(op.Ord, op.Key, op.Value) + case pbssinternal.Operation_SET_IF_NOT_EXISTS: + store.SetIfNotExists(op.Ord, op.Key, string(op.Value)) + case pbssinternal.Operation_SET_BYTES_IF_NOT_EXISTS: + store.SetBytesIfNotExists(op.Ord, op.Key, op.Value) + case pbssinternal.Operation_APPEND: + store.Append(op.Ord, op.Key, op.Value) + case pbssinternal.Operation_DELETE_PREFIX: + store.DeletePrefix(op.Ord, op.Key) + case pbssinternal.Operation_SET_MAX_BIG_INT: + big := new(big.Int) + big.SetBytes(op.Value) + store.SetMaxBigInt(op.Ord, op.Key, big) + case pbssinternal.Operation_SET_MAX_INT64: + big := new(big.Int) + big.SetBytes(op.Value) + val := big.Int64() + store.SetMaxInt64(op.Ord, op.Key, val) + case pbssinternal.Operation_SET_MAX_FLOAT64: + val := math.Float64frombits(binary.LittleEndian.Uint64(op.Value)) + store.SetMaxFloat64(op.Ord, op.Key, val) + case pbssinternal.Operation_SET_MAX_BIG_DECIMAL: + val, err := decimal.NewFromString(string(op.Value)) + if err != nil { + return err + } + store.SetMaxBigDecimal(op.Ord, op.Key, val) + case pbssinternal.Operation_SET_MIN_BIG_INT: + big := new(big.Int) + big.SetBytes(op.Value) + store.SetMinBigInt(op.Ord, op.Key, big) + case pbssinternal.Operation_SET_MIN_INT64: + big := new(big.Int) + big.SetBytes(op.Value) + val := big.Int64() + store.SetMinInt64(op.Ord, op.Key, val) + case pbssinternal.Operation_SET_MIN_FLOAT64: + val := math.Float64frombits(binary.LittleEndian.Uint64(op.Value)) + store.SetMinFloat64(op.Ord, op.Key, val) + case pbssinternal.Operation_SET_MIN_BIG_DECIMAL: + val, err := decimal.NewFromString(string(op.Value)) + if err != nil { + return err + } + store.SetMinBigDecimal(op.Ord, op.Key, val) + case pbssinternal.Operation_SUM_BIG_INT: + big := new(big.Int) + big.SetBytes(op.Value) + store.SumBigInt(op.Ord, op.Key, big) + case pbssinternal.Operation_SUM_INT64: + big := new(big.Int) + big.SetBytes(op.Value) + val := big.Int64() + store.SumInt64(op.Ord, op.Key, val) + case pbssinternal.Operation_SUM_FLOAT64: + val := math.Float64frombits(binary.LittleEndian.Uint64(op.Value)) + store.SumFloat64(op.Ord, op.Key, val) + case pbssinternal.Operation_SUM_BIG_DECIMAL: + val, err := decimal.NewFromString(string(op.Value)) + if err != nil { + return err + } + store.SumBigDecimal(op.Ord, op.Key, val) + } + } + return nil +} + +func (p *PartialKV) ApplyDelta(delta *pbsubstreams.StoreDelta) { + panic("caching store cannot be used with deltas") +} + +func (p *PartialKV) ApplyDeltasReverse(deltas []*pbsubstreams.StoreDelta) { + panic("caching store cannot be used with deltas") +} + +func (p *PartialKV) Set(ord uint64, key string, value string) { + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_SET, + Ord: ord, + Key: key, + Value: []byte(value), + }) + + p.baseStore.Set(ord, key, value) +} + +func (p *PartialKV) SetBytes(ord uint64, key string, value []byte) { + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_SET_BYTES, + Ord: ord, + Key: key, + Value: value, + }) + + p.baseStore.SetBytes(ord, key, value) +} + +func (p *PartialKV) SetIfNotExists(ord uint64, key string, value string) { + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_SET_IF_NOT_EXISTS, + Ord: ord, + Key: key, + Value: []byte(value), + }) + + p.baseStore.SetIfNotExists(ord, key, value) +} + +func (p *PartialKV) SetBytesIfNotExists(ord uint64, key string, value []byte) { + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_SET_BYTES_IF_NOT_EXISTS, + Ord: ord, + Key: key, + Value: value, + }) + + p.baseStore.SetBytesIfNotExists(ord, key, value) +} + +func (p *PartialKV) Append(ord uint64, key string, value []byte) error { + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_APPEND, + Ord: ord, + Key: key, + Value: value, + }) + + return p.baseStore.Append(ord, key, value) +} + +func (p *PartialKV) SetMaxBigInt(ord uint64, key string, value *big.Int) { + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_SET_MAX_BIG_INT, + Ord: ord, + Key: key, + Value: value.Bytes(), + }) + + p.baseStore.SetMaxBigInt(ord, key, value) +} + +func (p *PartialKV) SetMaxInt64(ord uint64, key string, value int64) { + big := new(big.Int) + big.SetInt64(value) + + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_SET_MAX_INT64, + Ord: ord, + Key: key, + Value: big.Bytes(), + }) + p.baseStore.SetMaxInt64(ord, key, value) +} + +func (p *PartialKV) SetMaxFloat64(ord uint64, key string, value float64) { + var buf [8]byte + binary.LittleEndian.PutUint64(buf[:], math.Float64bits(value)) + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_SET_MAX_FLOAT64, + Ord: ord, + Key: key, + Value: buf[:], + }) + + p.baseStore.SetMaxFloat64(ord, key, value) +} + +func (p *PartialKV) SetMaxBigDecimal(ord uint64, key string, value decimal.Decimal) { + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_SET_MAX_BIG_DECIMAL, + Ord: ord, + Key: key, + Value: []byte(value.String()), + }) + + p.baseStore.SetMaxBigDecimal(ord, key, value) +} + +func (p *PartialKV) SetMinBigInt(ord uint64, key string, value *big.Int) { + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_SET_MIN_BIG_INT, + Ord: ord, + Key: key, + Value: value.Bytes(), + }) + p.baseStore.SetMinBigInt(ord, key, value) +} + +func (p *PartialKV) SetMinInt64(ord uint64, key string, value int64) { + big := new(big.Int) + big.SetInt64(value) + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_SET_MIN_INT64, + Ord: ord, + Key: key, + Value: big.Bytes(), + }) + + p.baseStore.SetMinInt64(ord, key, value) +} + +func (p *PartialKV) SetMinFloat64(ord uint64, key string, value float64) { + var buf [8]byte + binary.LittleEndian.PutUint64(buf[:], math.Float64bits(value)) + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_SET_MIN_FLOAT64, + Ord: ord, + Key: key, + Value: buf[:], + }) + + p.baseStore.SetMinFloat64(ord, key, value) +} + +func (p *PartialKV) SetMinBigDecimal(ord uint64, key string, value decimal.Decimal) { + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_SET_MIN_BIG_DECIMAL, + Ord: ord, + Key: key, + Value: []byte(value.String()), + }) + + p.baseStore.SetMinBigDecimal(ord, key, value) +} + +func (p *PartialKV) SumBigInt(ord uint64, key string, value *big.Int) { + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_SUM_BIG_INT, + Ord: ord, + Key: key, + Value: value.Bytes(), + }) + + p.baseStore.SumBigInt(ord, key, value) +} + +func (p *PartialKV) SumInt64(ord uint64, key string, value int64) { + big := new(big.Int) + big.SetInt64(value) + + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_SUM_INT64, + Ord: ord, + Key: key, + Value: big.Bytes(), + }) + + p.baseStore.SumInt64(ord, key, value) +} + +func (p *PartialKV) SumFloat64(ord uint64, key string, value float64) { + var buf [8]byte + binary.LittleEndian.PutUint64(buf[:], math.Float64bits(value)) + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_SUM_FLOAT64, + Ord: ord, + Key: key, + Value: buf[:], + }) + + p.baseStore.SumFloat64(ord, key, value) +} + +func (p *PartialKV) SumBigDecimal(ord uint64, key string, value decimal.Decimal) { + p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ + Type: pbssinternal.Operation_SUM_BIG_DECIMAL, + Ord: ord, + Key: key, + Value: []byte(value.String()), + }) + + p.baseStore.SumBigDecimal(ord, key, value) +} diff --git a/storage/store/state/snapshot.go b/storage/store/state/snapshot.go index 00155ec5c..e5cf76047 100644 --- a/storage/store/state/snapshot.go +++ b/storage/store/state/snapshot.go @@ -40,12 +40,6 @@ func (s *storeSnapshots) Sort() { left := s.Partials[i] right := s.Partials[j] - // Sort by start block first, then by trace ID so at least we - // take partials all from the same producer. - if left.Range.StartBlock == right.Range.StartBlock { - return left.TraceID < right.TraceID - } - return left.Range.StartBlock < right.Range.StartBlock }) } diff --git a/storage/store/testing.go b/storage/store/testing.go index e87f118b7..33392a762 100644 --- a/storage/store/testing.go +++ b/storage/store/testing.go @@ -53,15 +53,9 @@ func fileFromRanges(kind string, in string, params ...FileInfoParam) FileInfos { param.apply(file) } - file.Filename = PartialFileName(blockRange, file.TraceID) + file.Filename = PartialFileName(blockRange) files[i] = file } return files } - -type TraceIDParam string - -func (t TraceIDParam) apply(file *FileInfo) { - file.TraceID = string(t) -} diff --git a/test/integration_test.go b/test/integration_test.go index 7bb54ba1b..11267c8cf 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -203,7 +203,7 @@ func TestOneStoreOneMap(t *testing.T) { "ebd5bb65aaf4471e468efea126f27dbddb37b59e/outputs/0000000010-0000000020.output", "ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.kv", // store states "ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000020-0000000001.kv", - // "states/0000000025-0000000020.00000000000000000000000000000000.partial", // produced, then deleted + // "states/0000000025-0000000020.partial", // produced, then deleted }, }, { @@ -218,7 +218,7 @@ func TestOneStoreOneMap(t *testing.T) { "ebd5bb65aaf4471e468efea126f27dbddb37b59e/outputs/0000000010-0000000020.output", "ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.kv", // store states "ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000020-0000000001.kv", - // "states/0000000025-0000000020.00000000000000000000000000000000.partial", // produced, then deleted + // "states/0000000025-0000000020.partial", // produced, then deleted //"states/0000000030-0000000001.kv", // Again, backprocess wouldn't save this one, nor does it need to. }, }, @@ -300,7 +300,7 @@ func TestOneStoreOneMap(t *testing.T) { stopBlock: 29, production: true, preWork: func(t *testing.T, run *testRun, workerFactory work.WorkerFactory) { - partialPreWork(t, 1, 10, 0, run, workerFactory, "00000000000000000000000000000000") + partialPreWork(t, 1, 10, 0, run, workerFactory) }, expectedResponseCount: 28, expectFiles: []string{ @@ -311,86 +311,6 @@ func TestOneStoreOneMap(t *testing.T) { "3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000001-0000000010.output", "3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000010-0000000020.output", "3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000020-0000000029.output", - - // Existing partial files are not re-used - //"states/0000000010-0000000001.00000000000000000000000000000000.partial", // FIXME: perhaps wasn't deleted before? - }, - }, - { - name: "prod_mode_multiple_partial_different_trace_id", - startBlock: 1, - linearBlock: 29, - stopBlock: 29, - production: true, - preWork: func(t *testing.T, run *testRun, workerFactory work.WorkerFactory) { - partialPreWork(t, 1, 10, 0, run, workerFactory, "11111111111111111111") - partialPreWork(t, 1, 10, 0, run, workerFactory, "22222222222222222222") - }, - expectedResponseCount: 28, - expectFiles: []string{ - "ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.kv", - "ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000020-0000000001.kv", - "ebd5bb65aaf4471e468efea126f27dbddb37b59e/outputs/0000000001-0000000010.output", - "ebd5bb65aaf4471e468efea126f27dbddb37b59e/outputs/0000000010-0000000020.output", - - "3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000001-0000000010.output", - "3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000010-0000000020.output", - "3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000020-0000000029.output", - - // Existing partial files are not re-used - "ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.11111111111111111111.partial", - "ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.22222222222222222222.partial", - }, - }, - { - name: "prod_mode_partial_legacy_generated", - startBlock: 1, - linearBlock: 29, - stopBlock: 29, - production: true, - preWork: func(t *testing.T, run *testRun, workerFactory work.WorkerFactory) { - // Using an empty trace id brings up the old behavior where files are not suffixed with a trace id - partialPreWork(t, 1, 10, 0, run, workerFactory, "") - }, - expectedResponseCount: 28, - expectFiles: []string{ - "ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.kv", - "ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000020-0000000001.kv", - "ebd5bb65aaf4471e468efea126f27dbddb37b59e/outputs/0000000001-0000000010.output", - "ebd5bb65aaf4471e468efea126f27dbddb37b59e/outputs/0000000010-0000000020.output", - "3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000001-0000000010.output", - "3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000010-0000000020.output", - "3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000020-0000000029.output", - - // Existing partial files are not re-used - "ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.partial", - }, - }, - { - name: "prod_mode_multiple_partial_mixed_legacy_and_new", - startBlock: 1, - linearBlock: 29, - stopBlock: 29, - production: true, - preWork: func(t *testing.T, run *testRun, workerFactory work.WorkerFactory) { - // Using an empty trace id brings up the old behavior where files are not suffixed with a trace id - partialPreWork(t, 1, 10, 0, run, workerFactory, "") - partialPreWork(t, 1, 10, 0, run, workerFactory, "11111111111111111111") - }, - expectedResponseCount: 28, - expectFiles: []string{ - "ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.kv", - "ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000020-0000000001.kv", - "ebd5bb65aaf4471e468efea126f27dbddb37b59e/outputs/0000000001-0000000010.output", - "ebd5bb65aaf4471e468efea126f27dbddb37b59e/outputs/0000000010-0000000020.output", - - "3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000001-0000000010.output", - "3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000010-0000000020.output", - "3574de26d590713344b911bbc1c3bf3305ccb906/outputs/0000000020-0000000029.output", - - // Existing partial files are not re-used - "ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.partial", - "ebd5bb65aaf4471e468efea126f27dbddb37b59e/states/0000000010-0000000001.11111111111111111111.partial", }, }, } @@ -543,9 +463,8 @@ func assertFiles(t *testing.T, tempDir string, wantedFiles ...string) { assert.ElementsMatch(t, wantedFiles, actualFiles) } -func partialPreWork(t *testing.T, start, end uint64, stageIdx int, run *testRun, workerFactory work.WorkerFactory, traceID string) { +func partialPreWork(t *testing.T, start, end uint64, stageIdx int, run *testRun, workerFactory work.WorkerFactory) { worker := workerFactory(zlog) - worker.(*TestWorker).traceID = &traceID // FIXME: use the new `Work` interface here, and validate that the // caller to `partialPreWork` doesn't need to be changed too much? :) diff --git a/test/runnable_test.go b/test/runnable_test.go index 4c1bc5a0c..72b9d75c9 100644 --- a/test/runnable_test.go +++ b/test/runnable_test.go @@ -237,7 +237,6 @@ func processInternalRequest( responseCollector *responseCollector, blockProcessedCallBack blockProcessedCallBack, testTempDir string, - traceID *string, ) error { t.Helper() @@ -263,7 +262,7 @@ func processInternalRequest( ) svc := service.TestNewServiceTier2(runtimeConfig, tr.StreamFactory) - return svc.TestProcessRange(ctx, request, responseCollector.Collect, traceID) + return svc.TestProcessRange(ctx, request, responseCollector.Collect) } func processRequest( diff --git a/test/worker_test.go b/test/worker_test.go index 7f4e048c0..8de68d1ee 100644 --- a/test/worker_test.go +++ b/test/worker_test.go @@ -23,7 +23,6 @@ type TestWorker struct { blockProcessedCallBack blockProcessedCallBack testTempDir string id uint64 - traceID *string } var workerID atomic.Uint64 @@ -49,7 +48,7 @@ func (w *TestWorker) Work(ctx context.Context, unit stage.Unit, workRange *block ) return func() loop.Msg { - if err := processInternalRequest(w.t, ctx, request, nil, w.newBlockGenerator, w.responseCollector, w.blockProcessedCallBack, w.testTempDir, w.traceID); err != nil { + if err := processInternalRequest(w.t, ctx, request, nil, w.newBlockGenerator, w.responseCollector, w.blockProcessedCallBack, w.testTempDir); err != nil { return work.MsgJobFailed{Unit: unit, Error: fmt.Errorf("processing test tier2 request: %w", err)} } logger.Info("worker done running job", diff --git a/tools/analytics_store_stats.go b/tools/analytics_store_stats.go index 90f0f2293..0d7993a19 100644 --- a/tools/analytics_store_stats.go +++ b/tools/analytics_store_stats.go @@ -109,7 +109,6 @@ func StoreStatsE(cmd *cobra.Command, args []string) error { module.GetKind().(*pbsubstreams.Module_KindStore_).KindStore.UpdatePolicy, module.GetKind().(*pbsubstreams.Module_KindStore_).KindStore.ValueType, baseDStore, - "", ) if err != nil { zlog.Error("creating store config", zap.Error(err)) diff --git a/tools/check.go b/tools/check.go index 4a9e7f7b3..e4ce0fbb9 100644 --- a/tools/check.go +++ b/tools/check.go @@ -64,7 +64,7 @@ func newStore(storeURL string) (*store2.FullKV, dstore.Store, error) { return nil, nil, fmt.Errorf("could not create store from %s: %w", storeURL, err) } - config, err := store2.NewConfig("", 0, "", pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_IF_NOT_EXISTS, "", remoteStore, "") + config, err := store2.NewConfig("", 0, "", pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_IF_NOT_EXISTS, "", remoteStore) if err != nil { return nil, nil, err } diff --git a/tools/decode.go b/tools/decode.go index c132bf49b..a0f2cbeaa 100644 --- a/tools/decode.go +++ b/tools/decode.go @@ -284,7 +284,7 @@ func searchStateModule( stateStore dstore.Store, protoFiles []*descriptorpb.FileDescriptorProto, ) error { - config, err := store.NewConfig(module.Name, module.InitialBlock, moduleHash, module.GetKindStore().GetUpdatePolicy(), module.GetKindStore().GetValueType(), stateStore, "") + config, err := store.NewConfig(module.Name, module.InitialBlock, moduleHash, module.GetKindStore().GetUpdatePolicy(), module.GetKindStore().GetValueType(), stateStore) if err != nil { return fmt.Errorf("initializing store config module %q: %w", module.Name, err) } diff --git a/tools/module.go b/tools/module.go index cb06fd1c2..8e22bbceb 100644 --- a/tools/module.go +++ b/tools/module.go @@ -114,7 +114,6 @@ func moduleRunE(cmd *cobra.Command, args []string) error { module.GetKindStore().UpdatePolicy, module.GetKindStore().ValueType, stateStore, - "", ) cli.NoError(err, "unable to create store config") diff --git a/wasm/call_test.go b/wasm/call_test.go index c425f5da6..8cc81c158 100644 --- a/wasm/call_test.go +++ b/wasm/call_test.go @@ -482,7 +482,7 @@ func Test_CallStoreOps(t *testing.T) { func newTestCall(updatePolicy pbsubstreams.Module_KindStore_UpdatePolicy, valueType string) *Call { myStore := dstore.NewMockStore(nil) - storeConf, err := store.NewConfig("test", 0, "", updatePolicy, valueType, myStore, "test") + storeConf, err := store.NewConfig("test", 0, "", updatePolicy, valueType, myStore) if err != nil { panic("failed") }