Skip to content

Commit

Permalink
Add missing block polling in committer (#114)
Browse files Browse the repository at this point in the history
### TL;DR

Added handling for missing staging data in the Committer and improved error handling.

### What changed?

- Implemented `handleMissingStagingData` function in the Committer to detect and handle missing blocks in staging data.
- Updated `getSequentialBlockDataToCommit` to call `handleMissingStagingData` when no blocks are found in staging.
- Added new tests to cover the missing staging data handling functionality.
- Removed unnecessary assertion calls in test files.
- Removed a debug log message in the `GetMaxBlockNumber` function of ClickHouseConnector.

### How to test?

1. Run the updated unit tests, particularly focusing on `TestHandleMissingStagingData` and `TestHandleMissingStagingDataIsPolledWithCorrectBatchSize`.
2. Simulate a scenario where staging data is missing and verify that the Committer correctly detects and handles the situation.
3. Check that the Committer polls for missing blocks with the correct batch size when staging data is missing.

### Why make this change?

This change improves the robustness of the Committer by adding a mechanism to handle cases where expected staging data is missing. It ensures that the system can recover from potential data gaps and maintain data integrity. The additional tests provide better coverage for this new functionality, increasing overall system reliability.
  • Loading branch information
iuwqyir authored Oct 25, 2024
2 parents 31df1e0 + 6238fe2 commit 81d8f50
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 20 deletions.
23 changes: 23 additions & 0 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
}
if blocksData == nil || len(*blocksData) == 0 {
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
c.handleMissingStagingData(blocksToCommit)
return nil, nil
}

Expand Down Expand Up @@ -189,3 +190,25 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
poller.Poll(missingBlockNumbers)
return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
}

func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) {
// Checks if there are any blocks in staging after the current range end
lastStagedBlockNumber, err := c.storage.StagingStorage.GetLastStagedBlockNumber(c.rpc.GetChainID(), blocksToCommit[len(blocksToCommit)-1], big.NewInt(0))
if err != nil {
log.Error().Err(err).Msg("Error checking staged data for missing range")
return
}
if lastStagedBlockNumber == nil || lastStagedBlockNumber.Sign() <= 0 {
log.Debug().Msgf("Committer is caught up with staging. No need to poll for missing blocks.")
return
}
log.Debug().Msgf("Detected missing blocks in staging data starting from %s.", blocksToCommit[0].String())

poller := NewBoundlessPoller(c.rpc, c.storage)
blocksToPoll := blocksToCommit
if len(blocksToCommit) > int(poller.blocksPerPoll) {
blocksToPoll = blocksToCommit[:int(poller.blocksPerPoll)]
}
poller.Poll(blocksToPoll)
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String())
}
107 changes: 88 additions & 19 deletions internal/orchestrator/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ func TestGetBlockNumbersToCommit(t *testing.T) {
assert.Equal(t, committer.blocksPerCommit, len(blockNumbers))
assert.Equal(t, big.NewInt(101), blockNumbers[0])
assert.Equal(t, big.NewInt(100+int64(committer.blocksPerCommit)), blockNumbers[len(blockNumbers)-1])

mockRPC.AssertExpectations(t)
mockMainStorage.AssertExpectations(t)
}

func TestGetSequentialBlockDataToCommit(t *testing.T) {
Expand Down Expand Up @@ -87,10 +84,6 @@ func TestGetSequentialBlockDataToCommit(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, 3, len(*result))

mockRPC.AssertExpectations(t)
mockMainStorage.AssertExpectations(t)
mockStagingStorage.AssertExpectations(t)
}

func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
Expand Down Expand Up @@ -130,10 +123,6 @@ func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
assert.Equal(t, big.NewInt(101), (*result)[0].Block.Number)
assert.Equal(t, big.NewInt(102), (*result)[1].Block.Number)
assert.Equal(t, big.NewInt(103), (*result)[2].Block.Number)

mockRPC.AssertExpectations(t)
mockMainStorage.AssertExpectations(t)
mockStagingStorage.AssertExpectations(t)
}

