diff --git a/blockpoller/poller.go b/blockpoller/poller.go index 08198f2..3baf8d3 100644 --- a/blockpoller/poller.go +++ b/blockpoller/poller.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "time" "github.com/streamingfast/bstream" "github.com/streamingfast/bstream/forkable" @@ -39,6 +40,8 @@ type BlockPoller struct { logger *zap.Logger optimisticallyPolledBlocks map[uint64]*BlockItem + + fetching bool } func New( @@ -103,7 +106,20 @@ func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, numberOfBlockToFe if hashToFetch != nil { fetchedBlock, err = p.fetchBlockWithHash(blockToFetch, *hashToFetch) } else { - fetchedBlock, err = p.fetchBlock(blockToFetch, numberOfBlockToFetch) + + for { + requestedBlockItem := p.requestBlock(blockToFetch, numberOfBlockToFetch) + fetchedBlockItem := <-requestedBlockItem + + if fetchedBlockItem.skipped { + numberOfBlockToFetch++ + continue + } + + fetchedBlock = fetchedBlockItem.block + break + } + } if err != nil { @@ -179,6 +195,7 @@ type BlockItem struct { func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch int) error { p.optimisticallyPolledBlocks = map[uint64]*BlockItem{} + p.fetching = true nailer := dhammer.NewNailer(10, func(ctx context.Context, blockToFetch uint64) (*BlockItem, error) { var blockItem *BlockItem @@ -220,7 +237,6 @@ func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch for blockItem := range nailer.Out { p.optimisticallyPolledBlocks[blockItem.blockNumber] = blockItem } - close(done) }() @@ -249,6 +265,8 @@ func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch <-done + p.fetching = false + if nailer.Err() != nil { return fmt.Errorf("failed optimistically fetch blocks starting at %d: %w", requestedBlock, nailer.Err()) } @@ -256,24 +274,35 @@ func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch return nil } -func (p *BlockPoller) fetchBlock(blockNumber uint64, numberOfBlockToFetch int) (*pbbstream.Block, error) { - for { - blockItem, found := p.optimisticallyPolledBlocks[blockNumber] - if !found { - err := p.loadNextBlocks(blockNumber, numberOfBlockToFetch) - if err != nil { - return nil, fmt.Errorf("failed to load next blocks: %w", err) +func (p *BlockPoller) requestBlock(blockNumber uint64, numberOfBlockToFetch int) chan *BlockItem { + requestedBlock := make(chan *BlockItem) + + go func(requestedBlock chan *BlockItem) { + for { + blockItem, found := p.optimisticallyPolledBlocks[blockNumber] + if !found { + if !p.fetching { + go func() { + err := p.loadNextBlocks(blockNumber, numberOfBlockToFetch) + if err != nil { + p.Shutdown(err) + return + } + }() + } + p.logger.Info("waiting for block to be fetched", zap.Uint64("block_num", blockNumber)) + time.Sleep(100 * time.Millisecond) + continue } - continue //that will retry the current block after loading the more blocks - } - if blockItem.skipped { - blockNumber++ - continue + + p.logger.Info("block was optimistically polled", zap.Uint64("block_num", blockNumber)) + requestedBlock <- blockItem + break } - p.logger.Info("block was optimistically polled", zap.Uint64("block_num", blockNumber)) - return blockItem.block, nil - } + }(requestedBlock) + + return requestedBlock } func (p *BlockPoller) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream.Block, error) {