From f32ee4d76d4f1236b38c86e0f66e1953fcd27394 Mon Sep 17 00:00:00 2001 From: arnaudberger Date: Tue, 26 Mar 2024 18:11:52 -0400 Subject: [PATCH] clock distributor implementation --- go.mod | 1 + service/tier2.go | 36 +++++++++++++---------------------- service/utils.go | 27 ++++++++++++++++++++++++++ service/utils_test.go | 37 ++++++++++++++++++++++++++++++++++++ storage/execout/file.go | 13 +++++++++++++ storage/execout/file_test.go | 36 +++++++++++++++++++++++++++++++++++ 6 files changed, 127 insertions(+), 23 deletions(-) create mode 100644 service/utils.go create mode 100644 service/utils_test.go create mode 100644 storage/execout/file_test.go diff --git a/go.mod b/go.mod index 72a37e599..fc45ace09 100644 --- a/go.mod +++ b/go.mod @@ -58,6 +58,7 @@ require ( github.com/streamingfast/sf-tracing v0.0.0-20240209202324-9daa52c71a52 github.com/streamingfast/shutter v1.5.0 github.com/streamingfast/substreams-sink-sql v1.0.1-0.20231127153906-acf5f3e34330 + github.com/test-go/testify v1.1.4 github.com/tetratelabs/wazero v1.1.0 github.com/tidwall/pretty v1.2.1 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 diff --git a/service/tier2.go b/service/tier2.go index a2be7ac8b..01a621d82 100644 --- a/service/tier2.go +++ b/service/tier2.go @@ -349,31 +349,23 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P } var streamErr error - if canSkipBlocks(existingExecOuts, modulesRequiredToRun, s.blockType) { - var referenceMapper *execout.File - for k, v := range existingExecOuts { - referenceMapper = v - logger.Info("running from mapper", zap.String("module", k)) - break + if canSkipBlockSource(existingExecOuts, modulesRequiredToRun, s.blockType) { + maxDistributorLength := int(request.StopBlockNum - requestDetails.ResolvedStartBlockNum) + var clocksDistributor map[uint64]*pbsubstreams.Clock + for _, execOutput := range existingExecOuts { + execOutput.ExtractClocks(clocksDistributor) + if len(clocksDistributor) >= maxDistributorLength { + break + } } + sortedClocksDistributor := sortClocksDistributor(clocksDistributor) ctx, span := reqctx.WithSpan(ctx, "substreams/tier2/pipeline/mapper_stream") - for _, v := range referenceMapper.SortedItems() { - if v.BlockNum < request.StartBlockNum || v.BlockNum >= request.StopBlockNum { + for _, clock := range sortedClocksDistributor { + if clock.Number < request.StartBlockNum || clock.Number >= request.StopBlockNum { panic("reading from mapper, block was out of range") // we don't want to have this case undetected } - clock := &pbsubstreams.Clock{ - Id: v.BlockId, - Number: v.BlockNum, - Timestamp: v.Timestamp, - } - - cursor := &bstream.Cursor{ - Step: bstream.StepNewIrreversible, - Block: bstream.NewBlockRef(v.BlockId, v.BlockNum), - LIB: bstream.NewBlockRef(v.BlockId, v.BlockNum), - HeadBlock: bstream.NewBlockRef(v.BlockId, v.BlockNum), - } + cursor := irreversibleCursorFromClock(clock) if err := pipe.ProcessFromExecOutput(ctx, clock, cursor); err != nil { span.EndWithErr(&err) @@ -419,7 +411,6 @@ func evaluateModulesRequiredToRun( execoutConfigs *execout.Configs, storeConfigs store.ConfigMap, ) (requiredModules map[string]*pbsubstreams.Module, existingExecOuts map[string]*execout.File, execoutWriters map[string]*execout.Writer, err error) { - existingExecOuts = make(map[string]*execout.File) requiredModules = make(map[string]*pbsubstreams.Module) execoutWriters = make(map[string]*execout.Writer) @@ -430,7 +421,6 @@ func evaluateModulesRequiredToRun( } runningLastStage := outputGraph.StagedUsedModules()[stage].IsLastStage() - for name, c := range execoutConfigs.ConfigMap { if _, found := usedModules[name]; !found { // skip modules that are only present in later stages continue @@ -490,7 +480,7 @@ func evaluateModulesRequiredToRun( } -func canSkipBlocks(existingExecOuts map[string]*execout.File, requiredModules map[string]*pbsubstreams.Module, blockType string) bool { +func canSkipBlockSource(existingExecOuts map[string]*execout.File, requiredModules map[string]*pbsubstreams.Module, blockType string) bool { if len(existingExecOuts) == 0 { return false } diff --git a/service/utils.go b/service/utils.go new file mode 100644 index 000000000..336f7b17e --- /dev/null +++ b/service/utils.go @@ -0,0 +1,27 @@ +package service + +import ( + "sort" + + "github.com/streamingfast/bstream" + pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" +) + +func sortClocksDistributor(clockDistributor map[uint64]*pbsubstreams.Clock) (sortedClockDistributor []*pbsubstreams.Clock) { + sortedClockDistributor = make([]*pbsubstreams.Clock, 0, len(clockDistributor)) + for _, clock := range clockDistributor { + sortedClockDistributor = append(sortedClockDistributor, clock) + } + + sort.Slice(sortedClockDistributor, func(i, j int) bool { return sortedClockDistributor[i].Number < sortedClockDistributor[j].Number }) + return +} + +func irreversibleCursorFromClock(clock *pbsubstreams.Clock) *bstream.Cursor { + return &bstream.Cursor{ + Step: bstream.StepNewIrreversible, + Block: bstream.NewBlockRef(clock.Id, clock.Number), + LIB: bstream.NewBlockRef(clock.Id, clock.Number), + HeadBlock: bstream.NewBlockRef(clock.Id, clock.Number), + } +} diff --git a/service/utils_test.go b/service/utils_test.go new file mode 100644 index 000000000..cd22cb63e --- /dev/null +++ b/service/utils_test.go @@ -0,0 +1,37 @@ +package service + +import ( + "testing" + + pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" + "github.com/test-go/testify/require" +) + +func TestSortClocksDistributor(t *testing.T) { + cases := []struct { + name string + clocksMap map[uint64]*pbsubstreams.Clock + expectedResult []*pbsubstreams.Clock + }{ + { + name: "sunny path", + clocksMap: map[uint64]*pbsubstreams.Clock{ + 2: {Number: 2, Id: "test2"}, + 3: {Number: 3, Id: "test3"}, + 1: {Number: 1, Id: "test1"}, + }, + expectedResult: []*pbsubstreams.Clock{ + {Number: 1, Id: "test1"}, + {Number: 2, Id: "test2"}, + {Number: 3, Id: "test3"}, + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + result := sortClocksDistributor(c.clocksMap) + require.Equal(t, c.expectedResult, result) + }) + } +} diff --git a/storage/execout/file.go b/storage/execout/file.go index ed22bb133..ef25208fe 100644 --- a/storage/execout/file.go +++ b/storage/execout/file.go @@ -51,6 +51,19 @@ func (c *File) SortedItems() (out []*pboutput.Item) { return } +func (c *File) ExtractClocks(clocksMap map[uint64]*pbsubstreams.Clock) { + for _, item := range c.kv { + if _, found := clocksMap[item.BlockNum]; !found { + clocksMap[item.BlockNum] = &pbsubstreams.Clock{ + Number: item.BlockNum, + Id: item.BlockId, + Timestamp: item.Timestamp, + } + } + } + return +} + func (c *File) SetItem(clock *pbsubstreams.Clock, data []byte) { c.Lock() defer c.Unlock() diff --git a/storage/execout/file_test.go b/storage/execout/file_test.go new file mode 100644 index 000000000..cd5ef2224 --- /dev/null +++ b/storage/execout/file_test.go @@ -0,0 +1,36 @@ +package execout + +import ( + "testing" + + pboutput "github.com/streamingfast/substreams/storage/execout/pb" + + pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" + "github.com/stretchr/testify/require" +) + +func TestExtractClocks(t *testing.T) { + cases := []struct { + name string + file File + clocksDistributor map[uint64]*pbsubstreams.Clock + expectedResult map[uint64]*pbsubstreams.Clock + }{ + { + name: "sunny path", + file: File{ + ModuleName: "sunny_path", + kv: map[string]*pboutput.Item{"id1": {BlockNum: 1, BlockId: "1"}, "id2": {BlockNum: 2, BlockId: "3"}}, + }, + clocksDistributor: map[uint64]*pbsubstreams.Clock{}, + expectedResult: map[uint64]*pbsubstreams.Clock{1: {Number: 1, Id: "1"}, 2: {Number: 2, Id: "3"}}, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + c.file.ExtractClocks(c.clocksDistributor) + require.Equal(t, c.expectedResult, c.clocksDistributor) + }) + } +}