Poll missing blocks in gap handler (#113)
### TL;DR
Changed block gap handling to poll missing blocks instead of marking them as failures.

### What changed?
- Replaced the block failure recording mechanism in `handleGap` with a new polling approach
- Created a new `BoundlessPoller` that can poll arbitrary block ranges (see the usage sketch after this list)
- Refactored the `Poller` to extract common functionality into `BoundlessPoller`
- Updated tests to reflect the new polling behavior
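
A minimal usage sketch of the new `BoundlessPoller`. The `NewBoundlessPoller` constructor and `Poll` signature are the ones shown in the poller.go diff below; the `gapdemo` package, the `pollMissingRange` helper, and the `orchestrator` package name are assumptions for illustration only.

```go
package gapdemo

import (
	"math/big"

	"github.com/thirdweb-dev/indexer/internal/orchestrator"
	"github.com/thirdweb-dev/indexer/internal/rpc"
	"github.com/thirdweb-dev/indexer/internal/storage"
)

// pollMissingRange is a hypothetical helper: given an already-constructed RPC
// client and storage, it polls an explicit list of missing block numbers the
// same way the gap handler now does.
func pollMissingRange(rpcClient rpc.IRPCClient, store storage.IStorage, from, to int64) {
	poller := orchestrator.NewBoundlessPoller(rpcClient, store)

	// Build the block numbers [from, to], e.g. 100..104 as in the unit test below.
	var missing []*big.Int
	for n := from; n <= to; n++ {
		missing = append(missing, big.NewInt(n))
	}

	// Poll fetches the blocks via a worker, stages the results, and returns
	// the last block number it attempted.
	_ = poller.Poll(missing)
}
```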

### How to test?
1. Run the indexer with a gap in block sequence
2. Verify that missing blocks are now polled and processed
3. Confirm that blocks are properly indexed instead of being marked as failures
4. Run unit tests to verify the new gap handling behavior

### Why make this change?
The previous implementation would mark missing blocks as failures without attempting to retrieve them. The new approach actively fetches and processes missing blocks, improving indexing speed.
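
For reference, a hypothetical condensation of the new `handleGap` tail, based on the committer.go diff below. The `handleGapSketch` name is invented; the `orchestrator` package name and the `Committer` fields are assumed from the diff.

```go
package orchestrator

import (
	"fmt"
	"math/big"

	"github.com/thirdweb-dev/indexer/internal/common"
)

// handleGapSketch condenses the tail of (*Committer).handleGap after this commit.
func (c *Committer) handleGapSketch(expectedStart *big.Int, actualFirst common.Block, missing []*big.Int) error {
	// Before: each missing block was written to orchestrator storage as a
	// BlockFailure with reason "Gap detected for this block".
	// After: the missing blocks are polled (and staged) immediately.
	poller := NewBoundlessPoller(c.rpc, c.storage)
	poller.Poll(missing)

	// The error return is unchanged, so the commit batch is retried once the
	// gap blocks have been staged.
	return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)",
		actualFirst.Number.String(), expectedStart.String())
}
```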
iuwqyir authored Oct 25, 2024
2 parents c6fc50e + 7187f40 commit 31df1e0
Showing 3 changed files with 66 additions and 85 deletions.
31 changes: 3 additions & 28 deletions internal/orchestrator/committer.go
@@ -184,33 +184,8 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
}
log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String())

existingBlockFailures, err := c.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{BlockNumbers: missingBlockNumbers, ChainId: c.rpc.GetChainID()})
if err != nil {
return fmt.Errorf("error getting block failures while handling gap: %v", err)
}

existingBlockFailuresMap := make(map[string]*common.BlockFailure)
for _, failure := range existingBlockFailures {
blockNumberStr := failure.BlockNumber.String()
existingBlockFailuresMap[blockNumberStr] = &failure
}

blockFailures := make([]common.BlockFailure, 0)
for _, blockNumber := range missingBlockNumbers {
blockNumberStr := blockNumber.String()
if _, ok := existingBlockFailuresMap[blockNumberStr]; !ok {
blockFailures = append(blockFailures, common.BlockFailure{
BlockNumber: blockNumber,
ChainId: c.rpc.GetChainID(),
FailureTime: time.Now(),
FailureCount: 1,
FailureReason: "Gap detected for this block",
})
}
}
log.Debug().Msgf("Storing %d block failures while handling gap", len(blockFailures))
if err := c.storage.OrchestratorStorage.StoreBlockFailures(blockFailures); err != nil {
return fmt.Errorf("error storing block failures while handling gap: %v", err)
}
poller := NewBoundlessPoller(c.rpc, c.storage)
log.Debug().Msgf("Polling %d blocks while handling gap: %v", len(missingBlockNumbers), missingBlockNumbers)
poller.Poll(missingBlockNumbers)
return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
}
34 changes: 12 additions & 22 deletions internal/orchestrator/committer_test.go
@@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/mock"
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
mocks "github.com/thirdweb-dev/indexer/test/mocks"
)
@@ -173,36 +174,25 @@ func TestHandleGap(t *testing.T) {
}
committer := NewCommitter(mockRPC, mockStorage)

chainID := big.NewInt(1)
mockRPC.EXPECT().GetChainID().Return(chainID)

expectedStartBlockNumber := big.NewInt(100)
actualFirstBlock := common.Block{Number: big.NewInt(105)}

mockOrchestratorStorage.EXPECT().GetBlockFailures(storage.QueryFilter{
ChainId: chainID,
BlockNumbers: []*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)},
}).Return([]common.BlockFailure{}, nil)
mockOrchestratorStorage.On("StoreBlockFailures", mock.MatchedBy(func(failures []common.BlockFailure) bool {
return len(failures) == 5 && failures[0].ChainId == chainID && failures[0].BlockNumber.Cmp(big.NewInt(100)) == 0 &&
failures[0].FailureCount == 1 && failures[0].FailureReason == "Gap detected for this block" &&
failures[1].ChainId == chainID && failures[1].BlockNumber.Cmp(big.NewInt(101)) == 0 &&
failures[1].FailureCount == 1 && failures[1].FailureReason == "Gap detected for this block" &&
failures[2].ChainId == chainID && failures[2].BlockNumber.Cmp(big.NewInt(102)) == 0 &&
failures[2].FailureCount == 1 && failures[2].FailureReason == "Gap detected for this block" &&
failures[3].ChainId == chainID && failures[3].BlockNumber.Cmp(big.NewInt(103)) == 0 &&
failures[3].FailureCount == 1 && failures[3].FailureReason == "Gap detected for this block" &&
failures[4].ChainId == chainID && failures[4].BlockNumber.Cmp(big.NewInt(104)) == 0 &&
failures[4].FailureCount == 1 && failures[4].FailureReason == "Gap detected for this block"
})).Return(nil)
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
Blocks: 5,
})
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)}).Return([]rpc.GetFullBlockResult{
{BlockNumber: big.NewInt(100), Data: common.BlockData{Block: common.Block{Number: big.NewInt(100)}}},
{BlockNumber: big.NewInt(101), Data: common.BlockData{Block: common.Block{Number: big.NewInt(101)}}},
{BlockNumber: big.NewInt(102), Data: common.BlockData{Block: common.Block{Number: big.NewInt(102)}}},
{BlockNumber: big.NewInt(103), Data: common.BlockData{Block: common.Block{Number: big.NewInt(103)}}},
{BlockNumber: big.NewInt(104), Data: common.BlockData{Block: common.Block{Number: big.NewInt(104)}}},
})
mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)

err := committer.handleGap(expectedStartBlockNumber, actualFirstBlock)

assert.Error(t, err)
assert.Contains(t, err.Error(), "first block number (105) in commit batch does not match expected (100)")

mockRPC.AssertExpectations(t)
mockOrchestratorStorage.AssertExpectations(t)
}

func TestStartCommitter(t *testing.T) {
86 changes: 51 additions & 35 deletions internal/orchestrator/poller.go
@@ -24,6 +24,7 @@ type Poller struct {
triggerIntervalMs int64
storage storage.IStorage
lastPolledBlock *big.Int
pollFromBlock *big.Int
pollUntilBlock *big.Int
parallelPollers int
}
@@ -33,7 +34,7 @@ type BlockNumberWithError struct {
Error error
}

func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
blocksPerPoll := config.Cfg.Poller.BlocksPerPoll
if blocksPerPoll == 0 {
blocksPerPoll = DEFAULT_BLOCKS_PER_POLL
@@ -42,6 +43,17 @@ func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
if triggerInterval == 0 {
triggerInterval = DEFAULT_TRIGGER_INTERVAL
}
return &Poller{
rpc: rpc,
triggerIntervalMs: int64(triggerInterval),
blocksPerPoll: int64(blocksPerPoll),
storage: storage,
parallelPollers: config.Cfg.Poller.ParallelPollers,
}
}

func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
poller := NewBoundlessPoller(rpc, storage)
untilBlock := big.NewInt(int64(config.Cfg.Poller.UntilBlock))
pollFromBlock := big.NewInt(int64(config.Cfg.Poller.FromBlock))
lastPolledBlock := new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block
@@ -56,15 +68,10 @@ func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
log.Debug().Msgf("Last polled block found in staging: %s", lastPolledBlock.String())
}
}
return &Poller{
rpc: rpc,
triggerIntervalMs: int64(triggerInterval),
blocksPerPoll: int64(blocksPerPoll),
storage: storage,
lastPolledBlock: lastPolledBlock,
pollUntilBlock: untilBlock,
parallelPollers: config.Cfg.Poller.ParallelPollers,
}
poller.lastPolledBlock = lastPolledBlock
poller.pollFromBlock = pollFromBlock
poller.pollUntilBlock = untilBlock
return poller
}

func (p *Poller) Start() {
@@ -78,30 +85,16 @@ func (p *Poller) Start() {
go func() {
for range tasks {
blockRangeMutex.Lock()
blockNumbers, err := p.getBlockRange()
blockNumbers, err := p.getNextBlockRange()
blockRangeMutex.Unlock()

if err != nil {
log.Error().Err(err).Msg("Error getting block range")
continue
}
if len(blockNumbers) < 1 {
log.Debug().Msg("No blocks to poll, skipping")
continue
}
endBlock := blockNumbers[len(blockNumbers)-1]
if endBlock != nil {
p.lastPolledBlock = endBlock
}
log.Debug().Msgf("Polling %d blocks starting from %s to %s", len(blockNumbers), blockNumbers[0], endBlock)

endBlockNumberFloat, _ := endBlock.Float64()
metrics.PollerLastTriggeredBlock.Set(endBlockNumberFloat)

worker := worker.NewWorker(p.rpc)
results := worker.Run(blockNumbers)
p.handleWorkerResults(results)
if p.reachedPollLimit(endBlock) {
lastPolledBlock := p.Poll(blockNumbers)
if p.reachedPollLimit(lastPolledBlock) {
log.Debug().Msg("Reached poll limit, exiting poller")
ticker.Stop()
return
@@ -118,11 +111,31 @@ func (p *Poller) Start() {
select {}
}

func (p *Poller) Poll(blockNumbers []*big.Int) (lastPolledBlock *big.Int) {
if len(blockNumbers) < 1 {
log.Debug().Msg("No blocks to poll, skipping")
return
}
endBlock := blockNumbers[len(blockNumbers)-1]
if endBlock != nil {
p.lastPolledBlock = endBlock
}
log.Debug().Msgf("Polling %d blocks starting from %s to %s", len(blockNumbers), blockNumbers[0], endBlock)

endBlockNumberFloat, _ := endBlock.Float64()
metrics.PollerLastTriggeredBlock.Set(endBlockNumberFloat)

worker := worker.NewWorker(p.rpc)
results := worker.Run(blockNumbers)
p.handleWorkerResults(results)
return endBlock
}

func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
return p.pollUntilBlock.Sign() > 0 && blockNumber.Cmp(p.pollUntilBlock) >= 0
}

func (p *Poller) getBlockRange() ([]*big.Int, error) {
func (p *Poller) getNextBlockRange() ([]*big.Int, error) {
latestBlock, err := p.rpc.GetLatestBlockNumber()
if err != nil {
return nil, err
@@ -140,13 +153,7 @@
return nil, nil
}

blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
blockNumbers := make([]*big.Int, blockCount)
for i := int64(0); i < blockCount; i++ {
blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
}

return blockNumbers, nil
return p.createBlockNumbersForRange(startBlock, endBlock), nil
}

func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int) *big.Int {
@@ -161,6 +168,15 @@ func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int)
return endBlock
}

func (p *Poller) createBlockNumbersForRange(startBlock *big.Int, endBlock *big.Int) []*big.Int {
blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
blockNumbers := make([]*big.Int, blockCount)
for i := int64(0); i < blockCount; i++ {
blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
}
return blockNumbers
}

func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
var successfulResults []rpc.GetFullBlockResult
var failedResults []rpc.GetFullBlockResult
