From 2901a033f6a7de212c335b82cbdf76ae90af73c8 Mon Sep 17 00:00:00 2001 From: Charles Billette Date: Thu, 14 Nov 2024 11:41:04 -0500 Subject: [PATCH] Refactor BlockPoller to support generics and client handling Refactor BlockPoller to support generic clients with type parameter 'C'. Updated associated methods, test functions, and interfaces to accommodate this change. This enhancement includes moving the retry mechanism and client handling to the poller, improving flexibility for different client types. --- blockpoller/fetcher.go | 6 +- blockpoller/init_test.go | 17 ++-- blockpoller/options.go | 18 ++-- blockpoller/poller.go | 122 +++++++++++++++++---------- blockpoller/poller_client_test.go | 133 ++++++++++++++++++++++++++++++ blockpoller/poller_test.go | 21 +++-- blockpoller/state_file.go | 29 +++++-- blockpoller/state_file_test.go | 13 ++- 8 files changed, 278 insertions(+), 81 deletions(-) create mode 100644 blockpoller/poller_client_test.go diff --git a/blockpoller/fetcher.go b/blockpoller/fetcher.go index 46d9c90..8027fe9 100644 --- a/blockpoller/fetcher.go +++ b/blockpoller/fetcher.go @@ -1,12 +1,10 @@ package blockpoller import ( - "context" - pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" ) -type BlockFetcher interface { +type BlockFetcher[C any] interface { IsBlockAvailable(requestedSlot uint64) bool - Fetch(ctx context.Context, blkNum uint64) (b *pbbstream.Block, skipped bool, err error) + Fetch(client C, blkNum uint64) (b *pbbstream.Block, skipped bool, err error) } diff --git a/blockpoller/init_test.go b/blockpoller/init_test.go index 68b9e9b..841c4f0 100644 --- a/blockpoller/init_test.go +++ b/blockpoller/init_test.go @@ -1,7 +1,6 @@ package blockpoller import ( - "context" "fmt" "testing" "time" @@ -27,31 +26,31 @@ type TestBlock struct { send *pbbstream.Block } -var _ BlockFetcher = &TestBlockFetcher{} +var _ BlockFetcher[any] = &TestBlockFetcher[any]{} -type TestBlockFetcher struct { +type TestBlockFetcher[C any] struct { t *testing.T blocks []*TestBlock idx uint64 completed bool } -func newTestBlockFetcher(t *testing.T, blocks []*TestBlock) *TestBlockFetcher { - return &TestBlockFetcher{ +func newTestBlockFetcher[C any](t *testing.T, blocks []*TestBlock) *TestBlockFetcher[C] { + return &TestBlockFetcher[C]{ t: t, blocks: blocks, } } -func (b *TestBlockFetcher) PollingInterval() time.Duration { +func (b *TestBlockFetcher[C]) PollingInterval() time.Duration { return 0 } -func (b *TestBlockFetcher) IsBlockAvailable(requestedSlot uint64) bool { +func (b *TestBlockFetcher[C]) IsBlockAvailable(requestedSlot uint64) bool { return true } -func (b *TestBlockFetcher) Fetch(_ context.Context, blkNum uint64) (*pbbstream.Block, bool, error) { +func (b *TestBlockFetcher[C]) Fetch(c C, blkNum uint64) (*pbbstream.Block, bool, error) { if len(b.blocks) == 0 { assert.Fail(b.t, fmt.Sprintf("should not have fetched block %d", blkNum)) } @@ -69,7 +68,7 @@ func (b *TestBlockFetcher) Fetch(_ context.Context, blkNum uint64) (*pbbstream.B return blkToSend, false, nil } -func (b *TestBlockFetcher) check(t *testing.T) { +func (b *TestBlockFetcher[C]) check(t *testing.T) { t.Helper() require.Equal(b.t, uint64(len(b.blocks)), b.idx, "we should have fetched all %d blocks, only fired %d blocks", len(b.blocks), b.idx) } diff --git a/blockpoller/options.go b/blockpoller/options.go index 057512d..c0d1253 100644 --- a/blockpoller/options.go +++ b/blockpoller/options.go @@ -2,30 +2,30 @@ package blockpoller import "go.uber.org/zap" -type Option func(*BlockPoller) +type Option[C any] func(*BlockPoller[C]) -func WithBlockFetchRetryCount(v uint64) Option { - return func(p *BlockPoller) { +func WithBlockFetchRetryCount[C any](v uint64) Option[C] { + return func(p *BlockPoller[C]) { p.fetchBlockRetryCount = v } } -func WithStoringState(stateStorePath string) Option { - return func(p *BlockPoller) { +func WithStoringState[C any](stateStorePath string) Option[C] { + return func(p *BlockPoller[C]) { p.stateStorePath = stateStorePath } } // IgnoreCursor ensures the poller will ignore the cursor and start from the startBlockNum // the cursor will still be saved as the poller progresses -func IgnoreCursor() Option { - return func(p *BlockPoller) { +func IgnoreCursor[C any]() Option[C] { + return func(p *BlockPoller[C]) { p.ignoreCursor = true } } -func WithLogger(logger *zap.Logger) Option { - return func(p *BlockPoller) { +func WithLogger[C any](logger *zap.Logger) Option[C] { + return func(p *BlockPoller[C]) { p.logger = logger } } diff --git a/blockpoller/poller.go b/blockpoller/poller.go index 070276f..ad6706e 100644 --- a/blockpoller/poller.go +++ b/blockpoller/poller.go @@ -8,12 +8,12 @@ import ( "time" "github.com/streamingfast/bstream" - "github.com/streamingfast/bstream/forkable" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" "github.com/streamingfast/derr" "github.com/streamingfast/dhammer" "github.com/streamingfast/firehose-core/internal/utils" + "github.com/streamingfast/firehose-core/rpc" "github.com/streamingfast/shutter" "go.uber.org/zap" ) @@ -27,7 +27,7 @@ func newBlock(block2 *pbbstream.Block) *block { return &block{block2, false} } -type BlockPoller struct { +type BlockPoller[C any] struct { *shutter.Shutter startBlockNumGate uint64 fetchBlockRetryCount uint64 @@ -35,9 +35,11 @@ type BlockPoller struct { ignoreCursor bool forceFinalityAfterBlocks *uint64 - blockFetcher BlockFetcher + blockFetcher BlockFetcher[C] blockHandler BlockHandler - forkDB *forkable.ForkDB + clients *rpc.Clients[C] + + forkDB *forkable.ForkDB logger *zap.Logger @@ -47,16 +49,18 @@ type BlockPoller struct { optimisticallyPolledBlocksLock sync.Mutex } -func New( - blockFetcher BlockFetcher, +func New[C any]( + blockFetcher BlockFetcher[C], blockHandler BlockHandler, - opts ...Option, -) *BlockPoller { + clients *rpc.Clients[C], + opts ...Option[C], +) *BlockPoller[C] { - b := &BlockPoller{ + b := &BlockPoller[C]{ Shutter: shutter.New(), blockFetcher: blockFetcher, blockHandler: blockHandler, + clients: clients, fetchBlockRetryCount: math.MaxUint64, logger: zap.NewNop(), forceFinalityAfterBlocks: utils.GetEnvForceFinalityAfterBlocks(), @@ -69,7 +73,7 @@ func New( return b } -func (p *BlockPoller) Run(ctx context.Context, firstStreamableBlockNum uint64, blockFetchBatchSize int) error { +func (p *BlockPoller[C]) Run(firstStreamableBlockNum uint64, stopBlock *uint64, blockFetchBatchSize int) error { p.startBlockNumGate = firstStreamableBlockNum p.logger.Info("starting poller", zap.Uint64("first_streamable_block", firstStreamableBlockNum), @@ -83,14 +87,25 @@ func (p *BlockPoller) Run(ctx context.Context, firstStreamableBlockNum uint64, b } p.forkDB = forkDB - return p.run(resolvedStartBlock, blockFetchBatchSize) + resolveStopBlock := uint64(math.MaxUint64) + if stopBlock != nil { + resolveStopBlock = *stopBlock + } + + return p.run(resolvedStartBlock, resolveStopBlock, blockFetchBatchSize) } -func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, blockFetchBatchSize int) (err error) { +func (p *BlockPoller[C]) run(resolvedStartBlock bstream.BlockRef, stopBlock uint64, blockFetchBatchSize int) (err error) { currentCursor := &cursor{state: ContinuousSegState, logger: p.logger} blockToFetch := resolvedStartBlock.Num() var hashToFetch *string for { + + if blockToFetch >= stopBlock { + p.logger.Info("stop block reach", zap.Uint64("stop_block", stopBlock)) + return nil + } + if p.IsTerminating() { p.logger.Info("block poller is terminating") } @@ -133,7 +148,7 @@ func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, blockFetchBatchSi } } -func (p *BlockPoller) processBlock(currentState *cursor, block *pbbstream.Block) (uint64, *string, error) { +func (p *BlockPoller[C]) processBlock(currentState *cursor, block *pbbstream.Block) (uint64, *string, error) { p.logger.Info("processing block", zap.Stringer("block", block.AsRef()), zap.Uint64("lib_num", block.LibNum)) if block.Number < p.forkDB.LIBNum() { panic(fmt.Errorf("unexpected error block %d is below the current LIB num %d. There should be no re-org above the current LIB num", block.Number, p.forkDB.LIBNum())) @@ -189,31 +204,40 @@ type BlockItem struct { skipped bool } -func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch int) error { +func (p *BlockPoller[C]) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch int) error { p.optimisticallyPolledBlocks = map[uint64]*BlockItem{} p.fetching = true nailer := dhammer.NewNailer(10, func(ctx context.Context, blockToFetch uint64) (*BlockItem, error) { var blockItem *BlockItem err := derr.Retry(p.fetchBlockRetryCount, func(ctx context.Context) error { - b, skip, err := p.blockFetcher.Fetch(ctx, blockToFetch) - if err != nil { - return fmt.Errorf("unable to fetch block %d: %w", blockToFetch, err) - } - if skip { - blockItem = &BlockItem{ - blockNumber: blockToFetch, - block: nil, - skipped: true, + + bi, err := rpc.WithClients(p.clients, func(client C) (*BlockItem, error) { + b, skipped, err := p.blockFetcher.Fetch(client, blockToFetch) + if err != nil { + return nil, fmt.Errorf("fetching block %d: %w", blockToFetch, err) } - return nil - } - //todo: add block to cache - blockItem = &BlockItem{ - blockNumber: blockToFetch, - block: b, - skipped: false, + + if skipped { + return &BlockItem{ + blockNumber: blockToFetch, + block: nil, + skipped: true, + }, nil + } + + return &BlockItem{ + blockNumber: blockToFetch, + block: b, + skipped: false, + }, nil + }) + + if err != nil { + return fmt.Errorf("fetching block %d with retry : %w", blockToFetch, err) } + blockItem = bi + return nil }) @@ -272,7 +296,7 @@ func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch return nil } -func (p *BlockPoller) requestBlock(blockNumber uint64, numberOfBlockToFetch int) chan *BlockItem { +func (p *BlockPoller[C]) requestBlock(blockNumber uint64, numberOfBlockToFetch int) chan *BlockItem { p.logger.Info("requesting block", zap.Uint64("block_num", blockNumber)) requestedBlock := make(chan *BlockItem) @@ -314,7 +338,12 @@ func (p *BlockPoller) requestBlock(blockNumber uint64, numberOfBlockToFetch int) return requestedBlock } -func (p *BlockPoller) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream.Block, error) { +type FetchResponse struct { + Block *pbbstream.Block + Skipped bool +} + +func (p *BlockPoller[C]) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream.Block, error) { p.logger.Info("fetching block with hash", zap.Uint64("block_num", blkNum), zap.String("hash", hash)) _ = hash //todo: hash will be used to fetch block from cache @@ -322,16 +351,25 @@ func (p *BlockPoller) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream var out *pbbstream.Block var skipped bool + err := derr.Retry(p.fetchBlockRetryCount, func(ctx context.Context) error { - //todo: get block from cache - var fetchErr error - out, skipped, fetchErr = p.blockFetcher.Fetch(ctx, blkNum) - if fetchErr != nil { - return fmt.Errorf("unable to fetch block %d: %w", blkNum, fetchErr) - } - if skipped { - return nil + br, err := rpc.WithClients(p.clients, func(client C) (br *FetchResponse, err error) { + b, skipped, err := p.blockFetcher.Fetch(client, blkNum) + if err != nil { + return nil, fmt.Errorf("fetching block block %d: %w", blkNum, err) + } + return &FetchResponse{ + Block: b, + Skipped: skipped, + }, nil + }) + + if err != nil { + return fmt.Errorf("fetching block with retry %d: %w", blkNum, err) } + + out = br.Block + skipped = br.Skipped return nil }) @@ -350,7 +388,7 @@ func (p *BlockPoller) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream return out, nil } -func (p *BlockPoller) fireCompleteSegment(blocks []*forkable.Block) error { +func (p *BlockPoller[C]) fireCompleteSegment(blocks []*forkable.Block) error { for _, blk := range blocks { b := blk.Object.(*block) if _, err := p.fire(b); err != nil { @@ -360,7 +398,7 @@ func (p *BlockPoller) fireCompleteSegment(blocks []*forkable.Block) error { return nil } -func (p *BlockPoller) fire(blk *block) (bool, error) { +func (p *BlockPoller[C]) fire(blk *block) (bool, error) { if blk.fired { return false, nil } diff --git a/blockpoller/poller_client_test.go b/blockpoller/poller_client_test.go new file mode 100644 index 0000000..f9b2ee4 --- /dev/null +++ b/blockpoller/poller_client_test.go @@ -0,0 +1,133 @@ +package blockpoller + +import ( + "fmt" + "testing" + + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/firehose-core/rpc" + "github.com/stretchr/testify/require" +) + +type TestBlockItem struct { + skipped bool + err error + block *pbbstream.Block +} +type TestBlockClient struct { + currentIndex int + blockItems []*TestBlockItem + + blockProduceCount int + skippedCount int + errProduceCount int + name string +} + +func NewTestBlockClient(blockItems []*TestBlockItem, name string) *TestBlockClient { + return &TestBlockClient{ + blockItems: blockItems, + name: name, + } +} + +func (c *TestBlockClient) GetBlock(blockNumber uint64) (*TestBlockItem, error) { + fmt.Printf("%s: GetBlock %d\n", c.name, blockNumber) + if c.EOB() { + fmt.Println("TestBlockClient: EOB", blockNumber) + return nil, TestErrCompleteDone + } + b := c.blockItems[c.currentIndex] + if b.block.Number != blockNumber { + panic(fmt.Sprintf("%s expected requested block %d, got %d", c.name, b.block.Number, blockNumber)) + } + + c.currentIndex++ + + if b.err != nil { + fmt.Printf("%s: error producing block %d\n", c.name, blockNumber) + c.errProduceCount++ + return nil, b.err + } + + if b.skipped { + fmt.Printf("%s: skipped producing block %d\n", c.name, blockNumber) + c.skippedCount++ + return b, nil + } + + c.blockProduceCount++ + + return b, nil +} + +func (c *TestBlockClient) EOB() bool { + return c.currentIndex >= len(c.blockItems) +} + +type TestBlockFetcherWithClient struct { +} + +func (t TestBlockFetcherWithClient) IsBlockAvailable(requestedSlot uint64) bool { + return true +} + +func (t TestBlockFetcherWithClient) Fetch(client *TestBlockClient, blkNum uint64) (b *pbbstream.Block, skipped bool, err error) { + bi, err := client.GetBlock(blkNum) + if err != nil { + return nil, false, err + } + if bi.skipped { + return nil, true, nil + } + return bi.block, false, nil +} + +func TestPollerClient(t *testing.T) { + clients := rpc.NewClients[*TestBlockClient]() + var blockItems1 []*TestBlockItem + var blockItems2 []*TestBlockItem + + //init call + blockItems1 = append(blockItems1, &TestBlockItem{block: blk("99a", "98a", 97)}) + //1st fetch block + blockItems1 = append(blockItems1, &TestBlockItem{block: blk("99a", "98a", 97)}) + + //c1 will produce an error that c2 will be call and return requested block without error + blockItems1 = append(blockItems1, &TestBlockItem{block: blk("100a", "99a", 98), err: fmt.Errorf("test error")}) + blockItems2 = append(blockItems2, &TestBlockItem{block: blk("100a", "99a", 98)}) + + //c1 and c2 will produce errors the c1 will be call and return requested block without error + blockItems1 = append(blockItems1, &TestBlockItem{block: blk("101a", "100a", 100), err: fmt.Errorf("test error")}) + blockItems2 = append(blockItems2, &TestBlockItem{block: blk("101a", "100a", 100), err: fmt.Errorf("test error")}) + blockItems1 = append(blockItems1, &TestBlockItem{block: blk("101a", "100a", 100)}) + + //test skip block + blockItems1 = append(blockItems1, &TestBlockItem{block: blk("102a", "101a", 101)}) + blockItems1 = append(blockItems1, &TestBlockItem{block: blk("103a", "101a", 101), skipped: true}) + blockItems1 = append(blockItems1, &TestBlockItem{block: blk("104a", "102a", 102)}) + + c1 := NewTestBlockClient(blockItems1, "c1") + c2 := NewTestBlockClient(blockItems2, "c2") + + clients.Add(c1) + clients.Add(c2) + + fetcher := TestBlockFetcherWithClient{} + handler := &TestNoopBlockFinalizer{} + poller := New(fetcher, handler, clients) + + stopBlock := uint64(104) + err := poller.Run(99, &stopBlock, 1) + + require.NoError(t, err) + + require.Equal(t, 5, c1.blockProduceCount) + require.Equal(t, 1, c1.skippedCount) + require.Equal(t, 2, c1.errProduceCount) + + require.Equal(t, 1, c2.blockProduceCount) + require.Equal(t, 0, c2.skippedCount) + require.Equal(t, 1, c2.errProduceCount) + +} diff --git a/blockpoller/poller_test.go b/blockpoller/poller_test.go index 5821588..8e2467a 100644 --- a/blockpoller/poller_test.go +++ b/blockpoller/poller_test.go @@ -1,12 +1,13 @@ package blockpoller import ( - "context" "errors" "fmt" "strconv" "testing" + "github.com/streamingfast/firehose-core/rpc" + "github.com/streamingfast/bstream" "github.com/streamingfast/bstream/forkable" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" @@ -18,12 +19,14 @@ func TestForkHandler_run(t *testing.T) { tests := []struct { name string firstStreamableBlock bstream.BlockRef + stopAtBlock uint64 blocks []*TestBlock expectFireBlock []*pbbstream.Block }{ { name: "start block 0", firstStreamableBlock: blk("0a", "", 0).AsRef(), + stopAtBlock: uint64(3), blocks: []*TestBlock{ tb("0a", "", 0), //init the state fetch tb("0a", "", 0), @@ -39,6 +42,7 @@ func TestForkHandler_run(t *testing.T) { { name: "Fork 1", firstStreamableBlock: blk("100a", "99a", 100).AsRef(), + stopAtBlock: uint64(108), blocks: []*TestBlock{ tb("100a", "99a", 100), //init the state fetch tb("100a", "99a", 100), @@ -53,6 +57,7 @@ func TestForkHandler_run(t *testing.T) { tb("102b", "101a", 100), tb("106a", "105a", 100), tb("105a", "104a", 100), + tb("107a", "106a", 100), }, expectFireBlock: []*pbbstream.Block{ blk("100a", "99a", 100), @@ -71,6 +76,7 @@ func TestForkHandler_run(t *testing.T) { { name: "Fork 2", firstStreamableBlock: blk("100a", "99a", 100).AsRef(), + stopAtBlock: uint64(106), blocks: []*TestBlock{ tb("100a", "99a", 100), //init the state fetch tb("100a", "99a", 100), @@ -99,6 +105,7 @@ func TestForkHandler_run(t *testing.T) { { name: "with lib advancing", firstStreamableBlock: blk("100a", "99a", 100).AsRef(), + stopAtBlock: uint64(106), blocks: []*TestBlock{ tb("100a", "99a", 100), //init the state fetch tb("100a", "99a", 100), @@ -127,6 +134,7 @@ func TestForkHandler_run(t *testing.T) { { name: "with skipping blocks", firstStreamableBlock: blk("100a", "99a", 100).AsRef(), + stopAtBlock: uint64(106), blocks: []*TestBlock{ tb("100a", "99a", 100), //init the state fetch tb("100a", "99a", 100), @@ -156,14 +164,17 @@ func TestForkHandler_run(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - blockFetcher := newTestBlockFetcher(t, tt.blocks) + blockFetcher := newTestBlockFetcher[any](t, tt.blocks) blockFinalizer := newTestBlockFinalizer(t, tt.expectFireBlock) - poller := New(blockFetcher, blockFinalizer) + clients := rpc.NewClients[any]() + clients.Add(new(any)) + + poller := New(blockFetcher, blockFinalizer, clients) poller.fetchBlockRetryCount = 0 poller.forkDB = forkable.NewForkDB() - err := poller.Run(context.Background(), tt.firstStreamableBlock.Num(), 1) + err := poller.Run(tt.firstStreamableBlock.Num(), &tt.stopAtBlock, 1) if !errors.Is(err, TestErrCompleteDone) { require.NoError(t, err) } @@ -210,7 +221,7 @@ func TestForkHandler_fire(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - poller := &BlockPoller{startBlockNumGate: test.startBlockNum, blockHandler: &TestNoopBlockFinalizer{}} + poller := &BlockPoller[any]{startBlockNumGate: test.startBlockNum, blockHandler: &TestNoopBlockFinalizer{}} ok, err := poller.fire(test.block) require.NoError(t, err) assert.Equal(t, test.expect, ok) diff --git a/blockpoller/state_file.go b/blockpoller/state_file.go index 44320db..4603fcf 100644 --- a/blockpoller/state_file.go +++ b/blockpoller/state_file.go @@ -1,12 +1,13 @@ package blockpoller import ( - "context" "encoding/json" "fmt" "os" "path/filepath" + "github.com/streamingfast/firehose-core/rpc" + "github.com/streamingfast/bstream" "github.com/streamingfast/bstream/forkable" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" @@ -33,7 +34,7 @@ type stateFile struct { Blocks []blockRefWithPrev } -func (p *BlockPoller) isStateFileExist(stateStorePath string) bool { +func (p *BlockPoller[C]) isStateFileExist(stateStorePath string) bool { if stateStorePath == "" { p.logger.Info("No state store path set, skipping cursor check") return false @@ -67,7 +68,7 @@ func getState(stateStorePath string) (*stateFile, error) { return &sf, nil } -func (p *BlockPoller) saveState(blocks []*forkable.Block) error { +func (p *BlockPoller[C]) saveState(blocks []*forkable.Block) error { p.logger.Debug("saving cursor", zap.String("state_store_path", p.stateStorePath)) if p.stateStorePath == "" { return nil @@ -108,23 +109,35 @@ func (p *BlockPoller) saveState(blocks []*forkable.Block) error { return nil } -func (p *BlockPoller) initState(firstStreamableBlockNum uint64, stateStorePath string, ignoreCursor bool, logger *zap.Logger) (*forkable.ForkDB, bstream.BlockRef, error) { +func (p *BlockPoller[C]) initState(firstStreamableBlockNum uint64, stateStorePath string, ignoreCursor bool, logger *zap.Logger) (*forkable.ForkDB, bstream.BlockRef, error) { forkDB := forkable.NewForkDB(forkable.ForkDBWithLogger(logger)) if ignoreCursor || !p.isStateFileExist(stateStorePath) { logger.Info("ignoring cursor, fetching first streamable block", zap.Uint64("first_streamable_block", firstStreamableBlockNum)) for { - firstStreamableBlock, skip, err := p.blockFetcher.Fetch(context.Background(), firstStreamableBlockNum) - firstStreamableBlockRef := firstStreamableBlock.AsRef() + br, err := rpc.WithClients(p.clients, func(client C) (*FetchResponse, error) { + firstStreamableBlock, skip, err := p.blockFetcher.Fetch(client, firstStreamableBlockNum) + if err != nil { + return nil, fmt.Errorf("fetching first streamable block: %w", err) + } + if skip { + return nil, fmt.Errorf("expecting first streamable block %q not to be skipped", firstStreamableBlock.AsRef()) + } + return &FetchResponse{ + Block: firstStreamableBlock, + }, nil + }) if err != nil { p.logger.Warn("fetching first streamable block", zap.Uint64("first_streamable_block", firstStreamableBlockNum), zap.Error(err)) continue } - if skip { - return nil, nil, fmt.Errorf("expecting first streamable block %q not to be skiped", firstStreamableBlockRef) + if br.Skipped { + return nil, nil, fmt.Errorf("expecting first streamable block %q not to be skiped", firstStreamableBlockNum) } + firstStreamableBlockRef := br.Block.AsRef() + logger.Info("ignoring cursor, will start from...", zap.Stringer("first_streamable_block", firstStreamableBlockRef), zap.Stringer("lib", firstStreamableBlockRef), diff --git a/blockpoller/state_file_test.go b/blockpoller/state_file_test.go index 266d190..2a85523 100644 --- a/blockpoller/state_file_test.go +++ b/blockpoller/state_file_test.go @@ -7,6 +7,7 @@ import ( "github.com/streamingfast/bstream" "github.com/streamingfast/bstream/forkable" + "github.com/streamingfast/firehose-core/rpc" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -42,9 +43,9 @@ func TestFireBlockFinalizer_state(t *testing.T) { expectedStateFileCnt := `{"Lib":{"id":"101a","num":101},"LastFiredBlock":{"id":"105a","num":105,"previous_ref_id":"104a"},"Blocks":[{"id":"101a","num":101,"previous_ref_id":"100a"},{"id":"102a","num":102,"previous_ref_id":"101a"},{"id":"103a","num":103,"previous_ref_id":"102a"},{"id":"104a","num":104,"previous_ref_id":"103a"},{"id":"105a","num":105,"previous_ref_id":"104a"}]}` - blockFetcher := newTestBlockFetcher(t, []*TestBlock{tb("60a", "59a", 60)}) + blockFetcher := newTestBlockFetcher[any](t, []*TestBlock{tb("60a", "59a", 60)}) - poller := &BlockPoller{ + poller := &BlockPoller[any]{ stateStorePath: dirName, forkDB: fk, logger: zap.NewNop(), @@ -73,11 +74,15 @@ func TestFireBlockFinalizer_noSstate(t *testing.T) { require.NoError(t, err) defer os.Remove(dirName) - blockFetcher := newTestBlockFetcher(t, []*TestBlock{tb("60a", "59a", 60)}) - poller := &BlockPoller{ + blockFetcher := newTestBlockFetcher[any](t, []*TestBlock{tb("60a", "59a", 60)}) + clients := rpc.NewClients[any]() + clients.Add(new(any)) + + poller := &BlockPoller[any]{ stateStorePath: dirName, logger: zap.NewNop(), blockFetcher: blockFetcher, + clients: clients, } forkDB, startBlock, err := poller.initState(60, dirName, false, zap.NewNop())