diff --git a/dataRetriever/dataPool/proofsCache/errors.go b/dataRetriever/dataPool/proofsCache/errors.go index 63376ef0a92..630dd8cc394 100644 --- a/dataRetriever/dataPool/proofsCache/errors.go +++ b/dataRetriever/dataPool/proofsCache/errors.go @@ -7,3 +7,6 @@ var ErrMissingProof = errors.New("missing proof") // ErrNilProof signals that a nil proof has been provided var ErrNilProof = errors.New("nil proof provided") + +// ErrAlreadyExistingEquivalentProof signals that the provided proof was already existing in the pool +var ErrAlreadyExistingEquivalentProof = errors.New("already existing equivalent proof") diff --git a/dataRetriever/dataPool/proofsCache/proofsPool.go b/dataRetriever/dataPool/proofsCache/proofsPool.go index 1662f6c5cdc..a412794a6db 100644 --- a/dataRetriever/dataPool/proofsCache/proofsPool.go +++ b/dataRetriever/dataPool/proofsCache/proofsPool.go @@ -1,6 +1,7 @@ package proofscache import ( + "encoding/hex" "fmt" "sync" @@ -14,12 +15,16 @@ var log = logger.GetOrCreate("dataRetriever/proofscache") type proofsPool struct { mutCache sync.RWMutex cache map[uint32]*proofsCache + + mutAddedProofSubscribers sync.RWMutex + addedProofSubscribers []func(headerProof data.HeaderProofHandler) } // NewProofsPool creates a new proofs pool component func NewProofsPool() *proofsPool { return &proofsPool{ - cache: make(map[uint32]*proofsCache), + cache: make(map[uint32]*proofsCache), + addedProofSubscribers: make([]func(headerProof data.HeaderProofHandler), 0), } } @@ -36,8 +41,7 @@ func (pp *proofsPool) AddProof( hasProof := pp.HasProof(shardID, headerHash) if hasProof { - log.Trace("there was already a valid proof for header, headerHash: %s", headerHash) - return nil + return fmt.Errorf("%w, headerHash: %s", ErrAlreadyExistingEquivalentProof, hex.EncodeToString(headerHash)) } pp.mutCache.Lock() @@ -59,9 +63,20 @@ func (pp *proofsPool) AddProof( proofsPerShard.addProof(headerProof) + pp.callAddedProofSubscribers(headerProof) + return nil } +func (pp *proofsPool) 
callAddedProofSubscribers(headerProof data.HeaderProofHandler) { + pp.mutAddedProofSubscribers.RLock() + defer pp.mutAddedProofSubscribers.RUnlock() + + for _, handler := range pp.addedProofSubscribers { + go handler(headerProof) + } +} + // CleanupProofsBehindNonce will cleanup proofs from pool based on nonce func (pp *proofsPool) CleanupProofsBehindNonce(shardID uint32, nonce uint64) error { if nonce == 0 { @@ -120,6 +135,18 @@ func (pp *proofsPool) HasProof( return err == nil } +// RegisterHandler registers a new handler to be called when a new data is added +func (pp *proofsPool) RegisterHandler(handler func(headerProof data.HeaderProofHandler)) { + if handler == nil { + log.Error("attempt to register a nil handler to proofs pool") + return + } + + pp.mutAddedProofSubscribers.Lock() + pp.addedProofSubscribers = append(pp.addedProofSubscribers, handler) + pp.mutAddedProofSubscribers.Unlock() +} + // IsInterfaceNil returns true if there is no value under the interface func (pp *proofsPool) IsInterfaceNil() bool { return pp == nil diff --git a/dataRetriever/dataPool/proofsCache/proofsPool_test.go b/dataRetriever/dataPool/proofsCache/proofsPool_test.go index cbdcb63a19a..c4e373eeba7 100644 --- a/dataRetriever/dataPool/proofsCache/proofsPool_test.go +++ b/dataRetriever/dataPool/proofsCache/proofsPool_test.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "testing" + "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/data/block" proofscache "github.com/multiversx/mx-chain-go/dataRetriever/dataPool/proofsCache" "github.com/stretchr/testify/assert" @@ -65,6 +66,9 @@ func TestProofsPool_ShouldWork(t *testing.T) { _ = pp.AddProof(proof3) _ = pp.AddProof(proof4) + err := pp.AddProof(proof4) + require.True(t, errors.Is(err, proofscache.ErrAlreadyExistingEquivalentProof)) + proof, err := pp.GetProof(shardID, []byte("hash3")) require.Nil(t, err) require.Equal(t, proof3, proof) @@ -81,6 +85,28 @@ func TestProofsPool_ShouldWork(t *testing.T) { 
require.Equal(t, proof4, proof) } +func TestProofsPool_RegisterHandler(t *testing.T) { + t.Parallel() + + pp := proofscache.NewProofsPool() + + wasCalled := false + wg := sync.WaitGroup{} + wg.Add(1) + handler := func(proof data.HeaderProofHandler) { + wasCalled = true + wg.Done() + } + pp.RegisterHandler(nil) + pp.RegisterHandler(handler) + + _ = pp.AddProof(generateProof()) + + wg.Wait() + + assert.True(t, wasCalled) +} + func TestProofsPool_Concurrency(t *testing.T) { t.Parallel() @@ -95,7 +121,7 @@ func TestProofsPool_Concurrency(t *testing.T) { for i := 0; i < numOperations; i++ { go func(idx int) { - switch idx % 5 { + switch idx % 6 { case 0, 1, 2: _ = pp.AddProof(generateProof()) case 3: @@ -105,6 +131,10 @@ func TestProofsPool_Concurrency(t *testing.T) { } case 4: _ = pp.CleanupProofsBehindNonce(generateRandomShardID(), generateRandomNonce()) + case 5: + handler := func(proof data.HeaderProofHandler) { + } + pp.RegisterHandler(handler) default: assert.Fail(t, "should have not beed called") } diff --git a/epochStart/bootstrap/common.go b/epochStart/bootstrap/common.go index da6e99fda1b..a6621f86ed8 100644 --- a/epochStart/bootstrap/common.go +++ b/epochStart/bootstrap/common.go @@ -123,6 +123,9 @@ func checkArguments(args ArgsEpochStartBootstrap) error { if check.IfNil(args.NodesCoordinatorRegistryFactory) { return fmt.Errorf("%s: %w", baseErrorMessage, nodesCoordinator.ErrNilNodesCoordinatorRegistryFactory) } + if check.IfNil(args.EnableEpochsHandler) { + return fmt.Errorf("%s: %w", baseErrorMessage, epochStart.ErrNilEnableEpochsHandler) + } return nil } diff --git a/epochStart/bootstrap/epochStartMetaBlockProcessor.go b/epochStart/bootstrap/epochStartMetaBlockProcessor.go index ff1a4370ad7..8ee40232287 100644 --- a/epochStart/bootstrap/epochStartMetaBlockProcessor.go +++ b/epochStart/bootstrap/epochStartMetaBlockProcessor.go @@ -12,6 +12,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/hashing" 
"github.com/multiversx/mx-chain-core-go/marshal" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/epochStart" "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/process/factory" @@ -26,14 +27,18 @@ const minNumConnectedPeers = 1 var _ process.InterceptorProcessor = (*epochStartMetaBlockProcessor)(nil) type epochStartMetaBlockProcessor struct { - messenger Messenger - requestHandler RequestHandler - marshalizer marshal.Marshalizer - hasher hashing.Hasher - mutReceivedMetaBlocks sync.RWMutex - mapReceivedMetaBlocks map[string]data.MetaHeaderHandler - mapMetaBlocksFromPeers map[string][]core.PeerID - chanConsensusReached chan bool + messenger Messenger + requestHandler RequestHandler + marshalizer marshal.Marshalizer + hasher hashing.Hasher + enableEpochsHandler common.EnableEpochsHandler + + mutReceivedMetaBlocks sync.RWMutex + mapReceivedMetaBlocks map[string]data.MetaHeaderHandler + mapMetaBlocksFromPeers map[string][]core.PeerID + + chanConfMetaBlockReached chan bool + chanMetaBlockReached chan bool metaBlock data.MetaHeaderHandler peerCountTarget int minNumConnectedPeers int @@ -49,6 +54,7 @@ func NewEpochStartMetaBlockProcessor( consensusPercentage uint8, minNumConnectedPeersConfig int, minNumOfPeersToConsiderBlockValidConfig int, + enableEpochsHandler common.EnableEpochsHandler, ) (*epochStartMetaBlockProcessor, error) { if check.IfNil(messenger) { return nil, epochStart.ErrNilMessenger @@ -71,6 +77,9 @@ func NewEpochStartMetaBlockProcessor( if minNumOfPeersToConsiderBlockValidConfig < minNumPeersToConsiderMetaBlockValid { return nil, epochStart.ErrNotEnoughNumOfPeersToConsiderBlockValid } + if check.IfNil(enableEpochsHandler) { + return nil, epochStart.ErrNilEnableEpochsHandler + } processor := &epochStartMetaBlockProcessor{ messenger: messenger, @@ -79,10 +88,12 @@ func NewEpochStartMetaBlockProcessor( hasher: hasher, minNumConnectedPeers: minNumConnectedPeersConfig, minNumOfPeersToConsiderBlockValid: 
minNumOfPeersToConsiderBlockValidConfig, + enableEpochsHandler: enableEpochsHandler, mutReceivedMetaBlocks: sync.RWMutex{}, mapReceivedMetaBlocks: make(map[string]data.MetaHeaderHandler), mapMetaBlocksFromPeers: make(map[string][]core.PeerID), - chanConsensusReached: make(chan bool, 1), + chanConfMetaBlockReached: make(chan bool, 1), + chanMetaBlockReached: make(chan bool, 1), } processor.waitForEnoughNumConnectedPeers(messenger) @@ -136,22 +147,47 @@ func (e *epochStartMetaBlockProcessor) Save(data process.InterceptedData, fromCo return nil } - if !metaBlock.IsStartOfEpochBlock() { - log.Debug("received metablock is not of type epoch start", "error", epochStart.ErrNotEpochStartBlock) + mbHash := interceptedHdr.Hash() + + if metaBlock.IsStartOfEpochBlock() { + log.Debug("received epoch start meta", "epoch", metaBlock.GetEpoch(), "from peer", fromConnectedPeer.Pretty()) + e.mutReceivedMetaBlocks.Lock() + e.mapReceivedMetaBlocks[string(mbHash)] = metaBlock + e.addToPeerList(string(mbHash), fromConnectedPeer) + e.mutReceivedMetaBlocks.Unlock() + return nil } - mbHash := interceptedHdr.Hash() + if e.isEpochStartConfirmationBlockWithEquivalentMessages(metaBlock) { + log.Debug("received epoch start confirmation meta", "epoch", metaBlock.GetEpoch(), "from peer", fromConnectedPeer.Pretty()) + e.chanConfMetaBlockReached <- true - log.Debug("received epoch start meta", "epoch", metaBlock.GetEpoch(), "from peer", fromConnectedPeer.Pretty()) - e.mutReceivedMetaBlocks.Lock() - e.mapReceivedMetaBlocks[string(mbHash)] = metaBlock - e.addToPeerList(string(mbHash), fromConnectedPeer) - e.mutReceivedMetaBlocks.Unlock() + return nil + } + + log.Debug("received metablock is not of type epoch start", "error", epochStart.ErrNotEpochStartBlock) return nil } +func (e *epochStartMetaBlockProcessor) isEpochStartConfirmationBlockWithEquivalentMessages(metaBlock data.HeaderHandler) bool { + if !e.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, metaBlock.GetEpoch()) { + 
return false + } + + startOfEpochMetaBlock, err := e.getMostReceivedMetaBlock() + if err != nil { + return false + } + + if startOfEpochMetaBlock.GetNonce() != metaBlock.GetNonce()-1 { + return false + } + + return true +} + // this func should be called under mutex protection func (e *epochStartMetaBlockProcessor) addToPeerList(hash string, peer core.PeerID) { peersListForHash := e.mapMetaBlocksFromPeers[hash] @@ -180,16 +216,33 @@ func (e *epochStartMetaBlockProcessor) GetEpochStartMetaBlock(ctx context.Contex } }() - err = e.requestMetaBlock() + metaBlock, err := e.waitForMetaBlock(ctx) + if err != nil { + return nil, err + } + + if e.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, metaBlock.GetEpoch()) { + err = e.waitForConfMetaBlock(ctx, metaBlock) + if err != nil { + return nil, err + } + } + + return metaBlock, nil +} + +func (e *epochStartMetaBlockProcessor) waitForMetaBlock(ctx context.Context) (data.MetaHeaderHandler, error) { + err := e.requestMetaBlock() if err != nil { return nil, err } chanRequests := time.After(durationBetweenReRequests) chanCheckMaps := time.After(durationBetweenChecks) + for { select { - case <-e.chanConsensusReached: + case <-e.chanMetaBlockReached: return e.metaBlock, nil case <-ctx.Done(): return e.getMostReceivedMetaBlock() @@ -200,12 +253,40 @@ func (e *epochStartMetaBlockProcessor) GetEpochStartMetaBlock(ctx context.Contex } chanRequests = time.After(durationBetweenReRequests) case <-chanCheckMaps: - e.checkMaps() + e.checkMetaBlockMaps() chanCheckMaps = time.After(durationBetweenChecks) } } } +func (e *epochStartMetaBlockProcessor) waitForConfMetaBlock(ctx context.Context, metaBlock data.MetaHeaderHandler) error { + if check.IfNil(metaBlock) { + return epochStart.ErrNilMetaBlock + } + + err := e.requestConfirmationMetaBlock(metaBlock.GetNonce()) + if err != nil { + return err + } + + chanRequests := time.After(durationBetweenReRequests) + + for { + select { + case <-e.chanConfMetaBlockReached: + 
return nil + case <-ctx.Done(): + return epochStart.ErrTimeoutWaitingForMetaBlock + case <-chanRequests: + err = e.requestConfirmationMetaBlock(metaBlock.GetNonce()) + if err != nil { + return err + } + chanRequests = time.After(durationBetweenReRequests) + } + } +} + func (e *epochStartMetaBlockProcessor) getMostReceivedMetaBlock() (data.MetaHeaderHandler, error) { e.mutReceivedMetaBlocks.RLock() defer e.mutReceivedMetaBlocks.RUnlock() @@ -238,27 +319,48 @@ func (e *epochStartMetaBlockProcessor) requestMetaBlock() error { return nil } -func (e *epochStartMetaBlockProcessor) checkMaps() { +func (e *epochStartMetaBlockProcessor) requestConfirmationMetaBlock(nonce uint64) error { + numConnectedPeers := len(e.messenger.ConnectedPeers()) + err := e.requestHandler.SetNumPeersToQuery(factory.MetachainBlocksTopic, numConnectedPeers, numConnectedPeers) + if err != nil { + return err + } + + e.requestHandler.RequestMetaHeaderByNonce(nonce) + + return nil +} + +func (e *epochStartMetaBlockProcessor) checkMetaBlockMaps() { e.mutReceivedMetaBlocks.RLock() defer e.mutReceivedMetaBlocks.RUnlock() - for hash, peersList := range e.mapMetaBlocksFromPeers { + hash, metaBlockFound := e.checkReceivedMetaBlock(e.mapMetaBlocksFromPeers) + if metaBlockFound { + e.metaBlock = e.mapReceivedMetaBlocks[hash] + e.chanMetaBlockReached <- true + } +} + +func (e *epochStartMetaBlockProcessor) checkReceivedMetaBlock(blocksFromPeers map[string][]core.PeerID) (string, bool) { + for hash, peersList := range blocksFromPeers { log.Debug("metablock from peers", "num peers", len(peersList), "target", e.peerCountTarget, "hash", []byte(hash)) - found := e.processEntry(peersList, hash) - if found { - break + + metaBlockFound := e.processMetaBlockEntry(peersList, hash) + if metaBlockFound { + return hash, true } } + + return "", false } -func (e *epochStartMetaBlockProcessor) processEntry( +func (e *epochStartMetaBlockProcessor) processMetaBlockEntry( peersList []core.PeerID, hash string, ) bool { if 
len(peersList) >= e.peerCountTarget { log.Info("got consensus for epoch start metablock", "len", len(peersList)) - e.metaBlock = e.mapReceivedMetaBlocks[hash] - e.chanConsensusReached <- true return true } diff --git a/epochStart/bootstrap/epochStartMetaBlockProcessor_test.go b/epochStart/bootstrap/epochStartMetaBlockProcessor_test.go index 1741c63a25c..200c3f408a3 100644 --- a/epochStart/bootstrap/epochStartMetaBlockProcessor_test.go +++ b/epochStart/bootstrap/epochStartMetaBlockProcessor_test.go @@ -9,9 +9,11 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/block" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/epochStart" "github.com/multiversx/mx-chain-go/epochStart/mock" "github.com/multiversx/mx-chain-go/testscommon" + "github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock" "github.com/multiversx/mx-chain-go/testscommon/hashingMocks" "github.com/multiversx/mx-chain-go/testscommon/p2pmocks" "github.com/stretchr/testify/assert" @@ -28,6 +30,7 @@ func TestNewEpochStartMetaBlockProcessor_NilMessengerShouldErr(t *testing.T) { 50, 3, 3, + &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, ) assert.Equal(t, epochStart.ErrNilMessenger, err) @@ -45,6 +48,7 @@ func TestNewEpochStartMetaBlockProcessor_NilRequestHandlerShouldErr(t *testing.T 50, 3, 3, + &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, ) assert.Equal(t, epochStart.ErrNilRequestHandler, err) @@ -62,6 +66,7 @@ func TestNewEpochStartMetaBlockProcessor_NilMarshalizerShouldErr(t *testing.T) { 50, 3, 3, + &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, ) assert.Equal(t, epochStart.ErrNilMarshalizer, err) @@ -79,6 +84,7 @@ func TestNewEpochStartMetaBlockProcessor_NilHasherShouldErr(t *testing.T) { 50, 3, 3, + &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, ) assert.Equal(t, epochStart.ErrNilHasher, err) @@ -96,6 +102,7 @@ func 
TestNewEpochStartMetaBlockProcessor_InvalidConsensusPercentageShouldErr(t * 101, 3, 3, + &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, ) assert.Equal(t, epochStart.ErrInvalidConsensusThreshold, err) @@ -116,6 +123,7 @@ func TestNewEpochStartMetaBlockProcessorOkValsShouldWork(t *testing.T) { 50, 3, 3, + &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, ) assert.NoError(t, err) @@ -152,6 +160,7 @@ func TestNewEpochStartMetaBlockProcessorOkValsShouldWorkAfterMoreTriesWaitingFor 50, 3, 3, + &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, ) assert.NoError(t, err) @@ -172,6 +181,7 @@ func TestEpochStartMetaBlockProcessor_Validate(t *testing.T) { 50, 3, 3, + &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, ) assert.Nil(t, esmbp.Validate(nil, "")) @@ -191,6 +201,7 @@ func TestEpochStartMetaBlockProcessor_SaveNilInterceptedDataShouldNotReturnError 50, 3, 3, + &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, ) err := esmbp.Save(nil, "peer0", "") @@ -212,6 +223,7 @@ func TestEpochStartMetaBlockProcessor_SaveOkInterceptedDataShouldWork(t *testing 50, 3, 3, + &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, ) assert.Zero(t, len(esmbp.GetMapMetaBlock())) @@ -241,6 +253,7 @@ func TestEpochStartMetaBlockProcessor_GetEpochStartMetaBlockShouldTimeOut(t *tes 50, 3, 3, + &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, ) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) @@ -264,21 +277,30 @@ func TestEpochStartMetaBlockProcessor_GetEpochStartMetaBlockShouldReturnMostRece &hashingMocks.HasherMock{}, 99, 3, - 3, + 5, + &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, ) expectedMetaBlock := &block.MetaBlock{ Nonce: 10, EpochStart: block.EpochStart{LastFinalizedHeaders: []block.EpochStartShardData{{Round: 1}}}, } + confirmationMetaBlock := &block.MetaBlock{ + Nonce: 11, + } intData := mock.NewInterceptedMetaBlockMock(expectedMetaBlock, []byte("hash")) + intData2 := mock.NewInterceptedMetaBlockMock(confirmationMetaBlock, 
[]byte("hash2")) for i := 0; i < esmbp.minNumOfPeersToConsiderBlockValid; i++ { _ = esmbp.Save(intData, core.PeerID(fmt.Sprintf("peer_%d", i)), "") } + for i := 0; i < esmbp.minNumOfPeersToConsiderBlockValid; i++ { + _ = esmbp.Save(intData2, core.PeerID(fmt.Sprintf("peer_%d", i)), "") + } + // we need a slightly more time than 1 second in order to also properly test the select branches - timeout := time.Second + time.Millisecond*500 + timeout := 2*time.Second + time.Millisecond*500 ctx, cancel := context.WithTimeout(context.Background(), timeout) mb, err := esmbp.GetEpochStartMetaBlock(ctx) cancel() @@ -301,18 +323,27 @@ func TestEpochStartMetaBlockProcessor_GetEpochStartMetaBlockShouldWorkFromFirstT 50, 3, 3, + &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, ) expectedMetaBlock := &block.MetaBlock{ Nonce: 10, EpochStart: block.EpochStart{LastFinalizedHeaders: []block.EpochStartShardData{{Round: 1}}}, } + confirmationMetaBlock := &block.MetaBlock{ + Nonce: 11, + } intData := mock.NewInterceptedMetaBlockMock(expectedMetaBlock, []byte("hash")) + intData2 := mock.NewInterceptedMetaBlockMock(confirmationMetaBlock, []byte("hash2")) for i := 0; i < 6; i++ { _ = esmbp.Save(intData, core.PeerID(fmt.Sprintf("peer_%d", i)), "") } + for i := 0; i < 6; i++ { + _ = esmbp.Save(intData2, core.PeerID(fmt.Sprintf("peer_%d", i)), "") + } + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) mb, err := esmbp.GetEpochStartMetaBlock(ctx) cancel() @@ -320,19 +351,53 @@ func TestEpochStartMetaBlockProcessor_GetEpochStartMetaBlockShouldWorkFromFirstT assert.Equal(t, expectedMetaBlock, mb) } -func TestEpochStartMetaBlockProcessor_GetEpochStartMetaBlockShouldWorkAfterMultipleTries(t *testing.T) { +func TestEpochStartMetaBlockProcessor_GetEpochStartMetaBlock_BeforeEquivalentMessages(t *testing.T) { t.Parallel() - testEpochStartMbIsReceivedWithSleepBetweenReceivedMessages(t, durationBetweenChecks-10*time.Millisecond) + tts := durationBetweenChecks - 10*time.Millisecond + 
+ esmbp, _ := NewEpochStartMetaBlockProcessor( + &p2pmocks.MessengerStub{ + ConnectedPeersCalled: func() []core.PeerID { + return []core.PeerID{"peer_0", "peer_1", "peer_2", "peer_3", "peer_4", "peer_5"} + }, + }, + &testscommon.RequestHandlerStub{}, + &mock.MarshalizerMock{}, + &hashingMocks.HasherMock{}, + 64, + 3, + 3, + &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, + ) + expectedMetaBlock := &block.MetaBlock{ + Nonce: 10, + EpochStart: block.EpochStart{LastFinalizedHeaders: []block.EpochStartShardData{{Round: 1}}}, + } + intData := mock.NewInterceptedMetaBlockMock(expectedMetaBlock, []byte("hash")) + + go func() { + index := 0 + for { + time.Sleep(tts) + _ = esmbp.Save(intData, core.PeerID(fmt.Sprintf("peer_%d", index)), "") + _ = esmbp.Save(intData, core.PeerID(fmt.Sprintf("peer_%d", index+1)), "") + index += 2 + } + }() + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + mb, err := esmbp.GetEpochStartMetaBlock(ctx) + cancel() + assert.NoError(t, err) + assert.Equal(t, expectedMetaBlock, mb) } -func TestEpochStartMetaBlockProcessor_GetEpochStartMetaBlockShouldWorkAfterMultipleRequests(t *testing.T) { +func TestEpochStartMetaBlockProcessor_GetEpochStartMetaBlock_AfterEquivalentMessages(t *testing.T) { t.Parallel() - testEpochStartMbIsReceivedWithSleepBetweenReceivedMessages(t, durationBetweenChecks-10*time.Millisecond) -} + tts := durationBetweenChecks - 10*time.Millisecond -func testEpochStartMbIsReceivedWithSleepBetweenReceivedMessages(t *testing.T, tts time.Duration) { esmbp, _ := NewEpochStartMetaBlockProcessor( &p2pmocks.MessengerStub{ ConnectedPeersCalled: func() []core.PeerID { @@ -345,12 +410,23 @@ func testEpochStartMbIsReceivedWithSleepBetweenReceivedMessages(t *testing.T, tt 64, 3, 3, + &enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + return flag == common.EquivalentMessagesFlag + }, + }, ) expectedMetaBlock := &block.MetaBlock{ Nonce: 
10, EpochStart: block.EpochStart{LastFinalizedHeaders: []block.EpochStartShardData{{Round: 1}}}, } intData := mock.NewInterceptedMetaBlockMock(expectedMetaBlock, []byte("hash")) + + confirmationMetaBlock := &block.MetaBlock{ + Nonce: 11, + } + intData2 := mock.NewInterceptedMetaBlockMock(confirmationMetaBlock, []byte("hash2")) + go func() { index := 0 for { @@ -360,6 +436,17 @@ func testEpochStartMbIsReceivedWithSleepBetweenReceivedMessages(t *testing.T, tt index += 2 } }() + + go func() { + index := 0 + for { + time.Sleep(tts) + _ = esmbp.Save(intData2, core.PeerID(fmt.Sprintf("peer_%d", index)), "") + _ = esmbp.Save(intData2, core.PeerID(fmt.Sprintf("peer_%d", index+1)), "") + index += 2 + } + }() + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) mb, err := esmbp.GetEpochStartMetaBlock(ctx) cancel() diff --git a/epochStart/bootstrap/interface.go b/epochStart/bootstrap/interface.go index bfc293032ee..7a128098059 100644 --- a/epochStart/bootstrap/interface.go +++ b/epochStart/bootstrap/interface.go @@ -49,6 +49,7 @@ type Messenger interface { // RequestHandler defines which methods a request handler should implement type RequestHandler interface { RequestStartOfEpochMetaBlock(epoch uint32) + RequestMetaHeaderByNonce(nonce uint64) SetNumPeersToQuery(topic string, intra int, cross int) error GetNumPeersToQuery(topic string) (int, int, error) IsInterfaceNil() bool diff --git a/epochStart/bootstrap/process.go b/epochStart/bootstrap/process.go index c1c4eb8d4df..91d40db1a8d 100644 --- a/epochStart/bootstrap/process.go +++ b/epochStart/bootstrap/process.go @@ -122,6 +122,8 @@ type epochStartBootstrap struct { nodeProcessingMode common.NodeProcessingMode nodeOperationMode common.NodeOperation stateStatsHandler common.StateStatisticsHandler + enableEpochsHandler common.EnableEpochsHandler + // created components requestHandler process.RequestHandler mainInterceptorContainer process.InterceptorsContainer @@ -193,6 +195,7 @@ type ArgsEpochStartBootstrap 
struct { NodeProcessingMode common.NodeProcessingMode StateStatsHandler common.StateStatisticsHandler NodesCoordinatorRegistryFactory nodesCoordinator.NodesCoordinatorRegistryFactory + EnableEpochsHandler common.EnableEpochsHandler InterceptedDataVerifierFactory process.InterceptedDataVerifierFactory } @@ -246,6 +249,7 @@ func NewEpochStartBootstrap(args ArgsEpochStartBootstrap) (*epochStartBootstrap, stateStatsHandler: args.StateStatsHandler, startEpoch: args.GeneralConfig.EpochStartConfig.GenesisEpoch, nodesCoordinatorRegistryFactory: args.NodesCoordinatorRegistryFactory, + enableEpochsHandler: args.EnableEpochsHandler, interceptedDataVerifierFactory: args.InterceptedDataVerifierFactory, } @@ -552,6 +556,7 @@ func (e *epochStartBootstrap) prepareComponentsToSyncFromNetwork() error { thresholdForConsideringMetaBlockCorrect, epochStartConfig.MinNumConnectedPeersToStart, epochStartConfig.MinNumOfPeersToConsiderBlockValid, + e.enableEpochsHandler, ) if err != nil { return err @@ -673,7 +678,7 @@ func (e *epochStartBootstrap) syncHeadersFrom(meta data.MetaHeaderHandler) (map[ return syncedHeaders, nil } -// Bootstrap will handle requesting and receiving the needed information the node will bootstrap from +// requestAndProcessing will handle requesting and receiving the needed information the node will bootstrap from func (e *epochStartBootstrap) requestAndProcessing() (Parameters, error) { var err error e.baseData.numberOfShards = uint32(len(e.epochStartMeta.GetEpochStartHandler().GetLastFinalizedHeaderHandlers())) diff --git a/epochStart/bootstrap/process_test.go b/epochStart/bootstrap/process_test.go index 67d6a9d1295..e38737a7a3e 100644 --- a/epochStart/bootstrap/process_test.go +++ b/epochStart/bootstrap/process_test.go @@ -254,6 +254,7 @@ func createMockEpochStartBootstrapArgs( }, TrieSyncStatisticsProvider: &testscommon.SizeSyncStatisticsHandlerStub{}, StateStatsHandler: disabledStatistics.NewStateStatistics(), + EnableEpochsHandler: 
&enableEpochsHandlerMock.EnableEpochsHandlerStub{}, InterceptedDataVerifierFactory: &processMock.InterceptedDataVerifierFactoryMock{}, } } @@ -991,6 +992,9 @@ func TestCreateSyncers(t *testing.T) { HeartbeatsCalled: func() storage.Cacher { return cache.NewCacherStub() }, + ProofsCalled: func() dataRetriever.ProofsPool { + return &dataRetrieverMock.ProofsPoolMock{} + }, } epochStartProvider.whiteListHandler = &testscommon.WhiteListHandlerStub{} epochStartProvider.whiteListerVerifiedTxs = &testscommon.WhiteListHandlerStub{} @@ -2407,6 +2411,9 @@ func TestSyncSetGuardianTransaction(t *testing.T) { HeartbeatsCalled: func() storage.Cacher { return cache.NewCacherStub() }, + ProofsCalled: func() dataRetriever.ProofsPool { + return &dataRetrieverMock.ProofsPoolMock{} + }, } epochStartProvider.whiteListHandler = &testscommon.WhiteListHandlerStub{ IsWhiteListedCalled: func(interceptedData process.InterceptedData) bool { diff --git a/factory/bootstrap/bootstrapComponents.go b/factory/bootstrap/bootstrapComponents.go index 1c3500f9599..af6dc1aafa3 100644 --- a/factory/bootstrap/bootstrapComponents.go +++ b/factory/bootstrap/bootstrapComponents.go @@ -226,6 +226,7 @@ func (bcf *bootstrapComponentsFactory) Create() (*bootstrapComponents, error) { NodeProcessingMode: common.GetNodeProcessingMode(&bcf.importDbConfig), StateStatsHandler: bcf.statusCoreComponents.StateStatsHandler(), NodesCoordinatorRegistryFactory: nodesCoordinatorRegistryFactory, + EnableEpochsHandler: bcf.coreComponents.EnableEpochsHandler(), } var epochStartBootstrapper factory.EpochStartBootstrapper diff --git a/factory/consensus/consensusComponents.go b/factory/consensus/consensusComponents.go index d824f80ea64..170638a7268 100644 --- a/factory/consensus/consensusComponents.go +++ b/factory/consensus/consensusComponents.go @@ -499,6 +499,7 @@ func (ccf *consensusComponentsFactory) createShardBootstrapper() (process.Bootst ScheduledTxsExecutionHandler: ccf.processComponents.ScheduledTxsExecutionHandler(), 
ProcessWaitTime: time.Duration(ccf.config.GeneralSettings.SyncProcessTimeInMillis) * time.Millisecond, RepopulateTokensSupplies: ccf.flagsConfig.RepopulateTokensSupplies, + EnableEpochsHandler: ccf.coreComponents.EnableEpochsHandler(), } argsShardBootstrapper := sync.ArgShardBootstrapper{ @@ -629,6 +630,7 @@ func (ccf *consensusComponentsFactory) createMetaChainBootstrapper() (process.Bo ScheduledTxsExecutionHandler: ccf.processComponents.ScheduledTxsExecutionHandler(), ProcessWaitTime: time.Duration(ccf.config.GeneralSettings.SyncProcessTimeInMillis) * time.Millisecond, RepopulateTokensSupplies: ccf.flagsConfig.RepopulateTokensSupplies, + EnableEpochsHandler: ccf.coreComponents.EnableEpochsHandler(), } argsMetaBootstrapper := sync.ArgMetaBootstrapper{ diff --git a/integrationTests/multiShard/endOfEpoch/startInEpoch/startInEpoch_test.go b/integrationTests/multiShard/endOfEpoch/startInEpoch/startInEpoch_test.go index ba33c406f15..27c963a9747 100644 --- a/integrationTests/multiShard/endOfEpoch/startInEpoch/startInEpoch_test.go +++ b/integrationTests/multiShard/endOfEpoch/startInEpoch/startInEpoch_test.go @@ -35,6 +35,7 @@ import ( "github.com/multiversx/mx-chain-go/storage/storageunit" "github.com/multiversx/mx-chain-go/testscommon" "github.com/multiversx/mx-chain-go/testscommon/chainParameters" + "github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock" epochNotifierMock "github.com/multiversx/mx-chain-go/testscommon/epochNotifier" "github.com/multiversx/mx-chain-go/testscommon/genericMocks" "github.com/multiversx/mx-chain-go/testscommon/genesisMocks" @@ -289,6 +290,7 @@ func testNodeStartsInEpoch(t *testing.T, shardID uint32, expectedHighestRound ui }, TrieSyncStatisticsProvider: &testscommon.SizeSyncStatisticsHandlerStub{}, StateStatsHandler: disabled.NewStateStatistics(), + EnableEpochsHandler: &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, InterceptedDataVerifierFactory: 
interceptorsFactory.NewInterceptedDataVerifierFactory(interceptorDataVerifierArgs), } diff --git a/integrationTests/sync/basicSync/basicSync_test.go b/integrationTests/sync/basicSync/basicSync_test.go index ebdd4a2599d..1dfb82dcf80 100644 --- a/integrationTests/sync/basicSync/basicSync_test.go +++ b/integrationTests/sync/basicSync/basicSync_test.go @@ -20,7 +20,6 @@ func TestSyncWorksInShard_EmptyBlocksNoForks(t *testing.T) { if testing.Short() { t.Skip("this is not a short test") } - maxShards := uint32(1) shardId := uint32(0) numNodesPerShard := 6 @@ -198,3 +197,89 @@ func testAllNodesHaveSameLastBlock(t *testing.T, nodes []*integrationTests.TestP assert.Equal(t, 1, len(mapBlocksByHash)) } + +func TestSyncWorksInShard_EmptyBlocksNoForks_With_EquivalentProofs(t *testing.T) { + if testing.Short() { + t.Skip("this is not a short test") + } + + maxShards := uint32(1) + shardId := uint32(0) + numNodesPerShard := 3 + + enableEpochs := integrationTests.CreateEnableEpochsConfig() + enableEpochs.EquivalentMessagesEnableEpoch = uint32(0) + + nodes := make([]*integrationTests.TestProcessorNode, numNodesPerShard+1) + connectableNodes := make([]integrationTests.Connectable, 0) + for i := 0; i < numNodesPerShard; i++ { + nodes[i] = integrationTests.NewTestProcessorNode(integrationTests.ArgTestProcessorNode{ + MaxShards: maxShards, + NodeShardId: shardId, + TxSignPrivKeyShardId: shardId, + WithSync: true, + EpochsConfig: &enableEpochs, + }) + connectableNodes = append(connectableNodes, nodes[i]) + } + + metachainNode := integrationTests.NewTestProcessorNode(integrationTests.ArgTestProcessorNode{ + MaxShards: maxShards, + NodeShardId: core.MetachainShardId, + TxSignPrivKeyShardId: shardId, + WithSync: true, + }) + idxProposerMeta := numNodesPerShard + nodes[idxProposerMeta] = metachainNode + connectableNodes = append(connectableNodes, metachainNode) + + idxProposerShard0 := 0 + leaders := []*integrationTests.TestProcessorNode{nodes[idxProposerShard0], nodes[idxProposerMeta]} + + 
integrationTests.ConnectNodes(connectableNodes) + + defer func() { + for _, n := range nodes { + n.Close() + } + }() + + for _, n := range nodes { + _ = n.StartSync() + } + + fmt.Println("Delaying for nodes p2p bootstrap...") + time.Sleep(integrationTests.P2pBootstrapDelay) + + round := uint64(0) + nonce := uint64(0) + round = integrationTests.IncrementAndPrintRound(round) + integrationTests.UpdateRound(nodes, round) + nonce++ + + numRoundsToTest := 5 + for i := 0; i < numRoundsToTest; i++ { + integrationTests.ProposeBlock(nodes, leaders, round, nonce) + + time.Sleep(integrationTests.SyncDelay) + + round = integrationTests.IncrementAndPrintRound(round) + integrationTests.UpdateRound(nodes, round) + nonce++ + } + + time.Sleep(integrationTests.SyncDelay) + + expectedNonce := nodes[0].BlockChain.GetCurrentBlockHeader().GetNonce() + for i := 1; i < len(nodes); i++ { + if check.IfNil(nodes[i].BlockChain.GetCurrentBlockHeader()) { + assert.Fail(t, fmt.Sprintf("Node with idx %d does not have a current block", i)) + } else { + if i == idxProposerMeta { // metachain node has highest nonce since it's single node and it has not synced the header + assert.Equal(t, expectedNonce, nodes[i].BlockChain.GetCurrentBlockHeader().GetNonce()) + } else { // shard nodes have not managed to sync last header since there is no proof for it; in the complete flow, when nodes will be fully synced they will get current header directly from consensus, so they will receive the proof for header + assert.Equal(t, expectedNonce-1, nodes[i].BlockChain.GetCurrentBlockHeader().GetNonce()) + } + } + } +} diff --git a/integrationTests/testProcessorNode.go b/integrationTests/testProcessorNode.go index b0040660d13..6416f8b6c7c 100644 --- a/integrationTests/testProcessorNode.go +++ b/integrationTests/testProcessorNode.go @@ -46,6 +46,7 @@ import ( "github.com/multiversx/mx-chain-go/consensus/spos/sposFactory" "github.com/multiversx/mx-chain-go/dataRetriever" 
"github.com/multiversx/mx-chain-go/dataRetriever/blockchain" + proofscache "github.com/multiversx/mx-chain-go/dataRetriever/dataPool/proofsCache" "github.com/multiversx/mx-chain-go/dataRetriever/factory/containers" requesterscontainer "github.com/multiversx/mx-chain-go/dataRetriever/factory/requestersContainer" "github.com/multiversx/mx-chain-go/dataRetriever/factory/resolverscontainer" @@ -308,6 +309,7 @@ type ArgTestProcessorNode struct { StatusMetrics external.StatusMetricsHandler WithPeersRatingHandler bool NodeOperationMode common.NodeOperation + Proofs dataRetriever.ProofsPool } // TestProcessorNode represents a container type of class used in integration tests @@ -334,6 +336,7 @@ type TestProcessorNode struct { TrieContainer common.TriesHolder BlockChain data.ChainHandler GenesisBlocks map[uint32]data.HeaderHandler + ProofsPool dataRetriever.ProofsPool EconomicsData *economics.TestEconomicsData RatingsData *rating.RatingsData @@ -1087,7 +1090,8 @@ func (tpn *TestProcessorNode) InitializeProcessors(gasMap map[string]map[string] } func (tpn *TestProcessorNode) initDataPools() { - tpn.DataPool = dataRetrieverMock.CreatePoolsHolder(1, tpn.ShardCoordinator.SelfId()) + tpn.ProofsPool = proofscache.NewProofsPool() + tpn.DataPool = dataRetrieverMock.CreatePoolsHolderWithProofsPool(1, tpn.ShardCoordinator.SelfId(), tpn.ProofsPool) cacherCfg := storageunit.CacheConfig{Capacity: 10000, Type: storageunit.LRUCache, Shards: 1} suCache, _ := storageunit.NewCache(cacherCfg) tpn.WhiteListHandler, _ = interceptors.NewWhiteListDataVerifier(suCache) @@ -2757,6 +2761,20 @@ func (tpn *TestProcessorNode) ProposeBlock(round uint64, nonce uint64) (data.Bod return nil, nil, nil } + previousProof := &dataBlock.HeaderProof{ + PubKeysBitmap: []byte{1}, + AggregatedSignature: sig, + HeaderHash: currHdrHash, + HeaderEpoch: currHdr.GetEpoch(), + HeaderNonce: currHdr.GetNonce(), + HeaderShardId: currHdr.GetShardID(), + } + blockHeader.SetPreviousProof(previousProof) + + _ = 
tpn.ProofsPool.AddProof(previousProof) + + log.Error("added proof", "currHdrHash", currHdrHash, "node", tpn.OwnAccount.Address) + genesisRound := tpn.BlockChain.GetGenesisHeader().GetRound() err = blockHeader.SetTimeStamp((round - genesisRound) * uint64(tpn.RoundHandler.TimeDuration().Seconds())) if err != nil { @@ -3431,7 +3449,11 @@ func GetDefaultStatusComponents() *mock.StatusComponentsStub { func getDefaultBootstrapComponents(shardCoordinator sharding.Coordinator) *mainFactoryMocks.BootstrapComponentsStub { var versionedHeaderFactory nodeFactory.VersionedHeaderFactory - headerVersionHandler := &testscommon.HeaderVersionHandlerStub{} + headerVersionHandler := &testscommon.HeaderVersionHandlerStub{ + GetVersionCalled: func(epoch uint32) string { + return "2" + }, + } versionedHeaderFactory, _ = hdrFactory.NewShardHeaderFactory(headerVersionHandler) if shardCoordinator.SelfId() == core.MetachainShardId { versionedHeaderFactory, _ = hdrFactory.NewMetaHeaderFactory(headerVersionHandler) diff --git a/integrationTests/testSyncNode.go b/integrationTests/testSyncNode.go index b28d5e3f953..31c2ac46111 100644 --- a/integrationTests/testSyncNode.go +++ b/integrationTests/testSyncNode.go @@ -176,6 +176,7 @@ func (tpn *TestProcessorNode) createShardBootstrapper() (TestBootstrapper, error ScheduledTxsExecutionHandler: &testscommon.ScheduledTxsExecutionStub{}, ProcessWaitTime: tpn.RoundHandler.TimeDuration(), RepopulateTokensSupplies: false, + EnableEpochsHandler: tpn.EnableEpochsHandler, } argsShardBootstrapper := sync.ArgShardBootstrapper{ @@ -222,6 +223,7 @@ func (tpn *TestProcessorNode) createMetaChainBootstrapper() (TestBootstrapper, e ScheduledTxsExecutionHandler: &testscommon.ScheduledTxsExecutionStub{}, ProcessWaitTime: tpn.RoundHandler.TimeDuration(), RepopulateTokensSupplies: false, + EnableEpochsHandler: &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, } argsMetaBootstrapper := sync.ArgMetaBootstrapper{ diff --git 
a/process/block/interceptedBlocks/interceptedEquivalentProof.go b/process/block/interceptedBlocks/interceptedEquivalentProof.go index b1ddba19b67..a7937a5aef2 100644 --- a/process/block/interceptedBlocks/interceptedEquivalentProof.go +++ b/process/block/interceptedBlocks/interceptedEquivalentProof.go @@ -9,6 +9,8 @@ import ( "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-go/consensus" + "github.com/multiversx/mx-chain-go/dataRetriever" + proofscache "github.com/multiversx/mx-chain-go/dataRetriever/dataPool/proofsCache" "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/sharding" logger "github.com/multiversx/mx-chain-logger-go" @@ -22,12 +24,14 @@ type ArgInterceptedEquivalentProof struct { Marshaller marshal.Marshalizer ShardCoordinator sharding.Coordinator HeaderSigVerifier consensus.HeaderSigVerifier + Proofs dataRetriever.ProofsPool } type interceptedEquivalentProof struct { proof *block.HeaderProof isForCurrentShard bool headerSigVerifier consensus.HeaderSigVerifier + proofsPool dataRetriever.ProofsPool } // NewInterceptedEquivalentProof returns a new instance of interceptedEquivalentProof @@ -46,6 +50,7 @@ func NewInterceptedEquivalentProof(args ArgInterceptedEquivalentProof) (*interce proof: equivalentProof, isForCurrentShard: extractIsForCurrentShard(args.ShardCoordinator, equivalentProof), headerSigVerifier: args.HeaderSigVerifier, + proofsPool: args.Proofs, }, nil } @@ -62,6 +67,9 @@ func checkArgInterceptedEquivalentProof(args ArgInterceptedEquivalentProof) erro if check.IfNil(args.HeaderSigVerifier) { return process.ErrNilHeaderSigVerifier } + if check.IfNil(args.Proofs) { + return process.ErrNilProofsPool + } return nil } @@ -101,6 +109,11 @@ func (iep *interceptedEquivalentProof) CheckValidity() error { return err } + ok := iep.proofsPool.HasProof(iep.proof.GetHeaderShardId(), iep.proof.GetHeaderHash()) + if ok { + return 
proofscache.ErrAlreadyExistingEquivalentProof + } + return iep.headerSigVerifier.VerifyHeaderProof(iep.proof) } diff --git a/process/block/interceptedBlocks/interceptedEquivalentProof_test.go b/process/block/interceptedBlocks/interceptedEquivalentProof_test.go index e46fa651634..b0a8cd6c9c9 100644 --- a/process/block/interceptedBlocks/interceptedEquivalentProof_test.go +++ b/process/block/interceptedBlocks/interceptedEquivalentProof_test.go @@ -9,8 +9,10 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-go/consensus/mock" + proofscache "github.com/multiversx/mx-chain-go/dataRetriever/dataPool/proofsCache" "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/testscommon/consensus" + "github.com/multiversx/mx-chain-go/testscommon/dataRetriever" "github.com/multiversx/mx-chain-go/testscommon/marshallerMock" logger "github.com/multiversx/mx-chain-logger-go" "github.com/stretchr/testify/require" @@ -41,6 +43,7 @@ func createMockArgInterceptedEquivalentProof() ArgInterceptedEquivalentProof { Marshaller: testMarshaller, ShardCoordinator: &mock.ShardCoordinatorMock{}, HeaderSigVerifier: &consensus.HeaderSigVerifierMock{}, + Proofs: &dataRetriever.ProofsPoolMock{}, } } @@ -93,6 +96,15 @@ func TestNewInterceptedEquivalentProof(t *testing.T) { require.Equal(t, process.ErrNilHeaderSigVerifier, err) require.Nil(t, iep) }) + t.Run("nil proofs pool should error", func(t *testing.T) { + t.Parallel() + + args := createMockArgInterceptedEquivalentProof() + args.Proofs = nil + iep, err := NewInterceptedEquivalentProof(args) + require.Equal(t, process.ErrNilProofsPool, err) + require.Nil(t, iep) + }) t.Run("unmarshal error should error", func(t *testing.T) { t.Parallel() @@ -134,6 +146,24 @@ func TestInterceptedEquivalentProof_CheckValidity(t *testing.T) { err = iep.CheckValidity() require.Equal(t, ErrInvalidProof, err) }) + + t.Run("already exiting proof should 
error", func(t *testing.T) { + t.Parallel() + + args := createMockArgInterceptedEquivalentProof() + args.Proofs = &dataRetriever.ProofsPoolMock{ + HasProofCalled: func(shardID uint32, headerHash []byte) bool { + return true + }, + } + + iep, err := NewInterceptedEquivalentProof(args) + require.NoError(t, err) + + err = iep.CheckValidity() + require.Equal(t, proofscache.ErrAlreadyExistingEquivalentProof, err) + }) + t.Run("should work", func(t *testing.T) { t.Parallel() diff --git a/process/errors.go b/process/errors.go index ba66ec2ab93..395ebf17620 100644 --- a/process/errors.go +++ b/process/errors.go @@ -239,6 +239,9 @@ var ErrNilMiniBlockPool = errors.New("nil mini block pool") // ErrNilMetaBlocksPool signals that a nil meta blocks pool was used var ErrNilMetaBlocksPool = errors.New("nil meta blocks pool") +// ErrNilProofsPool signals that a nil proofs pool was used +var ErrNilProofsPool = errors.New("nil proofs pool") + // ErrNilTxProcessor signals that a nil transactions processor was used var ErrNilTxProcessor = errors.New("nil transactions processor") @@ -1251,9 +1254,6 @@ var ErrNoMatchingConfigForProvidedEpoch = errors.New("no matching configuration" // ErrInvalidHeader is raised when header is invalid var ErrInvalidHeader = errors.New("header is invalid") -// ErrNilEquivalentProofsPool signals that a nil equivalent proofs pool has been provided -var ErrNilEquivalentProofsPool = errors.New("nil equivalent proofs pool") - // ErrNilHeaderProof signals that a nil header proof has been provided var ErrNilHeaderProof = errors.New("nil header proof") diff --git a/process/factory/interceptorscontainer/baseInterceptorsContainerFactory.go b/process/factory/interceptorscontainer/baseInterceptorsContainerFactory.go index b05e496475f..bc167e0dab5 100644 --- a/process/factory/interceptorscontainer/baseInterceptorsContainerFactory.go +++ b/process/factory/interceptorscontainer/baseInterceptorsContainerFactory.go @@ -21,7 +21,6 @@ import ( 
"github.com/multiversx/mx-chain-go/sharding/nodesCoordinator" "github.com/multiversx/mx-chain-go/state" "github.com/multiversx/mx-chain-go/storage" - "github.com/multiversx/mx-chain-go/testscommon/processMocks" ) const ( @@ -57,6 +56,7 @@ type baseInterceptorsContainerFactory struct { hardforkTrigger heartbeat.HardforkTrigger nodeOperationMode common.NodeOperation interceptedDataVerifierFactory process.InterceptedDataVerifierFactory + enableEpochsHandler common.EnableEpochsHandler } func checkBaseParams( @@ -424,8 +424,10 @@ func (bicf *baseInterceptorsContainerFactory) generateHeaderInterceptors() error } argProcessor := &processor.ArgHdrInterceptorProcessor{ - Headers: bicf.dataPool.Headers(), - BlockBlackList: bicf.blockBlackList, + Headers: bicf.dataPool.Headers(), + BlockBlackList: bicf.blockBlackList, + Proofs: bicf.dataPool.Proofs(), + EnableEpochsHandler: bicf.enableEpochsHandler, } hdrProcessor, err := processor.NewHdrInterceptorProcessor(argProcessor) if err != nil { @@ -566,8 +568,10 @@ func (bicf *baseInterceptorsContainerFactory) generateMetachainHeaderInterceptor } argProcessor := &processor.ArgHdrInterceptorProcessor{ - Headers: bicf.dataPool.Headers(), - BlockBlackList: bicf.blockBlackList, + Headers: bicf.dataPool.Headers(), + BlockBlackList: bicf.blockBlackList, + Proofs: bicf.dataPool.Proofs(), + EnableEpochsHandler: bicf.enableEpochsHandler, } hdrProcessor, err := processor.NewHdrInterceptorProcessor(argProcessor) if err != nil { @@ -909,11 +913,11 @@ func (bicf *baseInterceptorsContainerFactory) generateValidatorInfoInterceptor() } func (bicf *baseInterceptorsContainerFactory) createOneShardEquivalentProofsInterceptor(topic string) (process.Interceptor, error) { - equivalentProofsFactory := interceptorFactory.NewInterceptedEquivalentProofsFactory(*bicf.argInterceptorFactory) + equivalentProofsFactory := interceptorFactory.NewInterceptedEquivalentProofsFactory(*bicf.argInterceptorFactory, bicf.dataPool.Proofs()) marshaller := 
bicf.argInterceptorFactory.CoreComponents.InternalMarshalizer() argProcessor := processor.ArgEquivalentProofsInterceptorProcessor{ - EquivalentProofsPool: &processMocks.EquivalentProofsPoolMock{}, // TODO: pass the real implementation when is done + EquivalentProofsPool: bicf.dataPool.Proofs(), Marshaller: marshaller, } equivalentProofsProcessor, err := processor.NewEquivalentProofsInterceptorProcessor(argProcessor) diff --git a/process/factory/interceptorscontainer/metaInterceptorsContainerFactory.go b/process/factory/interceptorscontainer/metaInterceptorsContainerFactory.go index 24b70570e84..e3c304b3f83 100644 --- a/process/factory/interceptorscontainer/metaInterceptorsContainerFactory.go +++ b/process/factory/interceptorscontainer/metaInterceptorsContainerFactory.go @@ -130,6 +130,7 @@ func NewMetaInterceptorsContainerFactory( hardforkTrigger: args.HardforkTrigger, nodeOperationMode: args.NodeOperationMode, interceptedDataVerifierFactory: args.InterceptedDataVerifierFactory, + enableEpochsHandler: args.CoreComponents.EnableEpochsHandler(), } icf := &metaInterceptorsContainerFactory{ @@ -264,8 +265,10 @@ func (micf *metaInterceptorsContainerFactory) createOneShardHeaderInterceptor(to } argProcessor := &processor.ArgHdrInterceptorProcessor{ - Headers: micf.dataPool.Headers(), - BlockBlackList: micf.blockBlackList, + Headers: micf.dataPool.Headers(), + BlockBlackList: micf.blockBlackList, + Proofs: micf.dataPool.Proofs(), + EnableEpochsHandler: micf.enableEpochsHandler, } hdrProcessor, err := processor.NewHdrInterceptorProcessor(argProcessor) if err != nil { diff --git a/process/factory/interceptorscontainer/metaInterceptorsContainerFactory_test.go b/process/factory/interceptorscontainer/metaInterceptorsContainerFactory_test.go index 927742f00fa..ec699e5803b 100644 --- a/process/factory/interceptorscontainer/metaInterceptorsContainerFactory_test.go +++ b/process/factory/interceptorscontainer/metaInterceptorsContainerFactory_test.go @@ -80,6 +80,9 @@ func 
createMetaDataPools() dataRetriever.PoolsHolder { RewardTransactionsCalled: func() dataRetriever.ShardedDataCacherNotifier { return testscommon.NewShardedDataStub() }, + ProofsCalled: func() dataRetriever.ProofsPool { + return &dataRetrieverMock.ProofsPoolMock{} + }, } return pools @@ -574,14 +577,15 @@ func TestMetaInterceptorsContainerFactory_CreateShouldWork(t *testing.T) { coreComp, cryptoComp := createMockComponentHolders() args := getArgumentsMeta(coreComp, cryptoComp) + args.InterceptedDataVerifierFactory = &mock.InterceptedDataVerifierFactoryMock{} icf, _ := interceptorscontainer.NewMetaInterceptorsContainerFactory(args) mainContainer, fullArchiveContainer, err := icf.Create() + require.Nil(t, err) assert.NotNil(t, mainContainer) assert.NotNil(t, fullArchiveContainer) - assert.Nil(t, err) } func TestMetaInterceptorsContainerFactory_With4ShardsShouldWork(t *testing.T) { diff --git a/process/factory/interceptorscontainer/shardInterceptorsContainerFactory.go b/process/factory/interceptorscontainer/shardInterceptorsContainerFactory.go index 7acd6d87e59..e3a4e639d5b 100644 --- a/process/factory/interceptorscontainer/shardInterceptorsContainerFactory.go +++ b/process/factory/interceptorscontainer/shardInterceptorsContainerFactory.go @@ -130,6 +130,7 @@ func NewShardInterceptorsContainerFactory( hardforkTrigger: args.HardforkTrigger, nodeOperationMode: args.NodeOperationMode, interceptedDataVerifierFactory: args.InterceptedDataVerifierFactory, + enableEpochsHandler: args.CoreComponents.EnableEpochsHandler(), } icf := &shardInterceptorsContainerFactory{ diff --git a/process/factory/interceptorscontainer/shardInterceptorsContainerFactory_test.go b/process/factory/interceptorscontainer/shardInterceptorsContainerFactory_test.go index 5d3ff1fcda1..d05099299d5 100644 --- a/process/factory/interceptorscontainer/shardInterceptorsContainerFactory_test.go +++ b/process/factory/interceptorscontainer/shardInterceptorsContainerFactory_test.go @@ -7,6 +7,7 @@ import ( 
"github.com/multiversx/mx-chain-core-go/core/versioning" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/dataRetriever" @@ -90,6 +91,10 @@ func createShardDataPools() dataRetriever.PoolsHolder { pools.CurrBlockTxsCalled = func() dataRetriever.TransactionCacher { return &mock.TxForCurrentBlockStub{} } + pools.ProofsCalled = func() dataRetriever.ProofsPool { + return &dataRetrieverMock.ProofsPoolMock{} + } + return pools } @@ -581,10 +586,10 @@ func TestShardInterceptorsContainerFactory_CreateShouldWork(t *testing.T) { icf, _ := interceptorscontainer.NewShardInterceptorsContainerFactory(args) mainContainer, fullArchiveContainer, err := icf.Create() + require.Nil(t, err) assert.NotNil(t, mainContainer) assert.NotNil(t, fullArchiveContainer) - assert.Nil(t, err) } func TestShardInterceptorsContainerFactory_With4ShardsShouldWork(t *testing.T) { diff --git a/process/interceptors/factory/interceptedEquivalentProofsFactory.go b/process/interceptors/factory/interceptedEquivalentProofsFactory.go index 0a007fef3d6..4c5694d1e4d 100644 --- a/process/interceptors/factory/interceptedEquivalentProofsFactory.go +++ b/process/interceptors/factory/interceptedEquivalentProofsFactory.go @@ -3,6 +3,7 @@ package factory import ( "github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-go/consensus" + "github.com/multiversx/mx-chain-go/dataRetriever" "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/process/block/interceptedBlocks" "github.com/multiversx/mx-chain-go/sharding" @@ -12,14 +13,16 @@ type interceptedEquivalentProofsFactory struct { marshaller marshal.Marshalizer shardCoordinator sharding.Coordinator headerSigVerifier consensus.HeaderSigVerifier + proofsPool dataRetriever.ProofsPool } // NewInterceptedEquivalentProofsFactory creates a new instance of interceptedEquivalentProofsFactory -func 
NewInterceptedEquivalentProofsFactory(args ArgInterceptedDataFactory) *interceptedEquivalentProofsFactory { +func NewInterceptedEquivalentProofsFactory(args ArgInterceptedDataFactory, proofsPool dataRetriever.ProofsPool) *interceptedEquivalentProofsFactory { return &interceptedEquivalentProofsFactory{ marshaller: args.CoreComponents.InternalMarshalizer(), shardCoordinator: args.ShardCoordinator, headerSigVerifier: args.HeaderSigVerifier, + proofsPool: proofsPool, } } @@ -30,6 +33,7 @@ func (factory *interceptedEquivalentProofsFactory) Create(buff []byte) (process. Marshaller: factory.marshaller, ShardCoordinator: factory.shardCoordinator, HeaderSigVerifier: factory.headerSigVerifier, + Proofs: factory.proofsPool, } return interceptedBlocks.NewInterceptedEquivalentProof(args) } diff --git a/process/interceptors/factory/interceptedEquivalentProofsFactory_test.go b/process/interceptors/factory/interceptedEquivalentProofsFactory_test.go index 9ee099b1c6a..c96ade9528b 100644 --- a/process/interceptors/factory/interceptedEquivalentProofsFactory_test.go +++ b/process/interceptors/factory/interceptedEquivalentProofsFactory_test.go @@ -8,6 +8,7 @@ import ( "github.com/multiversx/mx-chain-go/consensus/mock" processMock "github.com/multiversx/mx-chain-go/process/mock" "github.com/multiversx/mx-chain-go/testscommon/consensus" + "github.com/multiversx/mx-chain-go/testscommon/dataRetriever" "github.com/stretchr/testify/require" ) @@ -27,14 +28,14 @@ func TestInterceptedEquivalentProofsFactory_IsInterfaceNil(t *testing.T) { var factory *interceptedEquivalentProofsFactory require.True(t, factory.IsInterfaceNil()) - factory = NewInterceptedEquivalentProofsFactory(createMockArgInterceptedDataFactory()) + factory = NewInterceptedEquivalentProofsFactory(createMockArgInterceptedDataFactory(), &dataRetriever.ProofsPoolMock{}) require.False(t, factory.IsInterfaceNil()) } func TestNewInterceptedEquivalentProofsFactory(t *testing.T) { t.Parallel() - factory := 
NewInterceptedEquivalentProofsFactory(createMockArgInterceptedDataFactory()) + factory := NewInterceptedEquivalentProofsFactory(createMockArgInterceptedDataFactory(), &dataRetriever.ProofsPoolMock{}) require.NotNil(t, factory) } @@ -42,7 +43,7 @@ func TestInterceptedEquivalentProofsFactory_Create(t *testing.T) { t.Parallel() args := createMockArgInterceptedDataFactory() - factory := NewInterceptedEquivalentProofsFactory(args) + factory := NewInterceptedEquivalentProofsFactory(args, &dataRetriever.ProofsPoolMock{}) require.NotNil(t, factory) providedProof := &block.HeaderProof{ diff --git a/process/interceptors/processor/argHdrInterceptorProcessor.go b/process/interceptors/processor/argHdrInterceptorProcessor.go index 53e79b731b7..0f9616fb2cf 100644 --- a/process/interceptors/processor/argHdrInterceptorProcessor.go +++ b/process/interceptors/processor/argHdrInterceptorProcessor.go @@ -1,12 +1,15 @@ package processor import ( + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/dataRetriever" "github.com/multiversx/mx-chain-go/process" ) // ArgHdrInterceptorProcessor is the argument for the interceptor processor used for headers (shard, meta and so on) type ArgHdrInterceptorProcessor struct { - Headers dataRetriever.HeadersPool - BlockBlackList process.TimeCacher + Headers dataRetriever.HeadersPool + Proofs dataRetriever.ProofsPool + BlockBlackList process.TimeCacher + EnableEpochsHandler common.EnableEpochsHandler } diff --git a/process/interceptors/processor/equivalentProofsInterceptorProcessor.go b/process/interceptors/processor/equivalentProofsInterceptorProcessor.go index 8ce7f1c1e15..ef8beff12af 100644 --- a/process/interceptors/processor/equivalentProofsInterceptorProcessor.go +++ b/process/interceptors/processor/equivalentProofsInterceptorProcessor.go @@ -34,7 +34,7 @@ func NewEquivalentProofsInterceptorProcessor(args ArgEquivalentProofsInterceptor func checkArgsEquivalentProofs(args ArgEquivalentProofsInterceptorProcessor) error { 
if check.IfNil(args.EquivalentProofsPool) { - return process.ErrNilEquivalentProofsPool + return process.ErrNilProofsPool } if check.IfNil(args.Marshaller) { return process.ErrNilMarshalizer @@ -56,9 +56,7 @@ func (epip *equivalentProofsInterceptorProcessor) Save(data process.InterceptedD return process.ErrWrongTypeAssertion } - epip.equivalentProofsPool.AddNotarizedProof(interceptedProof.GetProof()) - - return nil + return epip.equivalentProofsPool.AddProof(interceptedProof.GetProof()) } // RegisterHandler registers a callback function to be notified of incoming equivalent proofs diff --git a/process/interceptors/processor/equivalentProofsInterceptorProcessor_test.go b/process/interceptors/processor/equivalentProofsInterceptorProcessor_test.go index 0f5bcbc0d9a..b11eca03aec 100644 --- a/process/interceptors/processor/equivalentProofsInterceptorProcessor_test.go +++ b/process/interceptors/processor/equivalentProofsInterceptorProcessor_test.go @@ -10,14 +10,14 @@ import ( "github.com/multiversx/mx-chain-go/process/block/interceptedBlocks" "github.com/multiversx/mx-chain-go/process/transaction" "github.com/multiversx/mx-chain-go/testscommon/consensus" + "github.com/multiversx/mx-chain-go/testscommon/dataRetriever" "github.com/multiversx/mx-chain-go/testscommon/marshallerMock" - "github.com/multiversx/mx-chain-go/testscommon/processMocks" "github.com/stretchr/testify/require" ) func createMockArgEquivalentProofsInterceptorProcessor() ArgEquivalentProofsInterceptorProcessor { return ArgEquivalentProofsInterceptorProcessor{ - EquivalentProofsPool: &processMocks.EquivalentProofsPoolMock{}, + EquivalentProofsPool: &dataRetriever.ProofsPoolMock{}, Marshaller: &marshallerMock.MarshalizerMock{}, } } @@ -42,7 +42,7 @@ func TestNewEquivalentProofsInterceptorProcessor(t *testing.T) { args.EquivalentProofsPool = nil epip, err := NewEquivalentProofsInterceptorProcessor(args) - require.Equal(t, process.ErrNilEquivalentProofsPool, err) + require.Equal(t, process.ErrNilProofsPool, 
err) require.Nil(t, epip) }) t.Run("nil Marshaller should error", func(t *testing.T) { @@ -91,9 +91,10 @@ func TestEquivalentProofsInterceptorProcessor_Save(t *testing.T) { wasCalled := false args := createMockArgEquivalentProofsInterceptorProcessor() - args.EquivalentProofsPool = &processMocks.EquivalentProofsPoolMock{ - AddNotarizedProofCalled: func(notarizedProof data.HeaderProofHandler) { + args.EquivalentProofsPool = &dataRetriever.ProofsPoolMock{ + AddProofCalled: func(notarizedProof data.HeaderProofHandler) error { wasCalled = true + return nil }, } epip, err := NewEquivalentProofsInterceptorProcessor(args) @@ -103,6 +104,7 @@ func TestEquivalentProofsInterceptorProcessor_Save(t *testing.T) { Marshaller: args.Marshaller, ShardCoordinator: &mock.ShardCoordinatorMock{}, HeaderSigVerifier: &consensus.HeaderSigVerifierMock{}, + Proofs: &dataRetriever.ProofsPoolMock{}, } argInterceptedEquivalentProof.DataBuff, _ = argInterceptedEquivalentProof.Marshaller.Marshal(&block.HeaderProof{ PubKeysBitmap: []byte("bitmap"), diff --git a/process/interceptors/processor/hdrInterceptorProcessor.go b/process/interceptors/processor/hdrInterceptorProcessor.go index b71d5b73e59..e60489c2ae5 100644 --- a/process/interceptors/processor/hdrInterceptorProcessor.go +++ b/process/interceptors/processor/hdrInterceptorProcessor.go @@ -1,11 +1,13 @@ package processor import ( + "reflect" "sync" "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/dataRetriever" "github.com/multiversx/mx-chain-go/process" ) @@ -15,10 +17,12 @@ var _ process.InterceptorProcessor = (*HdrInterceptorProcessor)(nil) // HdrInterceptorProcessor is the processor used when intercepting headers // (shard headers, meta headers) structs which satisfy HeaderHandler interface. 
type HdrInterceptorProcessor struct { - headers dataRetriever.HeadersPool - blackList process.TimeCacher - registeredHandlers []func(topic string, hash []byte, data interface{}) - mutHandlers sync.RWMutex + headers dataRetriever.HeadersPool + proofs dataRetriever.ProofsPool + blackList process.TimeCacher + enableEpochsHandler common.EnableEpochsHandler + registeredHandlers []func(topic string, hash []byte, data interface{}) + mutHandlers sync.RWMutex } // NewHdrInterceptorProcessor creates a new TxInterceptorProcessor instance @@ -29,14 +33,22 @@ func NewHdrInterceptorProcessor(argument *ArgHdrInterceptorProcessor) (*HdrInter if check.IfNil(argument.Headers) { return nil, process.ErrNilCacher } + if check.IfNil(argument.Proofs) { + return nil, process.ErrNilProofsPool + } if check.IfNil(argument.BlockBlackList) { return nil, process.ErrNilBlackListCacher } + if check.IfNil(argument.EnableEpochsHandler) { + return nil, process.ErrNilEnableEpochsHandler + } return &HdrInterceptorProcessor{ - headers: argument.Headers, - blackList: argument.BlockBlackList, - registeredHandlers: make([]func(topic string, hash []byte, data interface{}), 0), + headers: argument.Headers, + proofs: argument.Proofs, + blackList: argument.BlockBlackList, + enableEpochsHandler: argument.EnableEpochsHandler, + registeredHandlers: make([]func(topic string, hash []byte, data interface{}), 0), }, nil } @@ -68,6 +80,13 @@ func (hip *HdrInterceptorProcessor) Save(data process.InterceptedData, _ core.Pe hip.headers.AddHeader(interceptedHdr.Hash(), interceptedHdr.HeaderHandler()) + if common.IsFlagEnabledAfterEpochsStartBlock(interceptedHdr.HeaderHandler(), hip.enableEpochsHandler, common.EquivalentMessagesFlag) { + err := hip.proofs.AddProof(interceptedHdr.HeaderHandler().GetPreviousProof()) + if err != nil { + log.Error("failed to add proof", "error", err, "intercepted header hash", interceptedHdr.Hash(), "header type", reflect.TypeOf(interceptedHdr.HeaderHandler())) + } + } + return nil } diff 
--git a/process/interceptors/processor/hdrInterceptorProcessor_test.go b/process/interceptors/processor/hdrInterceptorProcessor_test.go index 87fe3521ff7..cc35b04d06b 100644 --- a/process/interceptors/processor/hdrInterceptorProcessor_test.go +++ b/process/interceptors/processor/hdrInterceptorProcessor_test.go @@ -4,19 +4,25 @@ import ( "testing" "time" + "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/process/interceptors/processor" "github.com/multiversx/mx-chain-go/process/mock" "github.com/multiversx/mx-chain-go/testscommon" + "github.com/multiversx/mx-chain-go/testscommon/dataRetriever" + "github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock" "github.com/stretchr/testify/assert" ) func createMockHdrArgument() *processor.ArgHdrInterceptorProcessor { arg := &processor.ArgHdrInterceptorProcessor{ - Headers: &mock.HeadersCacherStub{}, - BlockBlackList: &testscommon.TimeCacheStub{}, + Headers: &mock.HeadersCacherStub{}, + Proofs: &dataRetriever.ProofsPoolMock{}, + BlockBlackList: &testscommon.TimeCacheStub{}, + EnableEpochsHandler: &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, } return arg @@ -55,6 +61,28 @@ func TestNewHdrInterceptorProcessor_NilBlackListHandlerShouldErr(t *testing.T) { assert.Equal(t, process.ErrNilBlackListCacher, err) } +func TestNewHdrInterceptorProcessor_NilProofsPoolShouldErr(t *testing.T) { + t.Parallel() + + arg := createMockHdrArgument() + arg.Proofs = nil + hip, err := processor.NewHdrInterceptorProcessor(arg) + + assert.Nil(t, hip) + assert.Equal(t, process.ErrNilProofsPool, err) +} + +func TestNewHdrInterceptorProcessor_NilEnableEpochsHandlerShouldErr(t *testing.T) { + t.Parallel() + + arg := createMockHdrArgument() + arg.EnableEpochsHandler = nil + hip, err := 
processor.NewHdrInterceptorProcessor(arg) + + assert.Nil(t, hip) + assert.Equal(t, process.ErrNilEnableEpochsHandler, err) +} + func TestNewHdrInterceptorProcessor_ShouldWork(t *testing.T) { t.Parallel() @@ -165,6 +193,19 @@ func TestHdrInterceptorProcessor_SaveShouldWork(t *testing.T) { wasAddedHeaders = true }, } + arg.EnableEpochsHandler = &enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + return flag == common.EquivalentMessagesFlag + }, + } + + wasAddedProofs := false + arg.Proofs = &dataRetriever.ProofsPoolMock{ + AddProofCalled: func(headerProof data.HeaderProofHandler) error { + wasAddedProofs = true + return nil + }, + } hip, _ := processor.NewHdrInterceptorProcessor(arg) chanCalled := make(chan struct{}, 1) @@ -176,6 +217,7 @@ func TestHdrInterceptorProcessor_SaveShouldWork(t *testing.T) { assert.Nil(t, err) assert.True(t, wasAddedHeaders) + assert.True(t, wasAddedProofs) timeout := time.Second * 2 select { diff --git a/process/interceptors/processor/interface.go b/process/interceptors/processor/interface.go index fc48ade3db4..14c0ae73bd6 100644 --- a/process/interceptors/processor/interface.go +++ b/process/interceptors/processor/interface.go @@ -33,7 +33,9 @@ type interceptedEquivalentProof interface { // EquivalentProofsPool defines the behaviour of a proofs pool components type EquivalentProofsPool interface { - AddNotarizedProof(headerProof data.HeaderProofHandler) - GetNotarizedProof(shardID uint32, headerHash []byte) (data.HeaderProofHandler, error) + AddProof(headerProof data.HeaderProofHandler) error + CleanupProofsBehindNonce(shardID uint32, nonce uint64) error + GetProof(shardID uint32, headerHash []byte) (data.HeaderProofHandler, error) + HasProof(shardID uint32, headerHash []byte) bool IsInterfaceNil() bool } diff --git a/process/mock/forkDetectorMock.go b/process/mock/forkDetectorMock.go index a574e4724b1..51e79af246f 100644 --- 
a/process/mock/forkDetectorMock.go +++ b/process/mock/forkDetectorMock.go @@ -28,17 +28,27 @@ func (fdm *ForkDetectorMock) RestoreToGenesis() { // AddHeader - func (fdm *ForkDetectorMock) AddHeader(header data.HeaderHandler, hash []byte, state process.BlockHeaderState, selfNotarizedHeaders []data.HeaderHandler, selfNotarizedHeadersHashes [][]byte) error { - return fdm.AddHeaderCalled(header, hash, state, selfNotarizedHeaders, selfNotarizedHeadersHashes) + if fdm.AddHeaderCalled != nil { + return fdm.AddHeaderCalled(header, hash, state, selfNotarizedHeaders, selfNotarizedHeadersHashes) + } + + return nil } // RemoveHeader - func (fdm *ForkDetectorMock) RemoveHeader(nonce uint64, hash []byte) { - fdm.RemoveHeaderCalled(nonce, hash) + if fdm.RemoveHeaderCalled != nil { + fdm.RemoveHeaderCalled(nonce, hash) + } } // CheckFork - func (fdm *ForkDetectorMock) CheckFork() *process.ForkInfo { - return fdm.CheckForkCalled() + if fdm.CheckForkCalled != nil { + return fdm.CheckForkCalled() + } + + return nil } // GetHighestFinalBlockNonce - @@ -51,12 +61,20 @@ func (fdm *ForkDetectorMock) GetHighestFinalBlockNonce() uint64 { // GetHighestFinalBlockHash - func (fdm *ForkDetectorMock) GetHighestFinalBlockHash() []byte { - return fdm.GetHighestFinalBlockHashCalled() + if fdm.GetHighestFinalBlockHashCalled != nil { + return fdm.GetHighestFinalBlockHashCalled() + } + + return nil } // ProbableHighestNonce - func (fdm *ForkDetectorMock) ProbableHighestNonce() uint64 { - return fdm.ProbableHighestNonceCalled() + if fdm.ProbableHighestNonceCalled != nil { + return fdm.ProbableHighestNonceCalled() + } + + return 0 } // SetRollBackNonce - @@ -68,12 +86,18 @@ func (fdm *ForkDetectorMock) SetRollBackNonce(nonce uint64) { // ResetFork - func (fdm *ForkDetectorMock) ResetFork() { - fdm.ResetForkCalled() + if fdm.ResetForkCalled != nil { + fdm.ResetForkCalled() + } } // GetNotarizedHeaderHash - func (fdm *ForkDetectorMock) GetNotarizedHeaderHash(nonce uint64) []byte { - return 
fdm.GetNotarizedHeaderHashCalled(nonce) + if fdm.GetNotarizedHeaderHashCalled != nil { + return fdm.GetNotarizedHeaderHashCalled(nonce) + } + + return nil } // ResetProbableHighestNonce - diff --git a/process/peer/process.go b/process/peer/process.go index 97ea34fc21a..c5ebb890d8a 100644 --- a/process/peer/process.go +++ b/process/peer/process.go @@ -1175,6 +1175,11 @@ func (vs *validatorStatistics) getTempRating(s string) uint32 { } func (vs *validatorStatistics) display(validatorKey string) { + if log.GetLevel() != logger.LogTrace { + // do not need to load peer account if not log level trace + return + } + peerAcc, err := vs.loadPeerAccount([]byte(validatorKey)) if err != nil { log.Trace("display peer acc", "error", err) diff --git a/process/sync/argBootstrapper.go b/process/sync/argBootstrapper.go index ec3f64a58d8..587ecedd258 100644 --- a/process/sync/argBootstrapper.go +++ b/process/sync/argBootstrapper.go @@ -8,6 +8,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data/typeConverters" "github.com/multiversx/mx-chain-core-go/hashing" "github.com/multiversx/mx-chain-core-go/marshal" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/consensus" "github.com/multiversx/mx-chain-go/dataRetriever" "github.com/multiversx/mx-chain-go/dblookupext" @@ -48,6 +49,7 @@ type ArgBaseBootstrapper struct { ScheduledTxsExecutionHandler process.ScheduledTxsExecutionHandler ProcessWaitTime time.Duration RepopulateTokensSupplies bool + EnableEpochsHandler common.EnableEpochsHandler } // ArgShardBootstrapper holds all dependencies required by the bootstrap data factory in order to create diff --git a/process/sync/baseSync.go b/process/sync/baseSync.go index aa43d8cecc1..cf13638912f 100644 --- a/process/sync/baseSync.go +++ b/process/sync/baseSync.go @@ -3,6 +3,7 @@ package sync import ( "bytes" "context" + "encoding/hex" "fmt" "math" "sync" @@ -57,21 +58,23 @@ type notarizedInfo struct { type baseBootstrap struct { historyRepo 
dblookupext.HistoryRepository headers dataRetriever.HeadersPool + proofs dataRetriever.ProofsPool chainHandler data.ChainHandler blockProcessor process.BlockProcessor store dataRetriever.StorageService - roundHandler consensus.RoundHandler - hasher hashing.Hasher - marshalizer marshal.Marshalizer - epochHandler dataRetriever.EpochHandler - forkDetector process.ForkDetector - requestHandler process.RequestHandler - shardCoordinator sharding.Coordinator - accounts state.AccountsAdapter - blockBootstrapper blockBootstrapper - blackListHandler process.TimeCacher + roundHandler consensus.RoundHandler + hasher hashing.Hasher + marshalizer marshal.Marshalizer + epochHandler dataRetriever.EpochHandler + forkDetector process.ForkDetector + requestHandler process.RequestHandler + shardCoordinator sharding.Coordinator + accounts state.AccountsAdapter + blockBootstrapper blockBootstrapper + blackListHandler process.TimeCacher + enableEpochsHandler common.EnableEpochsHandler mutHeader sync.RWMutex headerNonce *uint64 @@ -491,6 +494,9 @@ func checkBaseBootstrapParameters(arguments ArgBaseBootstrapper) error { if arguments.ProcessWaitTime < minimumProcessWaitTime { return fmt.Errorf("%w, minimum is %v, provided is %v", process.ErrInvalidProcessWaitTime, minimumProcessWaitTime, arguments.ProcessWaitTime) } + if check.IfNil(arguments.EnableEpochsHandler) { + return process.ErrNilEnableEpochsHandler + } return nil } @@ -630,13 +636,18 @@ func (boot *baseBootstrap) syncBlock() error { } }() - header, err = boot.getNextHeaderRequestingIfMissing() + header, headerHash, err := boot.getNextHeaderRequestingIfMissing() if err != nil { return err } go boot.requestHeadersFromNonceIfMissing(header.GetNonce() + 1) + err = boot.handleEquivalentProof(header, headerHash) + if err != nil { + return err + } + body, err = boot.blockBootstrapper.getBlockBodyRequestingIfMissing(header) if err != nil { return err @@ -687,6 +698,47 @@ func (boot *baseBootstrap) syncBlock() error { ) 
boot.cleanNoncesSyncedWithErrorsBehindFinal() + boot.cleanProofsBehindFinal(header) + + return nil +} + +func (boot *baseBootstrap) handleEquivalentProof( + header data.HeaderHandler, + headerHash []byte, +) error { + if !boot.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, header.GetEpoch()) { + return nil + } + + prevHeader, err := boot.blockBootstrapper.getHeaderWithHashRequestingIfMissing(header.GetPrevHash()) + if err != nil { + return err + } + + if !boot.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, prevHeader.GetEpoch()) { + // no need to check proof for first block after activation + log.Info("handleEquivalentProof: no need to check equivalent proof for first activation block") + return nil + } + + // process block only if there is a proof for it + hasProof := boot.proofs.HasProof(header.GetShardID(), headerHash) + if hasProof { + return nil + } + + log.Trace("baseBootstrap.handleEquivalentProof: did not have proof for header, will try again", "headerHash", headerHash) + + _, _, err = boot.blockBootstrapper.getHeaderWithNonceRequestingIfMissing(header.GetNonce() + 1) + if err != nil { + return err + } + + hasProof = boot.proofs.HasProof(header.GetShardID(), headerHash) + if !hasProof { + return fmt.Errorf("baseBootstrap.handleEquivalentProof: did not have proof for header, headerHash %s", hex.EncodeToString(headerHash)) + } return nil } @@ -715,6 +767,25 @@ func (boot *baseBootstrap) cleanNoncesSyncedWithErrorsBehindFinal() { } } +func (boot *baseBootstrap) cleanProofsBehindFinal(header data.HeaderHandler) { + if !boot.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, header.GetEpoch()) { + return + } + + // TODO: analyse fork detection by proofs + finalNonce := boot.forkDetector.GetHighestFinalBlockNonce() + + err := boot.proofs.CleanupProofsBehindNonce(header.GetShardID(), finalNonce) + if err != nil { + log.Warn("failed to cleanup notarized proofs behind nonce", + "nonce", 
finalNonce, + "shardID", header.GetShardID(), + "error", err) + } + + log.Trace("baseBootstrap.cleanProofsBehindFinal cleanup successfully", "finalNonce", finalNonce) +} + // rollBack decides if rollBackOneBlock must be called func (boot *baseBootstrap) rollBack(revertUsingForkNonce bool) error { var roleBackOneBlockExecuted bool @@ -935,7 +1006,7 @@ func (boot *baseBootstrap) getRootHashFromBlock(hdr data.HeaderHandler, hdrHash return hdrRootHash } -func (boot *baseBootstrap) getNextHeaderRequestingIfMissing() (data.HeaderHandler, error) { +func (boot *baseBootstrap) getNextHeaderRequestingIfMissing() (data.HeaderHandler, []byte, error) { nonce := boot.getNonceForNextBlock() boot.setRequestedHeaderHash(nil) @@ -947,7 +1018,8 @@ func (boot *baseBootstrap) getNextHeaderRequestingIfMissing() (data.HeaderHandle } if hash != nil { - return boot.blockBootstrapper.getHeaderWithHashRequestingIfMissing(hash) + header, err := boot.blockBootstrapper.getHeaderWithHashRequestingIfMissing(hash) + return header, hash, err } return boot.blockBootstrapper.getHeaderWithNonceRequestingIfMissing(nonce) diff --git a/process/sync/export_test.go b/process/sync/export_test.go index 719e7599f9f..16a91ead8b3 100644 --- a/process/sync/export_test.go +++ b/process/sync/export_test.go @@ -288,3 +288,11 @@ func (boot *baseBootstrap) IsInImportMode() bool { func (boot *baseBootstrap) ProcessWaitTime() time.Duration { return boot.processWaitTime } + +// HandleEquivalentProof - +func (boot *baseBootstrap) HandleEquivalentProof( + header data.HeaderHandler, + headerHash []byte, +) error { + return boot.handleEquivalentProof(header, headerHash) +} diff --git a/process/sync/interface.go b/process/sync/interface.go index 88f644df160..d672cafb88b 100644 --- a/process/sync/interface.go +++ b/process/sync/interface.go @@ -13,7 +13,7 @@ type blockBootstrapper interface { getPrevHeader(data.HeaderHandler, storage.Storer) (data.HeaderHandler, error) getBlockBody(headerHandler data.HeaderHandler) 
(data.BodyHandler, error) getHeaderWithHashRequestingIfMissing(hash []byte) (data.HeaderHandler, error) - getHeaderWithNonceRequestingIfMissing(nonce uint64) (data.HeaderHandler, error) + getHeaderWithNonceRequestingIfMissing(nonce uint64) (data.HeaderHandler, []byte, error) haveHeaderInPoolWithNonce(nonce uint64) bool getBlockBodyRequestingIfMissing(headerHandler data.HeaderHandler) (data.BodyHandler, error) isForkTriggeredByMeta() bool diff --git a/process/sync/metablock.go b/process/sync/metablock.go index 1b3c69c7386..72fc8a8688b 100644 --- a/process/sync/metablock.go +++ b/process/sync/metablock.go @@ -31,6 +31,9 @@ func NewMetaBootstrap(arguments ArgMetaBootstrapper) (*MetaBootstrap, error) { if check.IfNil(arguments.PoolsHolder.Headers()) { return nil, process.ErrNilMetaBlocksPool } + if check.IfNil(arguments.PoolsHolder.Proofs()) { + return nil, process.ErrNilProofsPool + } if check.IfNil(arguments.EpochBootstrapper) { return nil, process.ErrNilEpochStartTrigger } @@ -54,6 +57,7 @@ func NewMetaBootstrap(arguments ArgMetaBootstrapper) (*MetaBootstrap, error) { blockProcessor: arguments.BlockProcessor, store: arguments.Store, headers: arguments.PoolsHolder.Headers(), + proofs: arguments.PoolsHolder.Proofs(), roundHandler: arguments.RoundHandler, waitTime: arguments.WaitTime, hasher: arguments.Hasher, @@ -78,6 +82,7 @@ func NewMetaBootstrap(arguments ArgMetaBootstrapper) (*MetaBootstrap, error) { historyRepo: arguments.HistoryRepo, scheduledTxsExecutionHandler: arguments.ScheduledTxsExecutionHandler, processWaitTime: arguments.ProcessWaitTime, + enableEpochsHandler: arguments.EnableEpochsHandler, } if base.isInImportMode { @@ -243,8 +248,8 @@ func (boot *MetaBootstrap) requestHeaderWithHash(hash []byte) { // getHeaderWithNonceRequestingIfMissing method gets the header with a given nonce from pool. 
If it is not found there, it will // be requested from network -func (boot *MetaBootstrap) getHeaderWithNonceRequestingIfMissing(nonce uint64) (data.HeaderHandler, error) { - hdr, _, err := process.GetMetaHeaderFromPoolWithNonce( +func (boot *MetaBootstrap) getHeaderWithNonceRequestingIfMissing(nonce uint64) (data.HeaderHandler, []byte, error) { + hdr, hash, err := process.GetMetaHeaderFromPoolWithNonce( nonce, boot.headers) if err != nil { @@ -252,18 +257,18 @@ func (boot *MetaBootstrap) getHeaderWithNonceRequestingIfMissing(nonce uint64) ( boot.requestHeaderWithNonce(nonce) err = boot.waitForHeaderNonce() if err != nil { - return nil, err + return nil, nil, err } - hdr, _, err = process.GetMetaHeaderFromPoolWithNonce( + hdr, hash, err = process.GetMetaHeaderFromPoolWithNonce( nonce, boot.headers) if err != nil { - return nil, err + return nil, nil, err } } - return hdr, nil + return hdr, hash, nil } // getHeaderWithHashRequestingIfMissing method gets the header with a given hash from pool. 
If it is not found there, diff --git a/process/sync/metablock_test.go b/process/sync/metablock_test.go index 8835041848c..73386a021f1 100644 --- a/process/sync/metablock_test.go +++ b/process/sync/metablock_test.go @@ -28,7 +28,9 @@ import ( "github.com/multiversx/mx-chain-go/storage" "github.com/multiversx/mx-chain-go/testscommon" "github.com/multiversx/mx-chain-go/testscommon/cache" + dataRetrieverMock "github.com/multiversx/mx-chain-go/testscommon/dataRetriever" "github.com/multiversx/mx-chain-go/testscommon/dblookupext" + "github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock" "github.com/multiversx/mx-chain-go/testscommon/hashingMocks" "github.com/multiversx/mx-chain-go/testscommon/outport" stateMock "github.com/multiversx/mx-chain-go/testscommon/state" @@ -94,6 +96,7 @@ func CreateMetaBootstrapMockArguments() sync.ArgMetaBootstrapper { ScheduledTxsExecutionHandler: &testscommon.ScheduledTxsExecutionStub{}, ProcessWaitTime: testProcessWaitTime, RepopulateTokensSupplies: false, + EnableEpochsHandler: &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, } argsMetaBootstrapper := sync.ArgMetaBootstrapper{ @@ -172,6 +175,22 @@ func TestNewMetaBootstrap_PoolsHolderRetNilOnHeadersShouldErr(t *testing.T) { assert.Equal(t, process.ErrNilMetaBlocksPool, err) } +func TestNewMetaBootstrap_NilProofsPool(t *testing.T) { + t.Parallel() + + args := CreateMetaBootstrapMockArguments() + pools := createMockPools() + pools.ProofsCalled = func() dataRetriever.ProofsPool { + return nil + } + args.PoolsHolder = pools + + bs, err := sync.NewMetaBootstrap(args) + + assert.True(t, check.IfNil(bs)) + assert.Equal(t, process.ErrNilProofsPool, err) +} + func TestNewMetaBootstrap_NilStoreShouldErr(t *testing.T) { t.Parallel() @@ -388,6 +407,34 @@ func TestNewMetaBootstrap_InvalidProcessTimeShouldErr(t *testing.T) { assert.True(t, errors.Is(err, process.ErrInvalidProcessWaitTime)) } +func TestNewMetaBootstrap_NilEnableEpochsHandlerShouldErr(t *testing.T) { + t.Parallel() + + 
args := CreateMetaBootstrapMockArguments() + args.EnableEpochsHandler = nil + + bs, err := sync.NewMetaBootstrap(args) + + assert.True(t, check.IfNil(bs)) + assert.True(t, errors.Is(err, process.ErrNilEnableEpochsHandler)) +} + +func TestNewMetaBootstrap_PoolsHolderRetNilOnProofsShouldErr(t *testing.T) { + t.Parallel() + + args := CreateMetaBootstrapMockArguments() + pools := createMockPools() + pools.ProofsCalled = func() dataRetriever.ProofsPool { + return nil + } + args.PoolsHolder = pools + + bs, err := sync.NewMetaBootstrap(args) + + assert.True(t, check.IfNil(bs)) + assert.Equal(t, process.ErrNilProofsPool, err) +} + func TestNewMetaBootstrap_MissingStorer(t *testing.T) { t.Parallel() @@ -1812,3 +1859,333 @@ func TestMetaBootstrap_SyncAccountsDBs(t *testing.T) { require.True(t, accountsSyncCalled) }) } + +func TestMetaBootstrap_HandleEquivalentProof(t *testing.T) { + t.Parallel() + + prevHeaderHash1 := []byte("prevHeaderHash") + headerHash1 := []byte("headerHash") + + t.Run("flag not activated, should return directly", func(t *testing.T) { + t.Parallel() + + header := &block.MetaBlock{ + Nonce: 11, + } + + args := CreateMetaBootstrapMockArguments() + args.EnableEpochsHandler = &enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + return false + }, + } + + bs, err := sync.NewMetaBootstrap(args) + require.Nil(t, err) + + err = bs.HandleEquivalentProof(header, headerHash1) + require.Nil(t, err) + }) + + t.Run("should return nil if first block after activation", func(t *testing.T) { + t.Parallel() + + prevHeader := &block.MetaBlock{ + Epoch: 3, + Nonce: 10, + } + + header := &block.MetaBlock{ + Epoch: 4, + Nonce: 11, + PrevHash: prevHeaderHash1, + } + + args := CreateMetaBootstrapMockArguments() + args.EnableEpochsHandler = &enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + if epoch == 4 { + return
flag == common.EquivalentMessagesFlag + } + + return false + }, + } + + pools := createMockPools() + pools.HeadersCalled = func() dataRetriever.HeadersPool { + sds := &mock.HeadersCacherStub{} + sds.GetHeaderByHashCalled = func(hash []byte) (data.HeaderHandler, error) { + if bytes.Equal(hash, prevHeaderHash1) { + return prevHeader, nil + } + + return nil, sync.ErrHeaderNotFound + } + + return sds + } + + args.PoolsHolder = pools + + bs, err := sync.NewMetaBootstrap(args) + require.Nil(t, err) + + err = bs.HandleEquivalentProof(header, headerHash1) + require.Nil(t, err) + }) + + t.Run("should work, proof already in pool", func(t *testing.T) { + t.Parallel() + + prevHeader := &block.MetaBlock{ + Nonce: 10, + } + + header := &block.MetaBlock{ + Nonce: 11, + PrevHash: prevHeaderHash1, + } + + args := CreateMetaBootstrapMockArguments() + args.EnableEpochsHandler = &enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + return flag == common.EquivalentMessagesFlag + }, + } + + pools := createMockPools() + pools.HeadersCalled = func() dataRetriever.HeadersPool { + sds := &mock.HeadersCacherStub{} + sds.GetHeaderByHashCalled = func(hash []byte) (data.HeaderHandler, error) { + if bytes.Equal(hash, prevHeaderHash1) { + return prevHeader, nil + } + + return nil, sync.ErrHeaderNotFound + } + + return sds + } + + pools.ProofsCalled = func() dataRetriever.ProofsPool { + return &dataRetrieverMock.ProofsPoolMock{ + HasProofCalled: func(shardID uint32, headerHash []byte) bool { + return true + }, + } + } + + args.PoolsHolder = pools + + bs, err := sync.NewMetaBootstrap(args) + require.Nil(t, err) + + err = bs.HandleEquivalentProof(header, headerHash1) + require.Nil(t, err) + }) + + t.Run("should work, by checking for next header", func(t *testing.T) { + t.Parallel() + + headerHash1 := []byte("headerHash1") + headerHash2 := []byte("headerHash2") + + header1 := &block.MetaBlock{ + Nonce: 10, + } + + 
header2 := &block.MetaBlock{ + Nonce: 11, + PrevHash: headerHash1, + } + + header3 := &block.MetaBlock{ + Nonce: 12, + PrevHash: headerHash2, + } + + args := CreateMetaBootstrapMockArguments() + args.EnableEpochsHandler = &enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + return flag == common.EquivalentMessagesFlag + }, + } + + pools := createMockPools() + pools.HeadersCalled = func() dataRetriever.HeadersPool { + sds := &mock.HeadersCacherStub{} + sds.GetHeaderByHashCalled = func(hash []byte) (data.HeaderHandler, error) { + if bytes.Equal(hash, headerHash1) { + return header1, nil + } + + return nil, sync.ErrHeaderNotFound + } + sds.GetHeaderByNonceAndShardIdCalled = func(hdrNonce uint64, shardId uint32) ([]data.HeaderHandler, [][]byte, error) { + if hdrNonce == header2.GetNonce()+1 { + return []data.HeaderHandler{header3}, [][]byte{headerHash2}, nil + } + + return nil, nil, process.ErrMissingHeader + } + + return sds + } + + hasProofCalled := 0 + pools.ProofsCalled = func() dataRetriever.ProofsPool { + return &dataRetrieverMock.ProofsPoolMock{ + HasProofCalled: func(shardID uint32, headerHash []byte) bool { + if hasProofCalled == 0 { + hasProofCalled++ + return false + } + + return true + }, + } + } + + args.PoolsHolder = pools + + bs, err := sync.NewMetaBootstrap(args) + require.Nil(t, err) + + err = bs.HandleEquivalentProof(header2, headerHash2) + require.Nil(t, err) + }) + + t.Run("should return err if failing to get proof after second request", func(t *testing.T) { + t.Parallel() + + headerHash1 := []byte("headerHash1") + headerHash2 := []byte("headerHash2") + + header1 := &block.MetaBlock{ + Nonce: 10, + } + + header2 := &block.MetaBlock{ + Nonce: 11, + PrevHash: headerHash1, + } + + header3 := &block.MetaBlock{ + Nonce: 12, + PrevHash: headerHash2, + } + + args := CreateMetaBootstrapMockArguments() + args.EnableEpochsHandler = 
&enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + return flag == common.EquivalentMessagesFlag + }, + } + + pools := createMockPools() + pools.HeadersCalled = func() dataRetriever.HeadersPool { + sds := &mock.HeadersCacherStub{} + sds.GetHeaderByHashCalled = func(hash []byte) (data.HeaderHandler, error) { + if bytes.Equal(hash, headerHash1) { + return header1, nil + } + + return nil, sync.ErrHeaderNotFound + } + sds.GetHeaderByNonceAndShardIdCalled = func(hdrNonce uint64, shardId uint32) ([]data.HeaderHandler, [][]byte, error) { + if hdrNonce == header2.GetNonce()+1 { + return []data.HeaderHandler{header3}, [][]byte{headerHash2}, nil + } + + return nil, nil, process.ErrMissingHeader + } + + return sds + } + + hasProofCalled := 0 + pools.ProofsCalled = func() dataRetriever.ProofsPool { + return &dataRetrieverMock.ProofsPoolMock{ + HasProofCalled: func(shardID uint32, headerHash []byte) bool { + if hasProofCalled < 2 { + hasProofCalled++ + return false + } + + return true + }, + } + } + + args.PoolsHolder = pools + + bs, err := sync.NewMetaBootstrap(args) + require.Nil(t, err) + + err = bs.HandleEquivalentProof(header2, headerHash2) + require.Error(t, err) + }) + + t.Run("should return err if failing to request next header", func(t *testing.T) { + t.Parallel() + + headerHash1 := []byte("headerHash1") + headerHash2 := []byte("headerHash2") + + header1 := &block.MetaBlock{ + Nonce: 10, + } + + header2 := &block.MetaBlock{ + Nonce: 11, + PrevHash: headerHash1, + } + + args := CreateMetaBootstrapMockArguments() + args.EnableEpochsHandler = &enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + return flag == common.EquivalentMessagesFlag + }, + } + + pools := createMockPools() + pools.HeadersCalled = func() dataRetriever.HeadersPool { + sds := &mock.HeadersCacherStub{} + sds.GetHeaderByHashCalled = func(hash 
[]byte) (data.HeaderHandler, error) { + if bytes.Equal(hash, headerHash1) { + return header1, nil + } + + return nil, sync.ErrHeaderNotFound + } + sds.GetHeaderByNonceAndShardIdCalled = func(hdrNonce uint64, shardId uint32) ([]data.HeaderHandler, [][]byte, error) { + return nil, nil, process.ErrMissingHeader + } + + return sds + } + + hasProofCalled := 0 + pools.ProofsCalled = func() dataRetriever.ProofsPool { + return &dataRetrieverMock.ProofsPoolMock{ + HasProofCalled: func(shardID uint32, headerHash []byte) bool { + if hasProofCalled < 2 { + hasProofCalled++ + return false + } + + return true + }, + } + } + + args.PoolsHolder = pools + + bs, err := sync.NewMetaBootstrap(args) + require.Nil(t, err) + + err = bs.HandleEquivalentProof(header2, headerHash2) + require.Error(t, err) + }) +} diff --git a/process/sync/shardblock.go b/process/sync/shardblock.go index 8cca3954ef0..10a3492d024 100644 --- a/process/sync/shardblock.go +++ b/process/sync/shardblock.go @@ -27,6 +27,9 @@ func NewShardBootstrap(arguments ArgShardBootstrapper) (*ShardBootstrap, error) if check.IfNil(arguments.PoolsHolder.Headers()) { return nil, process.ErrNilHeadersDataPool } + if check.IfNil(arguments.PoolsHolder.Proofs()) { + return nil, process.ErrNilProofsPool + } if check.IfNil(arguments.PoolsHolder.MiniBlocks()) { return nil, process.ErrNilTxBlockBody } @@ -41,6 +44,7 @@ func NewShardBootstrap(arguments ArgShardBootstrapper) (*ShardBootstrap, error) blockProcessor: arguments.BlockProcessor, store: arguments.Store, headers: arguments.PoolsHolder.Headers(), + proofs: arguments.PoolsHolder.Proofs(), roundHandler: arguments.RoundHandler, waitTime: arguments.WaitTime, hasher: arguments.Hasher, @@ -66,6 +70,7 @@ func NewShardBootstrap(arguments ArgShardBootstrapper) (*ShardBootstrap, error) scheduledTxsExecutionHandler: arguments.ScheduledTxsExecutionHandler, processWaitTime: arguments.ProcessWaitTime, repopulateTokensSupplies: arguments.RepopulateTokensSupplies, + enableEpochsHandler: 
arguments.EnableEpochsHandler, } if base.isInImportMode { @@ -196,8 +201,8 @@ func (boot *ShardBootstrap) requestHeaderWithHash(hash []byte) { // getHeaderWithNonceRequestingIfMissing method gets the header with a given nonce from pool. If it is not found there, it will // be requested from network -func (boot *ShardBootstrap) getHeaderWithNonceRequestingIfMissing(nonce uint64) (data.HeaderHandler, error) { - hdr, _, err := process.GetShardHeaderFromPoolWithNonce( +func (boot *ShardBootstrap) getHeaderWithNonceRequestingIfMissing(nonce uint64) (data.HeaderHandler, []byte, error) { + hdr, hash, err := process.GetShardHeaderFromPoolWithNonce( nonce, boot.shardCoordinator.SelfId(), boot.headers) @@ -206,19 +211,19 @@ func (boot *ShardBootstrap) getHeaderWithNonceRequestingIfMissing(nonce uint64) boot.requestHeaderWithNonce(nonce) err = boot.waitForHeaderNonce() if err != nil { - return nil, err + return nil, nil, err } - hdr, _, err = process.GetShardHeaderFromPoolWithNonce( + hdr, hash, err = process.GetShardHeaderFromPoolWithNonce( nonce, boot.shardCoordinator.SelfId(), boot.headers) if err != nil { - return nil, err + return nil, nil, err } } - return hdr, nil + return hdr, hash, nil } // getHeaderWithHashRequestingIfMissing method gets the header with a given hash from pool. 
If it is not found there, diff --git a/process/sync/shardblock_test.go b/process/sync/shardblock_test.go index b6d1d292174..fbf974c1ee4 100644 --- a/process/sync/shardblock_test.go +++ b/process/sync/shardblock_test.go @@ -34,6 +34,7 @@ import ( "github.com/multiversx/mx-chain-go/testscommon/cache" dataRetrieverMock "github.com/multiversx/mx-chain-go/testscommon/dataRetriever" "github.com/multiversx/mx-chain-go/testscommon/dblookupext" + "github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock" "github.com/multiversx/mx-chain-go/testscommon/hashingMocks" "github.com/multiversx/mx-chain-go/testscommon/outport" stateMock "github.com/multiversx/mx-chain-go/testscommon/state" @@ -65,6 +66,9 @@ func createMockPools() *dataRetrieverMock.PoolsHolderStub { } return cs } + pools.ProofsCalled = func() dataRetriever.ProofsPool { + return &dataRetrieverMock.ProofsPoolMock{} + } return pools } @@ -221,6 +225,7 @@ func CreateShardBootstrapMockArguments() sync.ArgShardBootstrapper { ScheduledTxsExecutionHandler: &testscommon.ScheduledTxsExecutionStub{}, ProcessWaitTime: testProcessWaitTime, RepopulateTokensSupplies: false, + EnableEpochsHandler: &enableEpochsHandlerMock.EnableEpochsHandlerStub{}, } argsShardBootstrapper := sync.ArgShardBootstrapper{ @@ -272,6 +277,22 @@ func TestNewShardBootstrap_PoolsHolderRetNilOnHeadersShouldErr(t *testing.T) { assert.Equal(t, process.ErrNilHeadersDataPool, err) } +func TestNewShardBootstrap_NilProofsPool(t *testing.T) { + t.Parallel() + + args := CreateShardBootstrapMockArguments() + pools := createMockPools() + pools.ProofsCalled = func() dataRetriever.ProofsPool { + return nil + } + args.PoolsHolder = pools + + bs, err := sync.NewShardBootstrap(args) + + assert.True(t, check.IfNil(bs)) + assert.Equal(t, process.ErrNilProofsPool, err) +} + func TestNewShardBootstrap_PoolsHolderRetNilOnTxBlockBodyShouldErr(t *testing.T) { t.Parallel() @@ -444,6 +465,34 @@ func TestNewShardBootstrap_InvalidProcessTimeShouldErr(t *testing.T) { 
assert.True(t, errors.Is(err, process.ErrInvalidProcessWaitTime)) } +func TestNewShardBootstrap_NilEnableEpochsHandlerShouldErr(t *testing.T) { + t.Parallel() + + args := CreateShardBootstrapMockArguments() + args.EnableEpochsHandler = nil + + bs, err := sync.NewShardBootstrap(args) + + assert.True(t, check.IfNil(bs)) + assert.True(t, errors.Is(err, process.ErrNilEnableEpochsHandler)) +} + +func TestNewShardBootstrap_PoolsHolderRetNilOnProofsShouldErr(t *testing.T) { + t.Parallel() + + args := CreateShardBootstrapMockArguments() + pools := createMockPools() + pools.ProofsCalled = func() dataRetriever.ProofsPool { + return nil + } + args.PoolsHolder = pools + + bs, err := sync.NewShardBootstrap(args) + + assert.True(t, check.IfNil(bs)) + assert.Equal(t, process.ErrNilProofsPool, err) +} + func TestNewShardBootstrap_MissingStorer(t *testing.T) { t.Parallel() @@ -500,6 +549,10 @@ func TestNewShardBootstrap_OkValsShouldWork(t *testing.T) { return cs } + pools.ProofsCalled = func() dataRetriever.ProofsPool { + return &dataRetrieverMock.ProofsPoolMock{} + } + args.PoolsHolder = pools args.IsInImportMode = true bs, err := sync.NewShardBootstrap(args) @@ -723,6 +776,10 @@ func TestBootstrap_SyncShouldSyncOneBlock(t *testing.T) { return cs } + pools.ProofsCalled = func() dataRetriever.ProofsPool { + return &dataRetrieverMock.ProofsPoolMock{} + } + args.PoolsHolder = pools forkDetector := &mock.ForkDetectorMock{} @@ -818,6 +875,9 @@ func TestBootstrap_ShouldReturnNilErr(t *testing.T) { return cs } + pools.ProofsCalled = func() dataRetriever.ProofsPool { + return &dataRetrieverMock.ProofsPoolMock{} + } args.PoolsHolder = pools forkDetector := &mock.ForkDetectorMock{} @@ -900,6 +960,9 @@ func TestBootstrap_SyncBlockShouldReturnErrorWhenProcessBlockFailed(t *testing.T return cs } + pools.ProofsCalled = func() dataRetriever.ProofsPool { + return &dataRetrieverMock.ProofsPoolMock{} + } args.PoolsHolder = pools forkDetector := &mock.ForkDetectorMock{} @@ -1882,6 +1945,9 @@ func 
TestShardBootstrap_RequestMiniBlocksFromHeaderWithNonceIfMissing(t *testing return cs } + pools.ProofsCalled = func() dataRetriever.ProofsPool { + return &dataRetrieverMock.ProofsPoolMock{} + } args.PoolsHolder = pools blkc := initBlockchain() @@ -2108,6 +2174,9 @@ func TestShardBootstrap_SyncBlockGetNodeDBErrorShouldSync(t *testing.T) { return cs } + pools.ProofsCalled = func() dataRetriever.ProofsPool { + return &dataRetrieverMock.ProofsPoolMock{} + } args.PoolsHolder = pools forkDetector := &mock.ForkDetectorMock{} @@ -2146,9 +2215,10 @@ func TestShardBootstrap_SyncBlockGetNodeDBErrorShouldSync(t *testing.T) { return []byte("roothash"), nil }} - bs, _ := sync.NewShardBootstrap(args) + bs, err := sync.NewShardBootstrap(args) + require.Nil(t, err) - err := bs.SyncBlock(context.Background()) + err = bs.SyncBlock(context.Background()) assert.Equal(t, errGetNodeFromDB, err) assert.True(t, syncCalled) } diff --git a/testscommon/dataRetriever/poolFactory.go b/testscommon/dataRetriever/poolFactory.go index 54214ceedd0..43aaeb3e78f 100644 --- a/testscommon/dataRetriever/poolFactory.go +++ b/testscommon/dataRetriever/poolFactory.go @@ -51,8 +51,7 @@ func CreateTxPool(numShards uint32, selfShard uint32) (dataRetriever.ShardedData ) } -// CreatePoolsHolder - -func CreatePoolsHolder(numShards uint32, selfShard uint32) dataRetriever.PoolsHolder { +func createPoolHolderArgs(numShards uint32, selfShard uint32) dataPool.DataPoolArgs { var err error txPool, err := CreateTxPool(numShards, selfShard) @@ -160,12 +159,35 @@ func CreatePoolsHolder(numShards uint32, selfShard uint32) dataRetriever.PoolsHo ValidatorsInfo: validatorsInfo, Proofs: proofsPool, } + + return dataPoolArgs +} + +// CreatePoolsHolder - +func CreatePoolsHolder(numShards uint32, selfShard uint32) dataRetriever.PoolsHolder { + + dataPoolArgs := createPoolHolderArgs(numShards, selfShard) + holder, err := dataPool.NewDataPool(dataPoolArgs) panicIfError("CreatePoolsHolder", err) return holder } +// 
CreatePoolsHolderWithProofsPool - +func CreatePoolsHolderWithProofsPool( + numShards uint32, selfShard uint32, + proofsPool dataRetriever.ProofsPool, +) dataRetriever.PoolsHolder { + dataPoolArgs := createPoolHolderArgs(numShards, selfShard) + dataPoolArgs.Proofs = proofsPool + + holder, err := dataPool.NewDataPool(dataPoolArgs) + panicIfError("CreatePoolsHolderWithProofsPool", err) + + return holder +} + // CreatePoolsHolderWithTxPool - func CreatePoolsHolderWithTxPool(txPool dataRetriever.ShardedDataCacherNotifier) dataRetriever.PoolsHolder { var err error diff --git a/testscommon/processMocks/equivalentProofsPoolMock.go b/testscommon/processMocks/equivalentProofsPoolMock.go deleted file mode 100644 index 9a2c73da584..00000000000 --- a/testscommon/processMocks/equivalentProofsPoolMock.go +++ /dev/null @@ -1,29 +0,0 @@ -package processMocks - -import "github.com/multiversx/mx-chain-core-go/data" - -// EquivalentProofsPoolMock - -type EquivalentProofsPoolMock struct { - AddNotarizedProofCalled func(headerProof data.HeaderProofHandler) - GetNotarizedProofCalled func(shardID uint32, headerHash []byte) (data.HeaderProofHandler, error) -} - -// AddNotarizedProof - -func (mock *EquivalentProofsPoolMock) AddNotarizedProof(headerProof data.HeaderProofHandler) { - if mock.AddNotarizedProofCalled != nil { - mock.AddNotarizedProofCalled(headerProof) - } -} - -// GetNotarizedProof - -func (mock *EquivalentProofsPoolMock) GetNotarizedProof(shardID uint32, headerHash []byte) (data.HeaderProofHandler, error) { - if mock.GetNotarizedProofCalled != nil { - return mock.GetNotarizedProofCalled(shardID, headerHash) - } - return nil, nil -} - -// IsInterfaceNil - -func (mock *EquivalentProofsPoolMock) IsInterfaceNil() bool { - return mock == nil -} diff --git a/update/factory/fullSyncInterceptors.go b/update/factory/fullSyncInterceptors.go index 037155226c9..ad8602e1e5b 100644 --- a/update/factory/fullSyncInterceptors.go +++ b/update/factory/fullSyncInterceptors.go @@ -350,6 +350,7 @@ 
func (ficf *fullSyncInterceptorsContainerFactory) createOneShardHeaderIntercepto argProcessor := &processor.ArgHdrInterceptorProcessor{ Headers: ficf.dataPool.Headers(), BlockBlackList: ficf.blockBlackList, + Proofs: ficf.dataPool.Proofs(), } hdrProcessor, err := processor.NewHdrInterceptorProcessor(argProcessor) if err != nil { @@ -764,6 +765,7 @@ func (ficf *fullSyncInterceptorsContainerFactory) generateMetachainHeaderInterce argProcessor := &processor.ArgHdrInterceptorProcessor{ Headers: ficf.dataPool.Headers(), BlockBlackList: ficf.blockBlackList, + Proofs: ficf.dataPool.Proofs(), } hdrProcessor, err := processor.NewHdrInterceptorProcessor(argProcessor) if err != nil {