Skip to content

Commit

Permalink
clock distributor implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ArnaudBger committed Mar 26, 2024
1 parent 615d0fe commit f32ee4d
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 23 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ require (
github.com/streamingfast/sf-tracing v0.0.0-20240209202324-9daa52c71a52
github.com/streamingfast/shutter v1.5.0
github.com/streamingfast/substreams-sink-sql v1.0.1-0.20231127153906-acf5f3e34330
github.com/test-go/testify v1.1.4
github.com/tetratelabs/wazero v1.1.0
github.com/tidwall/pretty v1.2.1
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0
Expand Down
36 changes: 13 additions & 23 deletions service/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,31 +349,23 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
}

var streamErr error
if canSkipBlocks(existingExecOuts, modulesRequiredToRun, s.blockType) {
var referenceMapper *execout.File
for k, v := range existingExecOuts {
referenceMapper = v
logger.Info("running from mapper", zap.String("module", k))
break
if canSkipBlockSource(existingExecOuts, modulesRequiredToRun, s.blockType) {
maxDistributorLength := int(request.StopBlockNum - requestDetails.ResolvedStartBlockNum)
var clocksDistributor map[uint64]*pbsubstreams.Clock
for _, execOutput := range existingExecOuts {
execOutput.ExtractClocks(clocksDistributor)
if len(clocksDistributor) >= maxDistributorLength {
break
}
}

sortedClocksDistributor := sortClocksDistributor(clocksDistributor)
ctx, span := reqctx.WithSpan(ctx, "substreams/tier2/pipeline/mapper_stream")
for _, v := range referenceMapper.SortedItems() {
if v.BlockNum < request.StartBlockNum || v.BlockNum >= request.StopBlockNum {
for _, clock := range sortedClocksDistributor {
if clock.Number < request.StartBlockNum || clock.Number >= request.StopBlockNum {
panic("reading from mapper, block was out of range") // we don't want to have this case undetected
}
clock := &pbsubstreams.Clock{
Id: v.BlockId,
Number: v.BlockNum,
Timestamp: v.Timestamp,
}

cursor := &bstream.Cursor{
Step: bstream.StepNewIrreversible,
Block: bstream.NewBlockRef(v.BlockId, v.BlockNum),
LIB: bstream.NewBlockRef(v.BlockId, v.BlockNum),
HeadBlock: bstream.NewBlockRef(v.BlockId, v.BlockNum),
}
cursor := irreversibleCursorFromClock(clock)

if err := pipe.ProcessFromExecOutput(ctx, clock, cursor); err != nil {
span.EndWithErr(&err)
Expand Down Expand Up @@ -419,7 +411,6 @@ func evaluateModulesRequiredToRun(
execoutConfigs *execout.Configs,
storeConfigs store.ConfigMap,
) (requiredModules map[string]*pbsubstreams.Module, existingExecOuts map[string]*execout.File, execoutWriters map[string]*execout.Writer, err error) {

existingExecOuts = make(map[string]*execout.File)
requiredModules = make(map[string]*pbsubstreams.Module)
execoutWriters = make(map[string]*execout.Writer)
Expand All @@ -430,7 +421,6 @@ func evaluateModulesRequiredToRun(
}

runningLastStage := outputGraph.StagedUsedModules()[stage].IsLastStage()

for name, c := range execoutConfigs.ConfigMap {
if _, found := usedModules[name]; !found { // skip modules that are only present in later stages
continue
Expand Down Expand Up @@ -490,7 +480,7 @@ func evaluateModulesRequiredToRun(

}

func canSkipBlocks(existingExecOuts map[string]*execout.File, requiredModules map[string]*pbsubstreams.Module, blockType string) bool {
func canSkipBlockSource(existingExecOuts map[string]*execout.File, requiredModules map[string]*pbsubstreams.Module, blockType string) bool {
if len(existingExecOuts) == 0 {
return false
}
Expand Down
27 changes: 27 additions & 0 deletions service/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package service

import (
"sort"

"github.com/streamingfast/bstream"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
)

// sortClocksDistributor flattens the given clock map into a slice ordered by
// ascending Clock.Number.
//
// The map keys are not consulted for ordering; only each clock's Number field
// is compared. The returned slice is never nil and is empty when the map is
// empty or nil.
func sortClocksDistributor(clockDistributor map[uint64]*pbsubstreams.Clock) (sortedClockDistributor []*pbsubstreams.Clock) {
	clocks := make([]*pbsubstreams.Clock, 0, len(clockDistributor))
	for key := range clockDistributor {
		clocks = append(clocks, clockDistributor[key])
	}

	// Map iteration order is random, so the slice must be sorted explicitly.
	byNumber := func(a, b int) bool {
		return clocks[a].Number < clocks[b].Number
	}
	sort.Slice(clocks, byNumber)

	return clocks
}

// irreversibleCursorFromClock builds a cursor for the block described by
// clock. The step is bstream.StepNewIrreversible, and the Block, LIB and
// HeadBlock references are each created from the clock's Id and Number.
func irreversibleCursorFromClock(clock *pbsubstreams.Clock) *bstream.Cursor {
	id, num := clock.Id, clock.Number

	cursor := new(bstream.Cursor)
	cursor.Step = bstream.StepNewIrreversible
	cursor.Block = bstream.NewBlockRef(id, num)
	cursor.LIB = bstream.NewBlockRef(id, num)
	cursor.HeadBlock = bstream.NewBlockRef(id, num)
	return cursor
}
37 changes: 37 additions & 0 deletions service/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package service

import (
"testing"

pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"github.com/test-go/testify/require"
)

// TestSortClocksDistributor checks that sortClocksDistributor returns the
// clocks of its input map ordered by ascending block number.
func TestSortClocksDistributor(t *testing.T) {
	type scenario struct {
		name  string
		input map[uint64]*pbsubstreams.Clock
		want  []*pbsubstreams.Clock
	}

	scenarios := []scenario{
		{
			name: "sunny path",
			// Keys are deliberately listed out of order.
			input: map[uint64]*pbsubstreams.Clock{
				2: {Number: 2, Id: "test2"},
				3: {Number: 3, Id: "test3"},
				1: {Number: 1, Id: "test1"},
			},
			want: []*pbsubstreams.Clock{
				{Number: 1, Id: "test1"},
				{Number: 2, Id: "test2"},
				{Number: 3, Id: "test3"},
			},
		},
	}

	for i := range scenarios {
		sc := scenarios[i]
		t.Run(sc.name, func(t *testing.T) {
			got := sortClocksDistributor(sc.input)
			require.Equal(t, sc.want, got)
		})
	}
}
13 changes: 13 additions & 0 deletions storage/execout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ func (c *File) SortedItems() (out []*pboutput.Item) {
return
}

// ExtractClocks adds to clocksMap one clock per block number found among the
// file's items, keyed by block number. Each clock is built from the item's
// BlockNum, BlockId and Timestamp.
//
// Block numbers already present in clocksMap are left untouched, so the same
// map can be passed to several files in turn to accumulate their clocks.
//
// clocksMap must be non-nil whenever the file holds items: entries are written
// into it directly, and writing to a nil map panics.
//
// NOTE(review): c.kv is read here without taking the lock that SetItem
// acquires — confirm no writer can run concurrently with this call.
func (c *File) ExtractClocks(clocksMap map[uint64]*pbsubstreams.Clock) {
	for _, item := range c.kv {
		blockNum := item.BlockNum
		if _, known := clocksMap[blockNum]; known {
			// First clock recorded for a block number wins.
			continue
		}

		clock := new(pbsubstreams.Clock)
		clock.Number = blockNum
		clock.Id = item.BlockId
		clock.Timestamp = item.Timestamp
		clocksMap[blockNum] = clock
	}
}

func (c *File) SetItem(clock *pbsubstreams.Clock, data []byte) {
c.Lock()
defer c.Unlock()
Expand Down
36 changes: 36 additions & 0 deletions storage/execout/file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package execout

import (
"testing"

pboutput "github.com/streamingfast/substreams/storage/execout/pb"

pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"github.com/stretchr/testify/require"
)

// TestExtractClocks verifies that File.ExtractClocks fills the supplied map
// with one clock per block number found in the file's items, copying the
// block number and block id of each item.
func TestExtractClocks(t *testing.T) {
	cases := []struct {
		name string
		// file is held by pointer so that neither the table literal nor the
		// range loop below copies a File value. File's methods call c.Lock(),
		// so the struct presumably carries a mutex by value, and copying it is
		// what go vet's copylocks check reports.
		file              *File
		clocksDistributor map[uint64]*pbsubstreams.Clock
		expectedResult    map[uint64]*pbsubstreams.Clock
	}{
		{
			name: "sunny path",
			file: &File{
				ModuleName: "sunny_path",
				kv:         map[string]*pboutput.Item{"id1": {BlockNum: 1, BlockId: "1"}, "id2": {BlockNum: 2, BlockId: "3"}},
			},
			// Must be a non-nil map: ExtractClocks writes into it in place.
			clocksDistributor: map[uint64]*pbsubstreams.Clock{},
			expectedResult:    map[uint64]*pbsubstreams.Clock{1: {Number: 1, Id: "1"}, 2: {Number: 2, Id: "3"}},
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			c.file.ExtractClocks(c.clocksDistributor)
			require.Equal(t, c.expectedResult, c.clocksDistributor)
		})
	}
}

0 comments on commit f32ee4d

Please sign in to comment.