Skip to content

Commit

Permalink
fix optimistically fetched block bottle neck
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Apr 22, 2024
1 parent dba0687 commit 726964a
Showing 1 changed file with 46 additions and 17 deletions.
63 changes: 46 additions & 17 deletions blockpoller/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math"
"time"

"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/forkable"
Expand Down Expand Up @@ -39,6 +40,8 @@ type BlockPoller struct {
logger *zap.Logger

optimisticallyPolledBlocks map[uint64]*BlockItem

fetching bool
}

func New(
Expand Down Expand Up @@ -103,7 +106,20 @@ func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, numberOfBlockToFe
if hashToFetch != nil {
fetchedBlock, err = p.fetchBlockWithHash(blockToFetch, *hashToFetch)
} else {
fetchedBlock, err = p.fetchBlock(blockToFetch, numberOfBlockToFetch)

for {
requestedBlockItem := p.requestBlock(blockToFetch, numberOfBlockToFetch)
fetchedBlockItem := <-requestedBlockItem

if fetchedBlockItem.skipped {
numberOfBlockToFetch++
continue
}

fetchedBlock = fetchedBlockItem.block
break
}

}

if err != nil {
Expand Down Expand Up @@ -179,6 +195,7 @@ type BlockItem struct {

func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch int) error {
p.optimisticallyPolledBlocks = map[uint64]*BlockItem{}
p.fetching = true

nailer := dhammer.NewNailer(10, func(ctx context.Context, blockToFetch uint64) (*BlockItem, error) {
var blockItem *BlockItem
Expand Down Expand Up @@ -220,7 +237,6 @@ func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch
for blockItem := range nailer.Out {
p.optimisticallyPolledBlocks[blockItem.blockNumber] = blockItem
}

close(done)
}()

Expand Down Expand Up @@ -249,31 +265,44 @@ func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch

<-done

p.fetching = false

if nailer.Err() != nil {
return fmt.Errorf("failed optimistically fetch blocks starting at %d: %w", requestedBlock, nailer.Err())
}

return nil
}

func (p *BlockPoller) fetchBlock(blockNumber uint64, numberOfBlockToFetch int) (*pbbstream.Block, error) {
for {
blockItem, found := p.optimisticallyPolledBlocks[blockNumber]
if !found {
err := p.loadNextBlocks(blockNumber, numberOfBlockToFetch)
if err != nil {
return nil, fmt.Errorf("failed to load next blocks: %w", err)
func (p *BlockPoller) requestBlock(blockNumber uint64, numberOfBlockToFetch int) chan *BlockItem {
requestedBlock := make(chan *BlockItem)

go func(requestedBlock chan *BlockItem) {
for {
blockItem, found := p.optimisticallyPolledBlocks[blockNumber]
if !found {
if !p.fetching {
go func() {
err := p.loadNextBlocks(blockNumber, numberOfBlockToFetch)
if err != nil {
p.Shutdown(err)
return
}
}()
}
p.logger.Info("waiting for block to be fetched", zap.Uint64("block_num", blockNumber))
time.Sleep(100 * time.Millisecond)
continue
}
continue //that will retry the current block after loading the more blocks
}
if blockItem.skipped {
blockNumber++
continue

p.logger.Info("block was optimistically polled", zap.Uint64("block_num", blockNumber))
requestedBlock <- blockItem
break
}

p.logger.Info("block was optimistically polled", zap.Uint64("block_num", blockNumber))
return blockItem.block, nil
}
}(requestedBlock)

return requestedBlock
}

func (p *BlockPoller) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream.Block, error) {
Expand Down

0 comments on commit 726964a

Please sign in to comment.