func TestCommit(t *testing.T) {
Expand All @@ -157,9 +146,6 @@ func TestCommit(t *testing.T) {
err := committer.commit(&blockData)

assert.NoError(t, err)

mockMainStorage.AssertExpectations(t)
mockStagingStorage.AssertExpectations(t)
}

func TestHandleGap(t *testing.T) {
Expand Down Expand Up @@ -206,7 +192,6 @@ func TestStartCommitter(t *testing.T) {
}

committer := NewCommitter(mockRPC, mockStorage)
committer.storage = mockStorage
committer.triggerIntervalMs = 100 // Set a short interval for testing

chainID := big.NewInt(1)
Expand All @@ -226,9 +211,93 @@ func TestStartCommitter(t *testing.T) {

// Wait for a short time to allow the committer to run
time.Sleep(200 * time.Millisecond)
}

func TestHandleMissingStagingData(t *testing.T) {
defer func() { config.Cfg = config.Config{} }()
config.Cfg.Committer.BlocksPerCommit = 5

mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockStagingStorage := mocks.NewMockIStagingStorage(t)

mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
}

committer := NewCommitter(mockRPC, mockStorage)

chainID := big.NewInt(1)
mockRPC.EXPECT().GetChainID().Return(chainID)
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
Blocks: 100,
})
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)}).Return([]rpc.GetFullBlockResult{
{BlockNumber: big.NewInt(0), Data: common.BlockData{Block: common.Block{Number: big.NewInt(0)}}},
{BlockNumber: big.NewInt(1), Data: common.BlockData{Block: common.Block{Number: big.NewInt(1)}}},
{BlockNumber: big.NewInt(2), Data: common.BlockData{Block: common.Block{Number: big.NewInt(2)}}},
{BlockNumber: big.NewInt(3), Data: common.BlockData{Block: common.Block{Number: big.NewInt(3)}}},
{BlockNumber: big.NewInt(4), Data: common.BlockData{Block: common.Block{Number: big.NewInt(4)}}},
})
mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)

mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
expectedEndBlock := big.NewInt(4)
mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil)

blockData := []common.BlockData{}
mockStagingStorage.EXPECT().GetStagingData(storage.QueryFilter{
ChainId: chainID,
BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)},
}).Return(&blockData, nil)

result, err := committer.getSequentialBlockDataToCommit()

// Assert that the expected methods were called
mockRPC.AssertExpectations(t)
mockMainStorage.AssertExpectations(t)
mockStagingStorage.AssertExpectations(t)
assert.NoError(t, err)
assert.Nil(t, result)
}

func TestHandleMissingStagingDataIsPolledWithCorrectBatchSize(t *testing.T) {
defer func() { config.Cfg = config.Config{} }()
config.Cfg.Committer.BlocksPerCommit = 5
config.Cfg.Poller.BlocksPerPoll = 3

mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockStagingStorage := mocks.NewMockIStagingStorage(t)

mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
}

committer := NewCommitter(mockRPC, mockStorage)

chainID := big.NewInt(1)
mockRPC.EXPECT().GetChainID().Return(chainID)
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
Blocks: 3,
})
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2)}).Return([]rpc.GetFullBlockResult{
{BlockNumber: big.NewInt(0), Data: common.BlockData{Block: common.Block{Number: big.NewInt(0)}}},
{BlockNumber: big.NewInt(1), Data: common.BlockData{Block: common.Block{Number: big.NewInt(1)}}},
{BlockNumber: big.NewInt(2), Data: common.BlockData{Block: common.Block{Number: big.NewInt(2)}}},
})
mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)

mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
expectedEndBlock := big.NewInt(4)
mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil)

blockData := []common.BlockData{}
mockStagingStorage.EXPECT().GetStagingData(storage.QueryFilter{
ChainId: chainID,
BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)},
}).Return(&blockData, nil)

result, err := committer.getSequentialBlockDataToCommit()

assert.NoError(t, err)
assert.Nil(t, result)
}
1 change: 0 additions & 1 deletion internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,6 @@ func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumbe
}
return nil, err
}
zLog.Debug().Msgf("Max block number in main storage is: %s", maxBlockNumber.String())
return maxBlockNumber, nil
}

Expand Down

0 comments on commit 81d8f50

Please sign in to comment.