diff --git a/blockpoller/fetcher.go b/blockpoller/fetcher.go index 46d9c90..8027fe9 100644 --- a/blockpoller/fetcher.go +++ b/blockpoller/fetcher.go @@ -1,12 +1,10 @@ package blockpoller import ( - "context" - pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" ) -type BlockFetcher interface { +type BlockFetcher[C any] interface { IsBlockAvailable(requestedSlot uint64) bool - Fetch(ctx context.Context, blkNum uint64) (b *pbbstream.Block, skipped bool, err error) + Fetch(client C, blkNum uint64) (b *pbbstream.Block, skipped bool, err error) } diff --git a/blockpoller/init_test.go b/blockpoller/init_test.go index 68b9e9b..841c4f0 100644 --- a/blockpoller/init_test.go +++ b/blockpoller/init_test.go @@ -1,7 +1,6 @@ package blockpoller import ( - "context" "fmt" "testing" "time" @@ -27,31 +26,31 @@ type TestBlock struct { send *pbbstream.Block } -var _ BlockFetcher = &TestBlockFetcher{} +var _ BlockFetcher[any] = &TestBlockFetcher[any]{} -type TestBlockFetcher struct { +type TestBlockFetcher[C any] struct { t *testing.T blocks []*TestBlock idx uint64 completed bool } -func newTestBlockFetcher(t *testing.T, blocks []*TestBlock) *TestBlockFetcher { - return &TestBlockFetcher{ +func newTestBlockFetcher[C any](t *testing.T, blocks []*TestBlock) *TestBlockFetcher[C] { + return &TestBlockFetcher[C]{ t: t, blocks: blocks, } } -func (b *TestBlockFetcher) PollingInterval() time.Duration { +func (b *TestBlockFetcher[C]) PollingInterval() time.Duration { return 0 } -func (b *TestBlockFetcher) IsBlockAvailable(requestedSlot uint64) bool { +func (b *TestBlockFetcher[C]) IsBlockAvailable(requestedSlot uint64) bool { return true } -func (b *TestBlockFetcher) Fetch(_ context.Context, blkNum uint64) (*pbbstream.Block, bool, error) { +func (b *TestBlockFetcher[C]) Fetch(c C, blkNum uint64) (*pbbstream.Block, bool, error) { if len(b.blocks) == 0 { assert.Fail(b.t, fmt.Sprintf("should not have fetched block %d", blkNum)) } @@ -69,7 +68,7 @@ func (b *TestBlockFetcher) Fetch(_ context.Context, blkNum uint64) (*pbbstream.B return blkToSend, false, nil } -func (b *TestBlockFetcher) check(t *testing.T) { +func (b *TestBlockFetcher[C]) check(t *testing.T) { t.Helper() require.Equal(b.t, uint64(len(b.blocks)), b.idx, "we should have fetched all %d blocks, only fired %d blocks", len(b.blocks), b.idx) } diff --git a/blockpoller/options.go b/blockpoller/options.go index 057512d..c0d1253 100644 --- a/blockpoller/options.go +++ b/blockpoller/options.go @@ -2,30 +2,30 @@ package blockpoller import "go.uber.org/zap" -type Option func(*BlockPoller) +type Option[C any] func(*BlockPoller[C]) -func WithBlockFetchRetryCount(v uint64) Option { - return func(p *BlockPoller) { +func WithBlockFetchRetryCount[C any](v uint64) Option[C] { + return func(p *BlockPoller[C]) { p.fetchBlockRetryCount = v } } -func WithStoringState(stateStorePath string) Option { - return func(p *BlockPoller) { +func WithStoringState[C any](stateStorePath string) Option[C] { + return func(p *BlockPoller[C]) { p.stateStorePath = stateStorePath } } // IgnoreCursor ensures the poller will ignore the cursor and start from the startBlockNum // the cursor will still be saved as the poller progresses -func IgnoreCursor() Option { - return func(p *BlockPoller) { +func IgnoreCursor[C any]() Option[C] { + return func(p *BlockPoller[C]) { p.ignoreCursor = true } } -func WithLogger(logger *zap.Logger) Option { - return func(p *BlockPoller) { +func WithLogger[C any](logger *zap.Logger) Option[C] { + return func(p *BlockPoller[C]) { p.logger = logger } } diff --git a/blockpoller/poller.go b/blockpoller/poller.go index 070276f..ad6706e 100644 --- a/blockpoller/poller.go +++ b/blockpoller/poller.go @@ -8,12 +8,12 @@ import ( "time" "github.com/streamingfast/bstream" - "github.com/streamingfast/bstream/forkable" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" "github.com/streamingfast/derr" "github.com/streamingfast/dhammer" "github.com/streamingfast/firehose-core/internal/utils" + "github.com/streamingfast/firehose-core/rpc" "github.com/streamingfast/shutter" "go.uber.org/zap" ) @@ -27,7 +27,7 @@ func newBlock(block2 *pbbstream.Block) *block { return &block{block2, false} } -type BlockPoller struct { +type BlockPoller[C any] struct { *shutter.Shutter startBlockNumGate uint64 fetchBlockRetryCount uint64 @@ -35,9 +35,11 @@ type BlockPoller struct { ignoreCursor bool forceFinalityAfterBlocks *uint64 - blockFetcher BlockFetcher + blockFetcher BlockFetcher[C] blockHandler BlockHandler - forkDB *forkable.ForkDB + clients *rpc.Clients[C] + + forkDB *forkable.ForkDB logger *zap.Logger @@ -47,16 +49,18 @@ type BlockPoller struct { optimisticallyPolledBlocksLock sync.Mutex } -func New( - blockFetcher BlockFetcher, +func New[C any]( + blockFetcher BlockFetcher[C], blockHandler BlockHandler, - opts ...Option, -) *BlockPoller { + clients *rpc.Clients[C], + opts ...Option[C], +) *BlockPoller[C] { - b := &BlockPoller{ + b := &BlockPoller[C]{ Shutter: shutter.New(), blockFetcher: blockFetcher, blockHandler: blockHandler, + clients: clients, fetchBlockRetryCount: math.MaxUint64, logger: zap.NewNop(), forceFinalityAfterBlocks: utils.GetEnvForceFinalityAfterBlocks(), @@ -69,7 +73,7 @@ func New( return b } -func (p *BlockPoller) Run(ctx context.Context, firstStreamableBlockNum uint64, blockFetchBatchSize int) error { +func (p *BlockPoller[C]) Run(firstStreamableBlockNum uint64, stopBlock *uint64, blockFetchBatchSize int) error { p.startBlockNumGate = firstStreamableBlockNum p.logger.Info("starting poller", zap.Uint64("first_streamable_block", firstStreamableBlockNum), @@ -83,14 +87,25 @@ func (p *BlockPoller) Run(ctx context.Context, firstStreamableBlockNum uint64, b } p.forkDB = forkDB - return p.run(resolvedStartBlock, blockFetchBatchSize) + resolveStopBlock := uint64(math.MaxUint64) + if stopBlock != nil { + resolveStopBlock = *stopBlock + } + + return p.run(resolvedStartBlock, resolveStopBlock, blockFetchBatchSize) } -func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, blockFetchBatchSize int) (err error) { +func (p *BlockPoller[C]) run(resolvedStartBlock bstream.BlockRef, stopBlock uint64, blockFetchBatchSize int) (err error) { currentCursor := &cursor{state: ContinuousSegState, logger: p.logger} blockToFetch := resolvedStartBlock.Num() var hashToFetch *string for { + + if blockToFetch >= stopBlock { + p.logger.Info("stop block reach", zap.Uint64("stop_block", stopBlock)) + return nil + } + if p.IsTerminating() { p.logger.Info("block poller is terminating") } @@ -133,7 +148,7 @@ func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, blockFetchBatchSi } } -func (p *BlockPoller) processBlock(currentState *cursor, block *pbbstream.Block) (uint64, *string, error) { +func (p *BlockPoller[C]) processBlock(currentState *cursor, block *pbbstream.Block) (uint64, *string, error) { p.logger.Info("processing block", zap.Stringer("block", block.AsRef()), zap.Uint64("lib_num", block.LibNum)) if block.Number < p.forkDB.LIBNum() { panic(fmt.Errorf("unexpected error block %d is below the current LIB num %d. There should be no re-org above the current LIB num", block.Number, p.forkDB.LIBNum())) @@ -189,31 +204,40 @@ type BlockItem struct { skipped bool } -func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch int) error { +func (p *BlockPoller[C]) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch int) error { p.optimisticallyPolledBlocks = map[uint64]*BlockItem{} p.fetching = true nailer := dhammer.NewNailer(10, func(ctx context.Context, blockToFetch uint64) (*BlockItem, error) { var blockItem *BlockItem err := derr.Retry(p.fetchBlockRetryCount, func(ctx context.Context) error { - b, skip, err := p.blockFetcher.Fetch(ctx, blockToFetch) - if err != nil { - return fmt.Errorf("unable to fetch block %d: %w", blockToFetch, err) - } - if skip { - blockItem = &BlockItem{ - blockNumber: blockToFetch, - block: nil, - skipped: true, + + bi, err := rpc.WithClients(p.clients, func(client C) (*BlockItem, error) { + b, skipped, err := p.blockFetcher.Fetch(client, blockToFetch) + if err != nil { + return nil, fmt.Errorf("fetching block %d: %w", blockToFetch, err) } - return nil - } - //todo: add block to cache - blockItem = &BlockItem{ - blockNumber: blockToFetch, - block: b, - skipped: false, + + if skipped { + return &BlockItem{ + blockNumber: blockToFetch, + block: nil, + skipped: true, + }, nil + } + + return &BlockItem{ + blockNumber: blockToFetch, + block: b, + skipped: false, + }, nil + }) + + if err != nil { + return fmt.Errorf("fetching block %d with retry : %w", blockToFetch, err) } + blockItem = bi + return nil }) @@ -272,7 +296,7 @@ func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch return nil } -func (p *BlockPoller) requestBlock(blockNumber uint64, numberOfBlockToFetch int) chan *BlockItem { +func (p *BlockPoller[C]) requestBlock(blockNumber uint64, numberOfBlockToFetch int) chan *BlockItem { p.logger.Info("requesting block", zap.Uint64("block_num", blockNumber)) requestedBlock := make(chan *BlockItem) @@ -314,7 +338,12 @@ func (p *BlockPoller) requestBlock(blockNumber uint64, numberOfBlockToFetch int) return requestedBlock } -func (p *BlockPoller) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream.Block, error) { +type FetchResponse struct { + Block *pbbstream.Block + Skipped bool +} + +func (p *BlockPoller[C]) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream.Block, error) { p.logger.Info("fetching block with hash", zap.Uint64("block_num", blkNum), zap.String("hash", hash)) _ = hash //todo: hash will be used to fetch block from cache @@ -322,16 +351,25 @@ func (p *BlockPoller) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream var out *pbbstream.Block var skipped bool + err := derr.Retry(p.fetchBlockRetryCount, func(ctx context.Context) error { - //todo: get block from cache - var fetchErr error - out, skipped, fetchErr = p.blockFetcher.Fetch(ctx, blkNum) - if fetchErr != nil { - return fmt.Errorf("unable to fetch block %d: %w", blkNum, fetchErr) - } - if skipped { - return nil + br, err := rpc.WithClients(p.clients, func(client C) (br *FetchResponse, err error) { + b, skipped, err := p.blockFetcher.Fetch(client, blkNum) + if err != nil { + return nil, fmt.Errorf("fetching block block %d: %w", blkNum, err) + } + return &FetchResponse{ + Block: b, + Skipped: skipped, + }, nil + }) + + if err != nil { + return fmt.Errorf("fetching block with retry %d: %w", blkNum, err) } + + out = br.Block + skipped = br.Skipped return nil }) @@ -350,7 +388,7 @@ func (p *BlockPoller) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream return out, nil } -func (p *BlockPoller) fireCompleteSegment(blocks []*forkable.Block) error { +func (p *BlockPoller[C]) fireCompleteSegment(blocks []*forkable.Block) error { for _, blk := range blocks { b := blk.Object.(*block) if _, err := p.fire(b); err != nil { @@ -360,7 +398,7 @@ func (p *BlockPoller) fireCompleteSegment(blocks []*forkable.Block) error { return nil } -func (p *BlockPoller) fire(blk *block) (bool, error) { +func (p *BlockPoller[C]) fire(blk *block) (bool, error) { if blk.fired { return false, nil } diff --git a/blockpoller/poller_client_test.go b/blockpoller/poller_client_test.go new file mode 100644 index 0000000..f9b2ee4 --- /dev/null +++ b/blockpoller/poller_client_test.go @@ -0,0 +1,133 @@ +package blockpoller + +import ( + "fmt" + "testing" + + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/firehose-core/rpc" + "github.com/stretchr/testify/require" +) + +type TestBlockItem struct { + skipped bool + err error + block *pbbstream.Block +} +type TestBlockClient struct { + currentIndex int + blockItems []*TestBlockItem + + blockProduceCount int + skippedCount int + errProduceCount int + name string +} + +func NewTestBlockClient(blockItems []*TestBlockItem, name string) *TestBlockClient { + return &TestBlockClient{ + blockItems: blockItems, + name: name, + } +} + +func (c *TestBlockClient) GetBlock(blockNumber uint64) (*TestBlockItem, error) { + fmt.Printf("%s: GetBlock %d\n", c.name, blockNumber) + if c.EOB() { + fmt.Println("TestBlockClient: EOB", blockNumber) + return nil, TestErrCompleteDone + } + b := c.blockItems[c.currentIndex] + if b.block.Number != blockNumber { + panic(fmt.Sprintf("%s expected requested block %d, got %d", c.name, b.block.Number, blockNumber)) + } + + c.currentIndex++ + + if b.err != nil { + fmt.Printf("%s: error producing block %d\n", c.name, blockNumber) + c.errProduceCount++ + return nil, b.err + } + + if b.skipped { + fmt.Printf("%s: skipped producing block %d\n", c.name, blockNumber) + c.skippedCount++ + return b, nil + } + + c.blockProduceCount++ + + return b, nil +} + +func (c *TestBlockClient) EOB() bool { + return c.currentIndex >= len(c.blockItems) +} + +type TestBlockFetcherWithClient struct { +} + +func (t TestBlockFetcherWithClient) IsBlockAvailable(requestedSlot uint64) bool { + return true +} + +func (t TestBlockFetcherWithClient) Fetch(client *TestBlockClient, blkNum uint64) (b *pbbstream.Block, skipped bool, err error) { + bi, err := client.GetBlock(blkNum) + if err != nil { + return nil, false, err + } + if bi.skipped { + return nil, true, nil + } + return bi.block, false, nil +} + +func TestPollerClient(t *testing.T) { + clients := rpc.NewClients[*TestBlockClient]() + var blockItems1 []*TestBlockItem + var blockItems2 []*TestBlockItem + + //init call + blockItems1 = append(blockItems1, &TestBlockItem{block: blk("99a", "98a", 97)}) + //1st fetch block + blockItems1 = append(blockItems1, &TestBlockItem{block: blk("99a", "98a", 97)}) + + //c1 will produce an error that c2 will be call and return requested block without error + blockItems1 = append(blockItems1, &TestBlockItem{block: blk("100a", "99a", 98), err: fmt.Errorf("test error")}) + blockItems2 = append(blockItems2, &TestBlockItem{block: blk("100a", "99a", 98)}) + + //c1 and c2 will produce errors the c1 will be call and return requested block without error + blockItems1 = append(blockItems1, &TestBlockItem{block: blk("101a", "100a", 100), err: fmt.Errorf("test error")}) + blockItems2 = append(blockItems2, &TestBlockItem{block: blk("101a", "100a", 100), err: fmt.Errorf("test error")}) + blockItems1 = append(blockItems1, &TestBlockItem{block: blk("101a", "100a", 100)}) + + //test skip block + blockItems1 = append(blockItems1, &TestBlockItem{block: blk("102a", "101a", 101)}) + blockItems1 = append(blockItems1, &TestBlockItem{block: blk("103a", "101a", 101), skipped: true}) + blockItems1 = append(blockItems1, &TestBlockItem{block: blk("104a", "102a", 102)}) + + c1 := NewTestBlockClient(blockItems1, "c1") + c2 := NewTestBlockClient(blockItems2, "c2") + + clients.Add(c1) + clients.Add(c2) + + fetcher := TestBlockFetcherWithClient{} + handler := &TestNoopBlockFinalizer{} + poller := New(fetcher, handler, clients) + + stopBlock := uint64(104) + err := poller.Run(99, &stopBlock, 1) + + require.NoError(t, err) + + require.Equal(t, 5, c1.blockProduceCount) + require.Equal(t, 1, c1.skippedCount) + require.Equal(t, 2, c1.errProduceCount) + + require.Equal(t, 1, c2.blockProduceCount) + require.Equal(t, 0, c2.skippedCount) + require.Equal(t, 1, c2.errProduceCount) + +} diff --git a/blockpoller/poller_test.go b/blockpoller/poller_test.go index 5821588..8e2467a 100644 --- a/blockpoller/poller_test.go +++ b/blockpoller/poller_test.go @@ -1,12 +1,13 @@ package blockpoller import ( - "context" "errors" "fmt" "strconv" "testing" + "github.com/streamingfast/firehose-core/rpc" + "github.com/streamingfast/bstream" "github.com/streamingfast/bstream/forkable" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" @@ -18,12 +19,14 @@ func TestForkHandler_run(t *testing.T) { tests := []struct { name string firstStreamableBlock bstream.BlockRef + stopAtBlock uint64 blocks []*TestBlock expectFireBlock []*pbbstream.Block }{ { name: "start block 0", firstStreamableBlock: blk("0a", "", 0).AsRef(), + stopAtBlock: uint64(3), blocks: []*TestBlock{ tb("0a", "", 0), //init the state fetch tb("0a", "", 0), @@ -39,6 +42,7 @@ func TestForkHandler_run(t *testing.T) { { name: "Fork 1", firstStreamableBlock: blk("100a", "99a", 100).AsRef(), + stopAtBlock: uint64(108), blocks: []*TestBlock{ tb("100a", "99a", 100), //init the state fetch tb("100a", "99a", 100), @@ -53,6 +57,7 @@ func TestForkHandler_run(t *testing.T) { tb("102b", "101a", 100), tb("106a", "105a", 100), tb("105a", "104a", 100), + tb("107a", "106a", 100), }, expectFireBlock: []*pbbstream.Block{ blk("100a", "99a", 100), @@ -71,6 +76,7 @@ func TestForkHandler_run(t *testing.T) { { name: "Fork 2", firstStreamableBlock: blk("100a", "99a", 100).AsRef(), + stopAtBlock: uint64(106), blocks: []*TestBlock{ tb("100a", "99a", 100), //init the state fetch tb("100a", "99a", 100), @@ -99,6 +105,7 @@ func TestForkHandler_run(t *testing.T) { { name: "with lib advancing", firstStreamableBlock: blk("100a", "99a", 100).AsRef(), + stopAtBlock: uint64(106), blocks: []*TestBlock{ tb("100a", "99a", 100), //init the state fetch tb("100a", "99a", 100), @@ -127,6 +134,7 @@ func TestForkHandler_run(t *testing.T) { { name: "with skipping blocks", firstStreamableBlock: blk("100a", "99a", 100).AsRef(), + stopAtBlock: uint64(106), blocks: []*TestBlock{ tb("100a", "99a", 100), //init the state fetch tb("100a", "99a", 100), @@ -156,14 +164,17 @@ func TestForkHandler_run(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - blockFetcher := newTestBlockFetcher(t, tt.blocks) + blockFetcher := newTestBlockFetcher[any](t, tt.blocks) blockFinalizer := newTestBlockFinalizer(t, tt.expectFireBlock) - poller := New(blockFetcher, blockFinalizer) + clients := rpc.NewClients[any]() + clients.Add(new(any)) + + poller := New(blockFetcher, blockFinalizer, clients) poller.fetchBlockRetryCount = 0 poller.forkDB = forkable.NewForkDB() - err := poller.Run(context.Background(), tt.firstStreamableBlock.Num(), 1) + err := poller.Run(tt.firstStreamableBlock.Num(), &tt.stopAtBlock, 1) if !errors.Is(err, TestErrCompleteDone) { require.NoError(t, err) } @@ -210,7 +221,7 @@ func TestForkHandler_fire(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - poller := &BlockPoller{startBlockNumGate: test.startBlockNum, blockHandler: &TestNoopBlockFinalizer{}} + poller := &BlockPoller[any]{startBlockNumGate: test.startBlockNum, blockHandler: &TestNoopBlockFinalizer{}} ok, err := poller.fire(test.block) require.NoError(t, err) assert.Equal(t, test.expect, ok) diff --git a/blockpoller/state_file.go b/blockpoller/state_file.go index 44320db..4603fcf 100644 --- a/blockpoller/state_file.go +++ b/blockpoller/state_file.go @@ -1,12 +1,13 @@ package blockpoller import ( - "context" "encoding/json" "fmt" "os" "path/filepath" + "github.com/streamingfast/firehose-core/rpc" + "github.com/streamingfast/bstream" "github.com/streamingfast/bstream/forkable" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" @@ -33,7 +34,7 @@ type stateFile struct { Blocks []blockRefWithPrev } -func (p *BlockPoller) isStateFileExist(stateStorePath string) bool { +func (p *BlockPoller[C]) isStateFileExist(stateStorePath string) bool { if stateStorePath == "" { p.logger.Info("No state store path set, skipping cursor check") return false @@ -67,7 +68,7 @@ func getState(stateStorePath string) (*stateFile, error) { return &sf, nil } -func (p *BlockPoller) saveState(blocks []*forkable.Block) error { +func (p *BlockPoller[C]) saveState(blocks []*forkable.Block) error { p.logger.Debug("saving cursor", zap.String("state_store_path", p.stateStorePath)) if p.stateStorePath == "" { return nil @@ -108,23 +109,35 @@ func (p *BlockPoller) saveState(blocks []*forkable.Block) error { return nil } -func (p *BlockPoller) initState(firstStreamableBlockNum uint64, stateStorePath string, ignoreCursor bool, logger *zap.Logger) (*forkable.ForkDB, bstream.BlockRef, error) { +func (p *BlockPoller[C]) initState(firstStreamableBlockNum uint64, stateStorePath string, ignoreCursor bool, logger *zap.Logger) (*forkable.ForkDB, bstream.BlockRef, error) { forkDB := forkable.NewForkDB(forkable.ForkDBWithLogger(logger)) if ignoreCursor || !p.isStateFileExist(stateStorePath) { logger.Info("ignoring cursor, fetching first streamable block", zap.Uint64("first_streamable_block", firstStreamableBlockNum)) for { - firstStreamableBlock, skip, err := p.blockFetcher.Fetch(context.Background(), firstStreamableBlockNum) - firstStreamableBlockRef := firstStreamableBlock.AsRef() + br, err := rpc.WithClients(p.clients, func(client C) (*FetchResponse, error) { + firstStreamableBlock, skip, err := p.blockFetcher.Fetch(client, firstStreamableBlockNum) + if err != nil { + return nil, fmt.Errorf("fetching first streamable block: %w", err) + } + if skip { + return nil, fmt.Errorf("expecting first streamable block %q not to be skipped", firstStreamableBlock.AsRef()) + } + return &FetchResponse{ + Block: firstStreamableBlock, + }, nil + }) if err != nil { p.logger.Warn("fetching first streamable block", zap.Uint64("first_streamable_block", firstStreamableBlockNum), zap.Error(err)) continue } - if skip { - return nil, nil, fmt.Errorf("expecting first streamable block %q not to be skiped", firstStreamableBlockRef) + if br.Skipped { + return nil, nil, fmt.Errorf("expecting first streamable block %q not to be skiped", firstStreamableBlockNum) } + firstStreamableBlockRef := br.Block.AsRef() + logger.Info("ignoring cursor, will start from...", zap.Stringer("first_streamable_block", firstStreamableBlockRef), zap.Stringer("lib", firstStreamableBlockRef), diff --git a/blockpoller/state_file_test.go b/blockpoller/state_file_test.go index 266d190..2a85523 100644 --- a/blockpoller/state_file_test.go +++ b/blockpoller/state_file_test.go @@ -7,6 +7,7 @@ import ( "github.com/streamingfast/bstream" "github.com/streamingfast/bstream/forkable" + "github.com/streamingfast/firehose-core/rpc" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -42,9 +43,9 @@ func TestFireBlockFinalizer_state(t *testing.T) { expectedStateFileCnt := `{"Lib":{"id":"101a","num":101},"LastFiredBlock":{"id":"105a","num":105,"previous_ref_id":"104a"},"Blocks":[{"id":"101a","num":101,"previous_ref_id":"100a"},{"id":"102a","num":102,"previous_ref_id":"101a"},{"id":"103a","num":103,"previous_ref_id":"102a"},{"id":"104a","num":104,"previous_ref_id":"103a"},{"id":"105a","num":105,"previous_ref_id":"104a"}]}` - blockFetcher := newTestBlockFetcher(t, []*TestBlock{tb("60a", "59a", 60)}) + blockFetcher := newTestBlockFetcher[any](t, []*TestBlock{tb("60a", "59a", 60)}) - poller := &BlockPoller{ + poller := &BlockPoller[any]{ stateStorePath: dirName, forkDB: fk, logger: zap.NewNop(), @@ -73,11 +74,15 @@ func TestFireBlockFinalizer_noSstate(t *testing.T) { require.NoError(t, err) defer os.Remove(dirName) - blockFetcher := newTestBlockFetcher(t, []*TestBlock{tb("60a", "59a", 60)}) - poller := &BlockPoller{ + blockFetcher := newTestBlockFetcher[any](t, []*TestBlock{tb("60a", "59a", 60)}) + clients := rpc.NewClients[any]() + clients.Add(new(any)) + + poller := &BlockPoller[any]{ stateStorePath: dirName, logger: zap.NewNop(), blockFetcher: blockFetcher, + clients: clients, } forkDB, startBlock, err := poller.initState(60, dirName, false, zap.NewNop())