
Commit

Feature/live back filling (#492)
Add a substreams `live back filler` in production mode so that tier2 creates the caches while we are running on the live segment. These requests are not metered.
ArnaudBger authored Jun 17, 2024
1 parent b448efe commit 5a1890c
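For context, a minimal sketch of how tier1 could wire the new back filler into its handler chain. This is not part of this commit; `nextHandler`, `linearHandoffBlock`, `runtimeConfig`, `outputGraph` and `ctx` are assumed names used only for illustration:

    backFiller := service.NewLiveBackFiller(
        nextHandler,                          // existing live block handler to keep forwarding to
        logger,
        outputGraph.OutputModuleStageIndex(), // last stage, whose caches tier2 should produce
        runtimeConfig.SegmentSize,
        linearHandoffBlock,
        runtimeConfig.ClientFactory,
        service.RequestBackProcessing,
    )
    go backFiller.Start(ctx) // watches irreversible live blocks and schedules tier2 jobs
    // backFiller then replaces nextHandler in the handler chain, so every live block
    // passes through its ProcessBlock before being forwarded.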
Showing 7 changed files with 361 additions and 10 deletions.
3 changes: 3 additions & 0 deletions docs/release-notes/change-log.md
@@ -11,6 +11,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Unreleased

* Add a substreams `live back filler`, enabled when substreams tier1 is requested in `production mode`.
It creates the `substreams cache` when blocks are processed live on tier1.

### Remote Code Generation

The `substreams init` command now fetches the list of available 'code generators' from "https://codegen.substreams.dev".
1 change: 1 addition & 0 deletions pipeline/exec/graph.go
@@ -65,6 +65,7 @@ func (g *Graph) ModuleHashes() *manifest.ModuleHashes { return g.moduleHashes }
func (g *Graph) LowestInitBlock() uint64 { return g.lowestInitBlock }
func (g *Graph) LowestStoresInitBlock() *uint64 { return g.lowestStoresInitBlock }
func (g *Graph) ModulesInitBlocks() map[string]uint64 { return g.modulesInitBlocks }
func (g *Graph) OutputModuleStageIndex() int { return len(g.stagedUsedModules) - 1 }

func NewOutputModuleGraph(outputModule string, productionMode bool, modules *pbsubstreams.Modules) (out *Graph, err error) {
out = &Graph{
11 changes: 7 additions & 4 deletions service/config/runtimeconfig.go
@@ -1,6 +1,7 @@
package config

import (
"github.com/streamingfast/substreams/client"
"github.com/streamingfast/substreams/orchestrator/work"

"github.com/streamingfast/dstore"
@@ -15,10 +16,10 @@ type RuntimeConfig struct {
DefaultParallelSubrequests uint64 // how many sub-jobs to launch for a given user
// derives substores `states/`, for `store` modules snapshots (full and partial)
// and `outputs/` for execution output of both `map` and `store` module kinds
BaseObjectStore dstore.Store
DefaultCacheTag string // appended to BaseObjectStore unless overriden by auth layer
WorkerFactory work.WorkerFactory

BaseObjectStore dstore.Store
DefaultCacheTag string // appended to BaseObjectStore unless overriden by auth layer
WorkerFactory work.WorkerFactory
ClientFactory client.InternalClientFactory
ModuleExecutionTracing bool
}

@@ -29,6 +30,7 @@ func NewTier1RuntimeConfig(
baseObjectStore dstore.Store,
defaultCacheTag string,
workerFactory work.WorkerFactory,
clientFactory client.InternalClientFactory,
) RuntimeConfig {
return RuntimeConfig{
SegmentSize: segmentSize,
@@ -37,6 +39,7 @@
BaseObjectStore: baseObjectStore,
DefaultCacheTag: defaultCacheTag,
WorkerFactory: workerFactory,
ClientFactory: clientFactory,
// overridden by Tier Options
ModuleExecutionTracing: false,
}
157 changes: 157 additions & 0 deletions service/live_back_filler.go
@@ -0,0 +1,157 @@
package service

import (
"context"
"fmt"
"io"

"github.com/streamingfast/derr"
"github.com/streamingfast/substreams/block"
"github.com/streamingfast/substreams/orchestrator/work"
"github.com/streamingfast/substreams/reqctx"

"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/substreams/client"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
"go.uber.org/zap"
)

const finalBlockDelay = 120
const backfillRetries = 999 // no point in failing "early". It may be failing because merged blocks are lagging behind a little bit.

type RequestBackProcessingFunc = func(ctx context.Context, logger *zap.Logger, blockRange *block.Range, stageToProcess int, clientFactory client.InternalClientFactory, jobCompleted chan error)

type LiveBackFiller struct {
RequestBackProcessing RequestBackProcessingFunc
NextHandler bstream.Handler
irreversibleBlock chan uint64
currentSegment uint64
segmentSize uint64
logger *zap.Logger
stageToProcess int
clientFactory client.InternalClientFactory
}

func NewLiveBackFiller(nextHandler bstream.Handler, logger *zap.Logger, stageToProcess int, segmentSize uint64, linearHandoff uint64, clientFactory client.InternalClientFactory, requestBackProcessing RequestBackProcessingFunc) *LiveBackFiller {
return &LiveBackFiller{
RequestBackProcessing: requestBackProcessing,
stageToProcess: stageToProcess,
NextHandler: nextHandler,
irreversibleBlock: make(chan uint64),
currentSegment: linearHandoff / segmentSize,
segmentSize: segmentSize,
logger: logger,
clientFactory: clientFactory,
}
}

func (l *LiveBackFiller) ProcessBlock(blk *pbbstream.Block, obj interface{}) (err error) {
step := obj.(bstream.Stepable).Step()
if !(step.Matches(bstream.StepIrreversible)) {
return l.NextHandler.ProcessBlock(blk, obj)
}

l.irreversibleBlock <- blk.Number

return l.NextHandler.ProcessBlock(blk, obj)
}

func RequestBackProcessing(ctx context.Context, logger *zap.Logger, blockRange *block.Range, stageToProcess int, clientFactory client.InternalClientFactory, jobResult chan error) {
liveBackFillerRequest := work.NewRequest(ctx, reqctx.Details(ctx), stageToProcess, blockRange)

err := derr.RetryContext(ctx, backfillRetries, func(ctx context.Context) error {
err := requestBackProcessing(ctx, logger, liveBackFillerRequest, clientFactory)
if err != nil {
logger.Debug("retryable error while live backprocessing", zap.Error(err))
return err
}

return nil
})

jobResult <- err
}

func requestBackProcessing(ctx context.Context, logger *zap.Logger, liveCachingRequest *pbssinternal.ProcessRangeRequest, clientFactory client.InternalClientFactory) error {
logger.Debug("request live back filling", zap.Uint64("start_block", liveCachingRequest.StartBlock()), zap.Uint64("end_block", liveCachingRequest.StopBlock()))

grpcClient, closeFunc, grpcCallOpts, _, err := clientFactory()
if err != nil {
return fmt.Errorf("failed to create live cache grpc client: %w", err)
}
defer func() {
if err := closeFunc(); err != nil {
logger.Warn("closing client connection", zap.Error(err))
}
}()

stream, err := grpcClient.ProcessRange(ctx, liveCachingRequest, grpcCallOpts...)
if err != nil {
return fmt.Errorf("getting stream: %w", err)
}
defer func() {
if err := stream.CloseSend(); err != nil {
logger.Warn("closing stream", zap.Error(err))
}
}()

// Drain the stream: a clean io.EOF means tier2 finished processing the segment,
// any other error is returned so that the caller can retry.
for {
_, err = stream.Recv()
if err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("receiving stream response: %w", err)
}
}

return nil
}

func (l *LiveBackFiller) Start(ctx context.Context) {
l.logger.Info("start live back filler", zap.Uint64("current_segment", l.currentSegment))

var targetSegment uint64
var jobFailed bool
var jobProcessing bool
var blockNumber uint64
jobResult := make(chan error)
for {
select {
case <-ctx.Done():
return
case err := <-jobResult:
if err != nil {
l.logger.Warn("job failed while processing live caching", zap.Error(err), zap.Uint64("segment_processed", l.currentSegment))
jobFailed = true
break
}
jobProcessing = false
l.currentSegment++
case blockNumber = <-l.irreversibleBlock:
targetSegment = blockNumber / l.segmentSize
}

if jobFailed {
// We don't want to run more jobs if one has failed permanently
continue
}

if jobProcessing {
continue
}

segmentStart := l.currentSegment * l.segmentSize
segmentEnd := (l.currentSegment + 1) * l.segmentSize
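// The segment is requested only once the chain tip is more than finalBlockDelay blocks
// past segmentEnd, so the merged blocks covering that range should already be written.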
mergedBlockIsWritten := (blockNumber - segmentEnd) > finalBlockDelay

if (targetSegment > l.currentSegment) && mergedBlockIsWritten {

liveBackFillerRange := block.NewRange(segmentStart, segmentEnd)

jobProcessing = true
go l.RequestBackProcessing(ctx, l.logger, liveBackFillerRange, l.stageToProcess, l.clientFactory, jobResult)
}
}
}
167 changes: 167 additions & 0 deletions service/live_back_filler_test.go
@@ -0,0 +1,167 @@
package service

import (
"context"
"fmt"
"testing"
"time"

"github.com/streamingfast/substreams/block"

"github.com/streamingfast/bstream"

pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/test-go/testify/require"

"github.com/streamingfast/substreams/client"
"go.uber.org/zap"
)

func TestBackFiller(t *testing.T) {
cases := []struct {
name string
segmentSize uint64
startRange uint64
endRange uint64
linearHandoff uint64
stageToProcess int
errorBackProcessing bool
expectedSegmentProcessed []uint64
}{
// In these cases, blocks from startRange to endRange are pushed through a testSource.
// In the back filler, once an irreversible block is seen more than 120 blocks (finalBlockDelay) past the end of the current segment,
// that segment is requested to tier2 (i.e. gets cached).
// That is why in the `sunny path` test case only segment 11 is processed: with linearHandoff=100, the first request is sent to tier2
// once block 231 > 11 (next segment) * 10 (segment size) + 120 (finalBlockDelay) is processed.
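// A further worked example, `multiple big segment size`: linearHandoff=100 gives currentSegment=0,
// so the first requested range is [0, 1000), triggered once a block above 1000+120=1120 is seen.
// The ranges ending at 2000 and 3000 follow once blocks above 2120 and 3120 arrive; the range ending
// at 4000 would need a block above 4120, which never comes before endRange=4023, hence segments {1, 2, 3}.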

{
name: "sunny path",
segmentSize: 10,
startRange: 101,
endRange: 231,
linearHandoff: 100,
errorBackProcessing: false,
expectedSegmentProcessed: []uint64{11},
},
{
name: "with job failing",
segmentSize: 10,
startRange: 101,
endRange: 231,
linearHandoff: 100,
errorBackProcessing: true,
expectedSegmentProcessed: []uint64{11},
},

{
name: "processing multiple segments",
segmentSize: 10,
startRange: 101,
endRange: 261,
linearHandoff: 100,
errorBackProcessing: false,
expectedSegmentProcessed: []uint64{11, 12, 13, 14},
},

{
name: "big segment size",
segmentSize: 1000,
startRange: 101,
endRange: 2021,
linearHandoff: 100,
errorBackProcessing: false,
expectedSegmentProcessed: []uint64{1},
},

{
name: "multiple big segment size",
segmentSize: 1000,
startRange: 101,
endRange: 4023,
linearHandoff: 100,
errorBackProcessing: false,
expectedSegmentProcessed: []uint64{1, 2, 3},
},
}

testContext := context.Background()
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
testHandler := &testNextHandler{}
testLogger := zap.NewNop()
segmentProcessed := make(chan uint64)

RequestBackProcessingTest := func(ctx context.Context, logger *zap.Logger, blockRange *block.Range, stageToProcess int, clientFactory client.InternalClientFactory, jobResult chan error) {
var err error
if c.errorBackProcessing {
err = fmt.Errorf("fail")
}

segmentNumber := blockRange.ExclusiveEndBlock / c.segmentSize
segmentProcessed <- segmentNumber

jobResult <- err
}

testLiveBackFiller := NewLiveBackFiller(testHandler, testLogger, c.stageToProcess, c.segmentSize, c.linearHandoff, nil, RequestBackProcessingTest)

go testLiveBackFiller.Start(testContext)

testSource := bstream.NewTestSource(testLiveBackFiller)

go testSource.Run()

// Push irreversible blocks from startRange to endRange
for currentBlockNum := c.startRange; currentBlockNum <= c.endRange; currentBlockNum++ {
fmt.Println("pushing block", currentBlockNum)
block := &pbbstream.Block{
Number: currentBlockNum}
obj := &testObject{step: bstream.StepIrreversible}
err := testSource.Push(block, obj)
require.NoError(t, err)
}

done := make(chan struct{})
receivedSegmentProcessed := make([]uint64, 0)
go func() {
for process := range segmentProcessed {
receivedSegmentProcessed = append(receivedSegmentProcessed, process)
if len(receivedSegmentProcessed) == len(c.expectedSegmentProcessed) {
close(done)
return
}
}
panic("should not reach here")
}()

select {
case <-done:
case <-time.After(1 * time.Second):
fmt.Println("timeout")
t.Fail()
}
require.Equal(t, c.expectedSegmentProcessed, receivedSegmentProcessed)
})
}
}

type testObject struct {
step bstream.StepType
}

func (t *testObject) Step() bstream.StepType {
return t.step
}
func (t *testObject) FinalBlockHeight() uint64 {
return 0
}

func (t *testObject) ReorgJunctionBlock() bstream.BlockRef {
return nil
}

type testNextHandler struct {
}

func (t *testNextHandler) ProcessBlock(blk *pbbstream.Block, obj interface{}) (err error) {
return nil
}