diff --git a/cmd/node/config/config.toml b/cmd/node/config/config.toml index f415dd8e426..688f688b7e2 100644 --- a/cmd/node/config/config.toml +++ b/cmd/node/config/config.toml @@ -958,3 +958,7 @@ # All validators will broadcast the message right away { EndIndex = 0, DelayInMilliseconds = 0 }, ] + +[InterceptedDataVerifier] + CacheSpanInSec = 30 + CacheExpiryInSec = 30 diff --git a/config/config.go b/config/config.go index 19da7e2c0c8..9607c9dc330 100644 --- a/config/config.go +++ b/config/config.go @@ -229,6 +229,8 @@ type Config struct { PoolsCleanersConfig PoolsCleanersConfig Redundancy RedundancyConfig ConsensusGradualBroadcast ConsensusGradualBroadcastConfig + + InterceptedDataVerifier InterceptedDataVerifierConfig } // PeersRatingConfig will hold settings related to peers rating @@ -679,3 +681,9 @@ type IndexBroadcastDelay struct { type ConsensusGradualBroadcastConfig struct { GradualIndexBroadcastDelay []IndexBroadcastDelay } + +// InterceptedDataVerifierConfig holds the configuration for the intercepted data verifier +type InterceptedDataVerifierConfig struct { + CacheSpanInSec uint64 + CacheExpiryInSec uint64 +} diff --git a/epochStart/bootstrap/factory/epochStartInterceptorsContainerFactory.go b/epochStart/bootstrap/factory/epochStartInterceptorsContainerFactory.go index d659989896b..8700b1daa24 100644 --- a/epochStart/bootstrap/factory/epochStartInterceptorsContainerFactory.go +++ b/epochStart/bootstrap/factory/epochStartInterceptorsContainerFactory.go @@ -6,6 +6,7 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/typeConverters" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/config" "github.com/multiversx/mx-chain-go/dataRetriever" @@ -25,23 +26,24 @@ const timeSpanForBadHeaders = time.Minute // ArgsEpochStartInterceptorContainer holds the arguments needed for creating a new epoch start interceptors // container factory type ArgsEpochStartInterceptorContainer struct { - CoreComponents process.CoreComponentsHolder - CryptoComponents process.CryptoComponentsHolder - Config config.Config - ShardCoordinator sharding.Coordinator - MainMessenger process.TopicHandler - FullArchiveMessenger process.TopicHandler - DataPool dataRetriever.PoolsHolder - WhiteListHandler update.WhiteListHandler - WhiteListerVerifiedTxs update.WhiteListHandler - AddressPubkeyConv core.PubkeyConverter - NonceConverter typeConverters.Uint64ByteSliceConverter - ChainID []byte - ArgumentsParser process.ArgumentsParser - HeaderIntegrityVerifier process.HeaderIntegrityVerifier - RequestHandler process.RequestHandler - SignaturesHandler process.SignaturesHandler - NodeOperationMode common.NodeOperation + CoreComponents process.CoreComponentsHolder + CryptoComponents process.CryptoComponentsHolder + Config config.Config + ShardCoordinator sharding.Coordinator + MainMessenger process.TopicHandler + FullArchiveMessenger process.TopicHandler + DataPool dataRetriever.PoolsHolder + WhiteListHandler update.WhiteListHandler + WhiteListerVerifiedTxs update.WhiteListHandler + AddressPubkeyConv core.PubkeyConverter + NonceConverter typeConverters.Uint64ByteSliceConverter + ChainID []byte + ArgumentsParser process.ArgumentsParser + HeaderIntegrityVerifier process.HeaderIntegrityVerifier + RequestHandler process.RequestHandler + SignaturesHandler process.SignaturesHandler + NodeOperationMode common.NodeOperation + InterceptedDataVerifierFactory process.InterceptedDataVerifierFactory } // NewEpochStartInterceptorsContainer will return a real interceptors container factory, but with many disabled components @@ -78,36 +80,37 @@ func NewEpochStartInterceptorsContainer(args ArgsEpochStartInterceptorContainer) hardforkTrigger := disabledFactory.HardforkTrigger() containerFactoryArgs := interceptorscontainer.CommonInterceptorsContainerFactoryArgs{ - CoreComponents: args.CoreComponents, - CryptoComponents: cryptoComponents, - Accounts: accountsAdapter, - ShardCoordinator: args.ShardCoordinator, - NodesCoordinator: nodesCoordinator, - MainMessenger: args.MainMessenger, - FullArchiveMessenger: args.FullArchiveMessenger, - Store: storer, - DataPool: args.DataPool, - MaxTxNonceDeltaAllowed: common.MaxTxNonceDeltaAllowed, - TxFeeHandler: feeHandler, - BlockBlackList: blackListHandler, - HeaderSigVerifier: headerSigVerifier, - HeaderIntegrityVerifier: args.HeaderIntegrityVerifier, - ValidityAttester: validityAttester, - EpochStartTrigger: epochStartTrigger, - WhiteListHandler: args.WhiteListHandler, - WhiteListerVerifiedTxs: args.WhiteListerVerifiedTxs, - AntifloodHandler: antiFloodHandler, - ArgumentsParser: args.ArgumentsParser, - PreferredPeersHolder: disabled.NewPreferredPeersHolder(), - SizeCheckDelta: uint32(sizeCheckDelta), - RequestHandler: args.RequestHandler, - PeerSignatureHandler: cryptoComponents.PeerSignatureHandler(), - SignaturesHandler: args.SignaturesHandler, - HeartbeatExpiryTimespanInSec: args.Config.HeartbeatV2.HeartbeatExpiryTimespanInSec, - MainPeerShardMapper: peerShardMapper, - FullArchivePeerShardMapper: fullArchivePeerShardMapper, - HardforkTrigger: hardforkTrigger, - NodeOperationMode: args.NodeOperationMode, + CoreComponents: args.CoreComponents, + CryptoComponents: cryptoComponents, + Accounts: accountsAdapter, + ShardCoordinator: args.ShardCoordinator, + NodesCoordinator: nodesCoordinator, + MainMessenger: args.MainMessenger, + FullArchiveMessenger: args.FullArchiveMessenger, + Store: storer, + DataPool: args.DataPool, + MaxTxNonceDeltaAllowed: common.MaxTxNonceDeltaAllowed, + TxFeeHandler: feeHandler, + BlockBlackList: blackListHandler, + HeaderSigVerifier: headerSigVerifier, + HeaderIntegrityVerifier: args.HeaderIntegrityVerifier, + ValidityAttester: validityAttester, + EpochStartTrigger: epochStartTrigger, + WhiteListHandler: args.WhiteListHandler, + WhiteListerVerifiedTxs: args.WhiteListerVerifiedTxs, + AntifloodHandler: antiFloodHandler, + ArgumentsParser: args.ArgumentsParser, + PreferredPeersHolder: disabled.NewPreferredPeersHolder(), + SizeCheckDelta: uint32(sizeCheckDelta), + RequestHandler: args.RequestHandler, + PeerSignatureHandler: cryptoComponents.PeerSignatureHandler(), + SignaturesHandler: args.SignaturesHandler, + HeartbeatExpiryTimespanInSec: args.Config.HeartbeatV2.HeartbeatExpiryTimespanInSec, + MainPeerShardMapper: peerShardMapper, + FullArchivePeerShardMapper: fullArchivePeerShardMapper, + HardforkTrigger: hardforkTrigger, + NodeOperationMode: args.NodeOperationMode, + InterceptedDataVerifierFactory: args.InterceptedDataVerifierFactory, } interceptorsContainerFactory, err := interceptorscontainer.NewMetaInterceptorsContainerFactory(containerFactoryArgs) diff --git a/epochStart/bootstrap/process.go b/epochStart/bootstrap/process.go index d8fef964e6a..c1c4eb8d4df 100644 --- a/epochStart/bootstrap/process.go +++ b/epochStart/bootstrap/process.go @@ -14,6 +14,8 @@ import ( "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/typeConverters/uint64ByteSlice" + logger "github.com/multiversx/mx-chain-logger-go" + "github.com/multiversx/mx-chain-go/common" disabledCommon "github.com/multiversx/mx-chain-go/common/disabled" "github.com/multiversx/mx-chain-go/common/ordering" @@ -52,7 +54,6 @@ import ( "github.com/multiversx/mx-chain-go/trie/storageMarker" "github.com/multiversx/mx-chain-go/update" updateSync "github.com/multiversx/mx-chain-go/update/sync" - logger "github.com/multiversx/mx-chain-logger-go" ) var log = logger.GetOrCreate("epochStart/bootstrap") @@ -152,6 +153,8 @@ type epochStartBootstrap struct { nodeType core.NodeType startEpoch uint32 shuffledOut bool + + interceptedDataVerifierFactory process.InterceptedDataVerifierFactory } type baseDataInStorage struct { @@ -190,6 +193,7 @@ type ArgsEpochStartBootstrap struct { NodeProcessingMode common.NodeProcessingMode StateStatsHandler common.StateStatisticsHandler NodesCoordinatorRegistryFactory nodesCoordinator.NodesCoordinatorRegistryFactory + InterceptedDataVerifierFactory process.InterceptedDataVerifierFactory } type dataToSync struct { @@ -242,6 +246,7 @@ func NewEpochStartBootstrap(args ArgsEpochStartBootstrap) (*epochStartBootstrap, stateStatsHandler: args.StateStatsHandler, startEpoch: args.GeneralConfig.EpochStartConfig.GenesisEpoch, nodesCoordinatorRegistryFactory: args.NodesCoordinatorRegistryFactory, + interceptedDataVerifierFactory: args.InterceptedDataVerifierFactory, } if epochStartProvider.prefsConfig.FullArchive { @@ -553,16 +558,17 @@ func (e *epochStartBootstrap) prepareComponentsToSyncFromNetwork() error { } argsEpochStartSyncer := ArgsNewEpochStartMetaSyncer{ - CoreComponentsHolder: e.coreComponentsHolder, - CryptoComponentsHolder: e.cryptoComponentsHolder, - RequestHandler: e.requestHandler, - Messenger: e.mainMessenger, - ShardCoordinator: e.shardCoordinator, - EconomicsData: e.economicsData, - WhitelistHandler: e.whiteListHandler, - StartInEpochConfig: epochStartConfig, - HeaderIntegrityVerifier: e.headerIntegrityVerifier, - MetaBlockProcessor: metaBlockProcessor, + CoreComponentsHolder: e.coreComponentsHolder, + CryptoComponentsHolder: e.cryptoComponentsHolder, + RequestHandler: e.requestHandler, + Messenger: e.mainMessenger, + ShardCoordinator: e.shardCoordinator, + EconomicsData: e.economicsData, + WhitelistHandler: e.whiteListHandler, + StartInEpochConfig: epochStartConfig, + HeaderIntegrityVerifier: e.headerIntegrityVerifier, + MetaBlockProcessor: metaBlockProcessor, + InterceptedDataVerifierFactory: e.interceptedDataVerifierFactory, } e.epochStartMetaBlockSyncer, err = NewEpochStartMetaSyncer(argsEpochStartSyncer) if err != nil { @@ -575,20 +581,21 @@ func (e *epochStartBootstrap) prepareComponentsToSyncFromNetwork() error { func (e *epochStartBootstrap) createSyncers() error { var err error args := factoryInterceptors.ArgsEpochStartInterceptorContainer{ - CoreComponents: e.coreComponentsHolder, - CryptoComponents: e.cryptoComponentsHolder, - Config: e.generalConfig, - ShardCoordinator: e.shardCoordinator, - MainMessenger: e.mainMessenger, - FullArchiveMessenger: e.fullArchiveMessenger, - DataPool: e.dataPool, - WhiteListHandler: e.whiteListHandler, - WhiteListerVerifiedTxs: e.whiteListerVerifiedTxs, - ArgumentsParser: e.argumentsParser, - HeaderIntegrityVerifier: e.headerIntegrityVerifier, - RequestHandler: e.requestHandler, - SignaturesHandler: e.mainMessenger, - NodeOperationMode: e.nodeOperationMode, + CoreComponents: e.coreComponentsHolder, + CryptoComponents: e.cryptoComponentsHolder, + Config: e.generalConfig, + ShardCoordinator: e.shardCoordinator, + MainMessenger: e.mainMessenger, + FullArchiveMessenger: e.fullArchiveMessenger, + DataPool: e.dataPool, + WhiteListHandler: e.whiteListHandler, + WhiteListerVerifiedTxs: e.whiteListerVerifiedTxs, + ArgumentsParser: e.argumentsParser, + HeaderIntegrityVerifier: e.headerIntegrityVerifier, + RequestHandler: e.requestHandler, + SignaturesHandler: e.mainMessenger, + NodeOperationMode: e.nodeOperationMode, + InterceptedDataVerifierFactory: e.interceptedDataVerifierFactory, } e.mainInterceptorContainer, e.fullArchiveInterceptorContainer, err = factoryInterceptors.NewEpochStartInterceptorsContainer(args) diff --git a/epochStart/bootstrap/process_test.go b/epochStart/bootstrap/process_test.go index 7878f3842be..67d6a9d1295 100644 --- a/epochStart/bootstrap/process_test.go +++ b/epochStart/bootstrap/process_test.go @@ -32,6 +32,7 @@ import ( "github.com/multiversx/mx-chain-go/epochStart/bootstrap/types" "github.com/multiversx/mx-chain-go/epochStart/mock" "github.com/multiversx/mx-chain-go/process" + processMock "github.com/multiversx/mx-chain-go/process/mock" "github.com/multiversx/mx-chain-go/sharding" "github.com/multiversx/mx-chain-go/sharding/nodesCoordinator" "github.com/multiversx/mx-chain-go/state" @@ -251,8 +252,9 @@ func createMockEpochStartBootstrapArgs( FlagsConfig: config.ContextFlagsConfig{ ForceStartFromNetwork: false, }, - TrieSyncStatisticsProvider: &testscommon.SizeSyncStatisticsHandlerStub{}, - StateStatsHandler: disabledStatistics.NewStateStatistics(), + TrieSyncStatisticsProvider: &testscommon.SizeSyncStatisticsHandlerStub{}, + StateStatsHandler: disabledStatistics.NewStateStatistics(), + InterceptedDataVerifierFactory: &processMock.InterceptedDataVerifierFactoryMock{}, } } @@ -994,6 +996,7 @@ func TestCreateSyncers(t *testing.T) { epochStartProvider.whiteListerVerifiedTxs = &testscommon.WhiteListHandlerStub{} epochStartProvider.requestHandler = &testscommon.RequestHandlerStub{} epochStartProvider.storageService = &storageMocks.ChainStorerStub{} + epochStartProvider.interceptedDataVerifierFactory = &processMock.InterceptedDataVerifierFactoryMock{} err := epochStartProvider.createSyncers() assert.Nil(t, err) diff --git a/epochStart/bootstrap/storageProcess.go b/epochStart/bootstrap/storageProcess.go index 2e57801ef89..0ec16f6548d 100644 --- a/epochStart/bootstrap/storageProcess.go +++ b/epochStart/bootstrap/storageProcess.go @@ -11,6 +11,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/endProcess" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/config" "github.com/multiversx/mx-chain-go/dataRetriever" @@ -177,16 +178,17 @@ func (sesb *storageEpochStartBootstrap) prepareComponentsToSync() error { } argsEpochStartSyncer := ArgsNewEpochStartMetaSyncer{ - CoreComponentsHolder: sesb.coreComponentsHolder, - CryptoComponentsHolder: sesb.cryptoComponentsHolder, - RequestHandler: sesb.requestHandler, - Messenger: sesb.mainMessenger, - ShardCoordinator: sesb.shardCoordinator, - EconomicsData: sesb.economicsData, - WhitelistHandler: sesb.whiteListHandler, - StartInEpochConfig: sesb.generalConfig.EpochStartConfig, - HeaderIntegrityVerifier: sesb.headerIntegrityVerifier, - MetaBlockProcessor: metablockProcessor, + CoreComponentsHolder: sesb.coreComponentsHolder, + CryptoComponentsHolder: sesb.cryptoComponentsHolder, + RequestHandler: sesb.requestHandler, + Messenger: sesb.mainMessenger, + ShardCoordinator: sesb.shardCoordinator, + EconomicsData: sesb.economicsData, + WhitelistHandler: sesb.whiteListHandler, + StartInEpochConfig: sesb.generalConfig.EpochStartConfig, + HeaderIntegrityVerifier: sesb.headerIntegrityVerifier, + MetaBlockProcessor: metablockProcessor, + InterceptedDataVerifierFactory: sesb.interceptedDataVerifierFactory, } sesb.epochStartMetaBlockSyncer, err = NewEpochStartMetaSyncer(argsEpochStartSyncer) diff --git a/epochStart/bootstrap/storageProcess_test.go b/epochStart/bootstrap/storageProcess_test.go index a59b0d125f2..64708040acd 100644 --- a/epochStart/bootstrap/storageProcess_test.go +++ b/epochStart/bootstrap/storageProcess_test.go @@ -11,11 +11,14 @@ import ( "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/endProcess" + "github.com/stretchr/testify/assert" + "github.com/multiversx/mx-chain-go/config" "github.com/multiversx/mx-chain-go/dataRetriever" "github.com/multiversx/mx-chain-go/epochStart" "github.com/multiversx/mx-chain-go/epochStart/mock" "github.com/multiversx/mx-chain-go/process" + processMock "github.com/multiversx/mx-chain-go/process/mock" "github.com/multiversx/mx-chain-go/sharding/nodesCoordinator" "github.com/multiversx/mx-chain-go/storage" "github.com/multiversx/mx-chain-go/testscommon" @@ -23,7 +26,6 @@ import ( dataRetrieverMock "github.com/multiversx/mx-chain-go/testscommon/dataRetriever" "github.com/multiversx/mx-chain-go/testscommon/economicsmocks" "github.com/multiversx/mx-chain-go/testscommon/genesisMocks" - "github.com/stretchr/testify/assert" ) func createMockStorageEpochStartBootstrapArgs( @@ -127,6 +129,7 @@ func TestStorageEpochStartBootstrap_BootstrapMetablockNotFound(t *testing.T) { } args.GeneralConfig = testscommon.GetGeneralConfig() args.GeneralConfig.EpochStartConfig.RoundsPerEpoch = roundsPerEpoch + args.InterceptedDataVerifierFactory = &processMock.InterceptedDataVerifierFactoryMock{} sesb, _ := NewStorageEpochStartBootstrap(args) params, err := sesb.Bootstrap() diff --git a/epochStart/bootstrap/syncEpochStartMeta.go b/epochStart/bootstrap/syncEpochStartMeta.go index fa764a04c4a..b550a25911a 100644 --- a/epochStart/bootstrap/syncEpochStartMeta.go +++ b/epochStart/bootstrap/syncEpochStartMeta.go @@ -8,6 +8,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/hashing" "github.com/multiversx/mx-chain-core-go/marshal" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/config" "github.com/multiversx/mx-chain-go/epochStart" @@ -22,27 +23,29 @@ import ( var _ epochStart.StartOfEpochMetaSyncer = (*epochStartMetaSyncer)(nil) type epochStartMetaSyncer struct { - requestHandler RequestHandler - messenger Messenger - marshalizer marshal.Marshalizer - hasher hashing.Hasher - singleDataInterceptor process.Interceptor - metaBlockProcessor EpochStartMetaBlockInterceptorProcessor + requestHandler RequestHandler + messenger Messenger + marshalizer marshal.Marshalizer + hasher hashing.Hasher + singleDataInterceptor process.Interceptor + metaBlockProcessor EpochStartMetaBlockInterceptorProcessor + interceptedDataVerifierFactory process.InterceptedDataVerifierFactory } // ArgsNewEpochStartMetaSyncer - type ArgsNewEpochStartMetaSyncer struct { - CoreComponentsHolder process.CoreComponentsHolder - CryptoComponentsHolder process.CryptoComponentsHolder - RequestHandler RequestHandler - Messenger Messenger - ShardCoordinator sharding.Coordinator - EconomicsData process.EconomicsDataHandler - WhitelistHandler process.WhiteListHandler - StartInEpochConfig config.EpochStartConfig - ArgsParser process.ArgumentsParser - HeaderIntegrityVerifier process.HeaderIntegrityVerifier - MetaBlockProcessor EpochStartMetaBlockInterceptorProcessor + CoreComponentsHolder process.CoreComponentsHolder + CryptoComponentsHolder process.CryptoComponentsHolder + RequestHandler RequestHandler + Messenger Messenger + ShardCoordinator sharding.Coordinator + EconomicsData process.EconomicsDataHandler + WhitelistHandler process.WhiteListHandler + StartInEpochConfig config.EpochStartConfig + ArgsParser process.ArgumentsParser + HeaderIntegrityVerifier process.HeaderIntegrityVerifier + MetaBlockProcessor EpochStartMetaBlockInterceptorProcessor + InterceptedDataVerifierFactory process.InterceptedDataVerifierFactory } // NewEpochStartMetaSyncer will return a new instance of epochStartMetaSyncer @@ -62,13 +65,17 @@ func NewEpochStartMetaSyncer(args ArgsNewEpochStartMetaSyncer) (*epochStartMetaS if check.IfNil(args.MetaBlockProcessor) { return nil, epochStart.ErrNilMetablockProcessor } + if check.IfNil(args.InterceptedDataVerifierFactory) { + return nil, epochStart.ErrNilInterceptedDataVerifierFactory + } e := &epochStartMetaSyncer{ - requestHandler: args.RequestHandler, - messenger: args.Messenger, - marshalizer: args.CoreComponentsHolder.InternalMarshalizer(), - hasher: args.CoreComponentsHolder.Hasher(), - metaBlockProcessor: args.MetaBlockProcessor, + requestHandler: args.RequestHandler, + messenger: args.Messenger, + marshalizer: args.CoreComponentsHolder.InternalMarshalizer(), + hasher: args.CoreComponentsHolder.Hasher(), + metaBlockProcessor: args.MetaBlockProcessor, + interceptedDataVerifierFactory: args.InterceptedDataVerifierFactory, } argsInterceptedDataFactory := interceptorsFactory.ArgInterceptedDataFactory{ @@ -89,16 +96,22 @@ func NewEpochStartMetaSyncer(args ArgsNewEpochStartMetaSyncer) (*epochStartMetaS return nil, err } + interceptedDataVerifier, err := e.interceptedDataVerifierFactory.Create(factory.MetachainBlocksTopic) + if err != nil { + return nil, err + } + e.singleDataInterceptor, err = interceptors.NewSingleDataInterceptor( interceptors.ArgSingleDataInterceptor{ - Topic: factory.MetachainBlocksTopic, - DataFactory: interceptedMetaHdrDataFactory, - Processor: args.MetaBlockProcessor, - Throttler: disabled.NewThrottler(), - AntifloodHandler: disabled.NewAntiFloodHandler(), - WhiteListRequest: args.WhitelistHandler, - CurrentPeerId: args.Messenger.ID(), - PreferredPeersHolder: disabled.NewPreferredPeersHolder(), + Topic: factory.MetachainBlocksTopic, + DataFactory: interceptedMetaHdrDataFactory, + Processor: args.MetaBlockProcessor, + Throttler: disabled.NewThrottler(), + AntifloodHandler: disabled.NewAntiFloodHandler(), + WhiteListRequest: args.WhitelistHandler, + CurrentPeerId: args.Messenger.ID(), + PreferredPeersHolder: disabled.NewPreferredPeersHolder(), + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { diff --git a/epochStart/bootstrap/syncEpochStartMeta_test.go b/epochStart/bootstrap/syncEpochStartMeta_test.go index 169b20a656e..ac05d2ba977 100644 --- a/epochStart/bootstrap/syncEpochStartMeta_test.go +++ b/epochStart/bootstrap/syncEpochStartMeta_test.go @@ -9,17 +9,19 @@ import ( "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/data/block" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/multiversx/mx-chain-go/config" "github.com/multiversx/mx-chain-go/epochStart" "github.com/multiversx/mx-chain-go/epochStart/mock" "github.com/multiversx/mx-chain-go/p2p" + processMock "github.com/multiversx/mx-chain-go/process/mock" "github.com/multiversx/mx-chain-go/testscommon" "github.com/multiversx/mx-chain-go/testscommon/cryptoMocks" "github.com/multiversx/mx-chain-go/testscommon/economicsmocks" "github.com/multiversx/mx-chain-go/testscommon/hashingMocks" "github.com/multiversx/mx-chain-go/testscommon/p2pmocks" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestNewEpochStartMetaSyncer_NilsShouldError(t *testing.T) { @@ -48,6 +50,12 @@ func TestNewEpochStartMetaSyncer_NilsShouldError(t *testing.T) { ess, err = NewEpochStartMetaSyncer(args) assert.True(t, check.IfNil(ess)) assert.Equal(t, epochStart.ErrNilMetablockProcessor, err) + + args = getEpochStartSyncerArgs() + args.InterceptedDataVerifierFactory = nil + ess, err = NewEpochStartMetaSyncer(args) + assert.True(t, check.IfNil(ess)) + assert.Equal(t, epochStart.ErrNilInterceptedDataVerifierFactory, err) } func TestNewEpochStartMetaSyncer_ShouldWork(t *testing.T) { @@ -71,7 +79,8 @@ func TestEpochStartMetaSyncer_SyncEpochStartMetaRegisterMessengerProcessorFailsS }, } args.Messenger = messenger - ess, _ := NewEpochStartMetaSyncer(args) + ess, err := NewEpochStartMetaSyncer(args) + require.NoError(t, err) mb, err := ess.SyncEpochStartMeta(time.Second) require.Equal(t, expectedErr, err) @@ -159,7 +168,8 @@ func getEpochStartSyncerArgs() ArgsNewEpochStartMetaSyncer { MinNumConnectedPeersToStart: 2, MinNumOfPeersToConsiderBlockValid: 2, }, - HeaderIntegrityVerifier: &mock.HeaderIntegrityVerifierStub{}, - MetaBlockProcessor: &mock.EpochStartMetaBlockProcessorStub{}, + HeaderIntegrityVerifier: &mock.HeaderIntegrityVerifierStub{}, + MetaBlockProcessor: &mock.EpochStartMetaBlockProcessorStub{}, + InterceptedDataVerifierFactory: &processMock.InterceptedDataVerifierFactoryMock{}, } } diff --git a/epochStart/errors.go b/epochStart/errors.go index ca115e939f4..e022064c472 100644 --- a/epochStart/errors.go +++ b/epochStart/errors.go @@ -239,6 +239,9 @@ var ErrNilEpochNotifier = errors.New("nil EpochNotifier") // ErrNilMetablockProcessor signals that a nil metablock processor was provided var ErrNilMetablockProcessor = errors.New("nil metablock processor") +// ErrNilInterceptedDataVerifierFactory signals that a nil intercepted data verifier factory was provided +var ErrNilInterceptedDataVerifierFactory = errors.New("nil intercepted data verifier factory") + // ErrCouldNotInitDelegationSystemSC signals that delegation system sc init failed var ErrCouldNotInitDelegationSystemSC = errors.New("could not init delegation system sc") diff --git a/factory/bootstrap/bootstrapComponents.go b/factory/bootstrap/bootstrapComponents.go index a9ef7851ccb..1c3500f9599 100644 --- a/factory/bootstrap/bootstrapComponents.go +++ b/factory/bootstrap/bootstrapComponents.go @@ -6,6 +6,8 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" + logger "github.com/multiversx/mx-chain-logger-go" + nodeFactory "github.com/multiversx/mx-chain-go/cmd/node/factory" "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/config" @@ -24,23 +26,23 @@ import ( storageFactory "github.com/multiversx/mx-chain-go/storage/factory" "github.com/multiversx/mx-chain-go/storage/latestData" "github.com/multiversx/mx-chain-go/storage/storageunit" - logger "github.com/multiversx/mx-chain-logger-go" ) var log = logger.GetOrCreate("factory") // BootstrapComponentsFactoryArgs holds the arguments needed to create a bootstrap components factory type BootstrapComponentsFactoryArgs struct { - Config config.Config - RoundConfig config.RoundConfig - PrefConfig config.Preferences - ImportDbConfig config.ImportDbConfig - FlagsConfig config.ContextFlagsConfig - WorkingDir string - CoreComponents factory.CoreComponentsHolder - CryptoComponents factory.CryptoComponentsHolder - NetworkComponents factory.NetworkComponentsHolder - StatusCoreComponents factory.StatusCoreComponentsHolder + Config config.Config + RoundConfig config.RoundConfig + PrefConfig config.Preferences + ImportDbConfig config.ImportDbConfig + FlagsConfig config.ContextFlagsConfig + WorkingDir string + CoreComponents factory.CoreComponentsHolder + CryptoComponents factory.CryptoComponentsHolder + NetworkComponents factory.NetworkComponentsHolder + StatusCoreComponents factory.StatusCoreComponentsHolder + InterceptedDataVerifierFactory process.InterceptedDataVerifierFactory } type bootstrapComponentsFactory struct { diff --git a/factory/processing/blockProcessorCreator_test.go b/factory/processing/blockProcessorCreator_test.go index 099fec4a82d..8b01c44c8f8 100644 --- a/factory/processing/blockProcessorCreator_test.go +++ b/factory/processing/blockProcessorCreator_test.go @@ -8,6 +8,9 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/hashing" "github.com/multiversx/mx-chain-core-go/marshal" + vmcommon "github.com/multiversx/mx-chain-vm-common-go" + "github.com/stretchr/testify/require" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/dataRetriever" dataComp "github.com/multiversx/mx-chain-go/factory/data" @@ -26,8 +29,6 @@ import ( storageManager "github.com/multiversx/mx-chain-go/testscommon/storage" trieMock "github.com/multiversx/mx-chain-go/testscommon/trie" "github.com/multiversx/mx-chain-go/trie" - vmcommon "github.com/multiversx/mx-chain-vm-common-go" - "github.com/stretchr/testify/require" ) func Test_newBlockProcessorCreatorForShard(t *testing.T) { diff --git a/factory/processing/processComponents.go b/factory/processing/processComponents.go index 0376a7235ce..ce7da0e7006 100644 --- a/factory/processing/processComponents.go +++ b/factory/processing/processComponents.go @@ -57,6 +57,7 @@ import ( "github.com/multiversx/mx-chain-go/process/factory/interceptorscontainer" "github.com/multiversx/mx-chain-go/process/headerCheck" "github.com/multiversx/mx-chain-go/process/heartbeat/validator" + interceptorFactory "github.com/multiversx/mx-chain-go/process/interceptors/factory" "github.com/multiversx/mx-chain-go/process/peer" "github.com/multiversx/mx-chain-go/process/receipts" "github.com/multiversx/mx-chain-go/process/smartContract" @@ -133,6 +134,7 @@ type processComponents struct { receiptsRepository mainFactory.ReceiptsRepository sentSignaturesTracker process.SentSignaturesTracker epochSystemSCProcessor process.EpochStartSystemSCProcessor + interceptedDataVerifierFactory process.InterceptedDataVerifierFactory } // ProcessComponentsFactoryArgs holds the arguments needed to create a process components factory @@ -208,6 +210,8 @@ type processComponentsFactory struct { genesisNonce uint64 genesisRound uint64 + + interceptedDataVerifierFactory process.InterceptedDataVerifierFactory } // NewProcessComponentsFactory will return a new instance of processComponentsFactory @@ -217,37 +221,43 @@ func NewProcessComponentsFactory(args ProcessComponentsFactoryArgs) (*processCom return nil, err } + interceptedDataVerifierFactory := interceptorFactory.NewInterceptedDataVerifierFactory(interceptorFactory.InterceptedDataVerifierFactoryArgs{ + CacheSpan: time.Duration(args.Config.InterceptedDataVerifier.CacheSpanInSec) * time.Second, + CacheExpiry: time.Duration(args.Config.InterceptedDataVerifier.CacheExpiryInSec) * time.Second, + }) + return &processComponentsFactory{ - config: args.Config, - epochConfig: args.EpochConfig, - prefConfigs: args.PrefConfigs, - importDBConfig: args.ImportDBConfig, - economicsConfig: args.EconomicsConfig, - accountsParser: args.AccountsParser, - smartContractParser: args.SmartContractParser, - gasSchedule: args.GasSchedule, - nodesCoordinator: args.NodesCoordinator, - data: args.Data, - coreData: args.CoreData, - crypto: args.Crypto, - state: args.State, - network: args.Network, - bootstrapComponents: args.BootstrapComponents, - statusComponents: args.StatusComponents, - requestedItemsHandler: args.RequestedItemsHandler, - whiteListHandler: args.WhiteListHandler, - whiteListerVerifiedTxs: args.WhiteListerVerifiedTxs, - maxRating: args.MaxRating, - systemSCConfig: args.SystemSCConfig, - importStartHandler: args.ImportStartHandler, - historyRepo: args.HistoryRepo, - epochNotifier: args.CoreData.EpochNotifier(), - statusCoreComponents: args.StatusCoreComponents, - flagsConfig: args.FlagsConfig, - txExecutionOrderHandler: args.TxExecutionOrderHandler, - genesisNonce: args.GenesisNonce, - genesisRound: args.GenesisRound, - roundConfig: args.RoundConfig, + config: args.Config, + epochConfig: args.EpochConfig, + prefConfigs: args.PrefConfigs, + importDBConfig: args.ImportDBConfig, + economicsConfig: args.EconomicsConfig, + accountsParser: args.AccountsParser, + smartContractParser: args.SmartContractParser, + gasSchedule: args.GasSchedule, + nodesCoordinator: args.NodesCoordinator, + data: args.Data, + coreData: args.CoreData, + crypto: args.Crypto, + state: args.State, + network: args.Network, + bootstrapComponents: args.BootstrapComponents, + statusComponents: args.StatusComponents, + requestedItemsHandler: args.RequestedItemsHandler, + whiteListHandler: args.WhiteListHandler, + whiteListerVerifiedTxs: args.WhiteListerVerifiedTxs, + maxRating: args.MaxRating, + systemSCConfig: args.SystemSCConfig, + importStartHandler: args.ImportStartHandler, + historyRepo: args.HistoryRepo, + epochNotifier: args.CoreData.EpochNotifier(), + statusCoreComponents: args.StatusCoreComponents, + flagsConfig: args.FlagsConfig, + txExecutionOrderHandler: args.TxExecutionOrderHandler, + genesisNonce: args.GenesisNonce, + genesisRound: args.GenesisRound, + roundConfig: args.RoundConfig, + interceptedDataVerifierFactory: interceptedDataVerifierFactory, }, nil } @@ -764,6 +774,7 @@ func (pcf *processComponentsFactory) Create() (*processComponents, error) { accountsParser: pcf.accountsParser, receiptsRepository: receiptsRepository, sentSignaturesTracker: sentSignaturesTracker, + interceptedDataVerifierFactory: pcf.interceptedDataVerifierFactory, }, nil } @@ -1668,36 +1679,37 @@ func (pcf *processComponentsFactory) newShardInterceptorContainerFactory( ) (process.InterceptorsContainerFactory, process.TimeCacher, error) { headerBlackList := cache.NewTimeCache(timeSpanForBadHeaders) shardInterceptorsContainerFactoryArgs := interceptorscontainer.CommonInterceptorsContainerFactoryArgs{ - CoreComponents: pcf.coreData, - CryptoComponents: pcf.crypto, - Accounts: pcf.state.AccountsAdapter(), - ShardCoordinator: pcf.bootstrapComponents.ShardCoordinator(), - NodesCoordinator: pcf.nodesCoordinator, - MainMessenger: pcf.network.NetworkMessenger(), - FullArchiveMessenger: pcf.network.FullArchiveNetworkMessenger(), - Store: pcf.data.StorageService(), - DataPool: pcf.data.Datapool(), - MaxTxNonceDeltaAllowed: common.MaxTxNonceDeltaAllowed, - TxFeeHandler: pcf.coreData.EconomicsData(), - BlockBlackList: headerBlackList, - HeaderSigVerifier: headerSigVerifier, - HeaderIntegrityVerifier: headerIntegrityVerifier, - ValidityAttester: validityAttester, - EpochStartTrigger: epochStartTrigger, - WhiteListHandler: pcf.whiteListHandler, - WhiteListerVerifiedTxs: pcf.whiteListerVerifiedTxs, - AntifloodHandler: pcf.network.InputAntiFloodHandler(), - ArgumentsParser: smartContract.NewArgumentParser(), - PreferredPeersHolder: pcf.network.PreferredPeersHolderHandler(), - SizeCheckDelta: pcf.config.Marshalizer.SizeCheckDelta, - RequestHandler: requestHandler, - PeerSignatureHandler: pcf.crypto.PeerSignatureHandler(), - SignaturesHandler: pcf.network.NetworkMessenger(), - HeartbeatExpiryTimespanInSec: pcf.config.HeartbeatV2.HeartbeatExpiryTimespanInSec, - MainPeerShardMapper: mainPeerShardMapper, - FullArchivePeerShardMapper: fullArchivePeerShardMapper, - HardforkTrigger: hardforkTrigger, - NodeOperationMode: nodeOperationMode, + CoreComponents: pcf.coreData, + CryptoComponents: pcf.crypto, + Accounts: pcf.state.AccountsAdapter(), + ShardCoordinator: pcf.bootstrapComponents.ShardCoordinator(), + NodesCoordinator: pcf.nodesCoordinator, + MainMessenger: pcf.network.NetworkMessenger(), + FullArchiveMessenger: pcf.network.FullArchiveNetworkMessenger(), + Store: pcf.data.StorageService(), + DataPool: pcf.data.Datapool(), + MaxTxNonceDeltaAllowed: common.MaxTxNonceDeltaAllowed, + TxFeeHandler: pcf.coreData.EconomicsData(), + BlockBlackList: headerBlackList, + HeaderSigVerifier: headerSigVerifier, + HeaderIntegrityVerifier: headerIntegrityVerifier, + ValidityAttester: validityAttester, + EpochStartTrigger: epochStartTrigger, + WhiteListHandler: pcf.whiteListHandler, + WhiteListerVerifiedTxs: pcf.whiteListerVerifiedTxs, + AntifloodHandler: pcf.network.InputAntiFloodHandler(), + ArgumentsParser: smartContract.NewArgumentParser(), + PreferredPeersHolder: pcf.network.PreferredPeersHolderHandler(), + SizeCheckDelta: pcf.config.Marshalizer.SizeCheckDelta, + RequestHandler: requestHandler, + PeerSignatureHandler: pcf.crypto.PeerSignatureHandler(), + SignaturesHandler: pcf.network.NetworkMessenger(), + HeartbeatExpiryTimespanInSec: pcf.config.HeartbeatV2.HeartbeatExpiryTimespanInSec, + MainPeerShardMapper: mainPeerShardMapper, + FullArchivePeerShardMapper: fullArchivePeerShardMapper, + HardforkTrigger: hardforkTrigger, + NodeOperationMode: nodeOperationMode, + InterceptedDataVerifierFactory: pcf.interceptedDataVerifierFactory, } interceptorContainerFactory, err := interceptorscontainer.NewShardInterceptorsContainerFactory(shardInterceptorsContainerFactoryArgs) @@ -1721,36 +1733,37 @@ func (pcf *processComponentsFactory) newMetaInterceptorContainerFactory( ) (process.InterceptorsContainerFactory, process.TimeCacher, error) { headerBlackList := cache.NewTimeCache(timeSpanForBadHeaders) metaInterceptorsContainerFactoryArgs := interceptorscontainer.CommonInterceptorsContainerFactoryArgs{ - CoreComponents: pcf.coreData, - CryptoComponents: pcf.crypto, - ShardCoordinator: pcf.bootstrapComponents.ShardCoordinator(), - NodesCoordinator: pcf.nodesCoordinator, - MainMessenger: pcf.network.NetworkMessenger(), - FullArchiveMessenger: pcf.network.FullArchiveNetworkMessenger(), - Store: pcf.data.StorageService(), - DataPool: pcf.data.Datapool(), - Accounts: pcf.state.AccountsAdapter(), - MaxTxNonceDeltaAllowed: common.MaxTxNonceDeltaAllowed, - TxFeeHandler: pcf.coreData.EconomicsData(), - BlockBlackList: headerBlackList, - HeaderSigVerifier: headerSigVerifier, - HeaderIntegrityVerifier: headerIntegrityVerifier, - ValidityAttester: validityAttester, - EpochStartTrigger: epochStartTrigger, - WhiteListHandler: pcf.whiteListHandler, - WhiteListerVerifiedTxs: pcf.whiteListerVerifiedTxs, - AntifloodHandler: pcf.network.InputAntiFloodHandler(), - ArgumentsParser: smartContract.NewArgumentParser(), - SizeCheckDelta: pcf.config.Marshalizer.SizeCheckDelta, - PreferredPeersHolder: pcf.network.PreferredPeersHolderHandler(), - RequestHandler: requestHandler, - PeerSignatureHandler: pcf.crypto.PeerSignatureHandler(), - SignaturesHandler: pcf.network.NetworkMessenger(), - HeartbeatExpiryTimespanInSec: pcf.config.HeartbeatV2.HeartbeatExpiryTimespanInSec, - MainPeerShardMapper: mainPeerShardMapper, - FullArchivePeerShardMapper: fullArchivePeerShardMapper, - HardforkTrigger: hardforkTrigger, - NodeOperationMode: nodeOperationMode, + CoreComponents: pcf.coreData, + CryptoComponents: pcf.crypto, + ShardCoordinator: pcf.bootstrapComponents.ShardCoordinator(), + NodesCoordinator: pcf.nodesCoordinator, + MainMessenger: pcf.network.NetworkMessenger(), + FullArchiveMessenger: pcf.network.FullArchiveNetworkMessenger(), + Store: pcf.data.StorageService(), + DataPool: pcf.data.Datapool(), + Accounts: pcf.state.AccountsAdapter(), + MaxTxNonceDeltaAllowed: common.MaxTxNonceDeltaAllowed, + TxFeeHandler: pcf.coreData.EconomicsData(), + BlockBlackList: headerBlackList, + HeaderSigVerifier: headerSigVerifier, + HeaderIntegrityVerifier: headerIntegrityVerifier, + ValidityAttester: validityAttester, + EpochStartTrigger: epochStartTrigger, + WhiteListHandler: pcf.whiteListHandler, + WhiteListerVerifiedTxs: pcf.whiteListerVerifiedTxs, + AntifloodHandler: pcf.network.InputAntiFloodHandler(), + ArgumentsParser: smartContract.NewArgumentParser(), + SizeCheckDelta: pcf.config.Marshalizer.SizeCheckDelta, + PreferredPeersHolder: pcf.network.PreferredPeersHolderHandler(), + RequestHandler: requestHandler, + PeerSignatureHandler: pcf.crypto.PeerSignatureHandler(), + SignaturesHandler: pcf.network.NetworkMessenger(), + HeartbeatExpiryTimespanInSec: pcf.config.HeartbeatV2.HeartbeatExpiryTimespanInSec, + MainPeerShardMapper: mainPeerShardMapper, + FullArchivePeerShardMapper: fullArchivePeerShardMapper, + HardforkTrigger: hardforkTrigger, + NodeOperationMode: nodeOperationMode, + InterceptedDataVerifierFactory: pcf.interceptedDataVerifierFactory, } interceptorContainerFactory, err := interceptorscontainer.NewMetaInterceptorsContainerFactory(metaInterceptorsContainerFactoryArgs) @@ -1850,6 +1863,7 @@ func (pcf *processComponentsFactory) createExportFactoryHandler( NumConcurrentTrieSyncers: pcf.config.TrieSync.NumConcurrentTrieSyncers, TrieSyncerVersion: pcf.config.TrieSync.TrieSyncerVersion, NodeOperationMode: nodeOperationMode, + InterceptedDataVerifierFactory: pcf.interceptedDataVerifierFactory, } return updateFactory.NewExportHandlerFactory(argsExporter) } @@ -2047,6 +2061,9 @@ func (pc *processComponents) Close() error { if !check.IfNil(pc.txsSender) { log.LogIfError(pc.txsSender.Close()) } + if !check.IfNil(pc.interceptedDataVerifierFactory) { + log.LogIfError(pc.interceptedDataVerifierFactory.Close()) + } return nil } diff --git a/factory/processing/processComponents_test.go b/factory/processing/processComponents_test.go index a1654ce3ba3..6ddf5ea2d8b 100644 --- a/factory/processing/processComponents_test.go +++ b/factory/processing/processComponents_test.go @@ -17,6 +17,8 @@ import ( "github.com/multiversx/mx-chain-core-go/hashing/blake2b" "github.com/multiversx/mx-chain-core-go/hashing/keccak" "github.com/multiversx/mx-chain-core-go/marshal" + "github.com/stretchr/testify/require" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/common/factory" disabledStatistics "github.com/multiversx/mx-chain-go/common/statistics/disabled" @@ -55,7 +57,6 @@ import ( testState "github.com/multiversx/mx-chain-go/testscommon/state" "github.com/multiversx/mx-chain-go/testscommon/statusHandler" updateMocks "github.com/multiversx/mx-chain-go/update/mock" - "github.com/stretchr/testify/require" ) const ( diff --git a/go.mod b/go.mod index c02ddde8a66..0b1ef7d9632 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/klauspost/cpuid/v2 v2.2.5 github.com/mitchellh/mapstructure v1.5.0 github.com/multiversx/mx-chain-communication-go v1.0.15-0.20240508074652-e128a1c05c8e - github.com/multiversx/mx-chain-core-go v1.2.21-0.20240917083438-99280b4dc9b1 + github.com/multiversx/mx-chain-core-go v1.2.21-0.20240925111815-120b0b610b5a github.com/multiversx/mx-chain-crypto-go v1.2.12-0.20240508074452-cc21c1b505df github.com/multiversx/mx-chain-es-indexer-go v1.7.2-0.20240619122842-05143459c554 github.com/multiversx/mx-chain-logger-go v1.0.15-0.20240508072523-3f00a726af57 diff --git a/go.sum b/go.sum index c166db57fb2..4e99b95dc3d 100644 --- a/go.sum +++ b/go.sum @@ -387,8 +387,8 @@ github.com/multiversx/concurrent-map v0.1.4 h1:hdnbM8VE4b0KYJaGY5yJS2aNIW9TFFsUY github.com/multiversx/concurrent-map v0.1.4/go.mod h1:8cWFRJDOrWHOTNSqgYCUvwT7c7eFQ4U2vKMOp4A/9+o= github.com/multiversx/mx-chain-communication-go v1.0.15-0.20240508074652-e128a1c05c8e h1:Tsmwhu+UleE+l3buPuqXSKTqfu5FbPmzQ4MjMoUvCWA= github.com/multiversx/mx-chain-communication-go v1.0.15-0.20240508074652-e128a1c05c8e/go.mod h1:2yXl18wUbuV3cRZr7VHxM1xo73kTaC1WUcu2kx8R034= -github.com/multiversx/mx-chain-core-go v1.2.21-0.20240917083438-99280b4dc9b1 h1:AObGM2gvQrbFH45HrWBfhgpPRMAQkcAEsZrBN+Vi7ew= -github.com/multiversx/mx-chain-core-go v1.2.21-0.20240917083438-99280b4dc9b1/go.mod h1:B5zU4MFyJezmEzCsAHE9YNULmGCm2zbPHvl9hazNxmE= +github.com/multiversx/mx-chain-core-go v1.2.21-0.20240925111815-120b0b610b5a h1:YsPfyNONJsERG+MzJIHRZW6mVIHkUFc8YeKsb20YhhA= +github.com/multiversx/mx-chain-core-go v1.2.21-0.20240925111815-120b0b610b5a/go.mod h1:B5zU4MFyJezmEzCsAHE9YNULmGCm2zbPHvl9hazNxmE= github.com/multiversx/mx-chain-crypto-go v1.2.12-0.20240508074452-cc21c1b505df h1:clihfi78bMEOWk/qw6WA4uQbCM2e2NGliqswLAvw19k= github.com/multiversx/mx-chain-crypto-go v1.2.12-0.20240508074452-cc21c1b505df/go.mod h1:gtJYB4rR21KBSqJlazn+2z6f9gFSqQP3KvAgL7Qgxw4= github.com/multiversx/mx-chain-es-indexer-go v1.7.2-0.20240619122842-05143459c554 h1:Fv8BfzJSzdovmoh9Jh/by++0uGsOVBlMP3XiN5Svkn4= diff --git a/integrationTests/factory/bootstrapComponents/bootstrapComponents_test.go b/integrationTests/factory/bootstrapComponents/bootstrapComponents_test.go index 6c525ff9f12..2e9cb01e72a 100644 --- a/integrationTests/factory/bootstrapComponents/bootstrapComponents_test.go +++ b/integrationTests/factory/bootstrapComponents/bootstrapComponents_test.go @@ -6,10 +6,11 @@ import ( "time" "github.com/multiversx/mx-chain-core-go/data/endProcess" + "github.com/stretchr/testify/require" + "github.com/multiversx/mx-chain-go/integrationTests/factory" "github.com/multiversx/mx-chain-go/node" "github.com/multiversx/mx-chain-go/testscommon/goroutines" - "github.com/stretchr/testify/require" ) // ------------ Test BootstrapComponents -------------------- diff --git a/integrationTests/factory/consensusComponents/consensusComponents_test.go b/integrationTests/factory/consensusComponents/consensusComponents_test.go index 1e32c0c574b..d4b120a9636 100644 --- a/integrationTests/factory/consensusComponents/consensusComponents_test.go +++ b/integrationTests/factory/consensusComponents/consensusComponents_test.go @@ -6,13 +6,14 @@ import ( "time" "github.com/multiversx/mx-chain-core-go/data/endProcess" + "github.com/stretchr/testify/require" + "github.com/multiversx/mx-chain-go/common/forking" "github.com/multiversx/mx-chain-go/dataRetriever" bootstrapComp "github.com/multiversx/mx-chain-go/factory/bootstrap" "github.com/multiversx/mx-chain-go/integrationTests/factory" "github.com/multiversx/mx-chain-go/node" "github.com/multiversx/mx-chain-go/testscommon/goroutines" - "github.com/stretchr/testify/require" ) // ------------ Test TestConsensusComponents -------------------- diff --git a/integrationTests/factory/dataComponents/dataComponents_test.go b/integrationTests/factory/dataComponents/dataComponents_test.go index c28a41c6543..d26cf7aa01f 100644 --- a/integrationTests/factory/dataComponents/dataComponents_test.go +++ b/integrationTests/factory/dataComponents/dataComponents_test.go @@ -6,10 +6,11 @@ import ( "time" "github.com/multiversx/mx-chain-core-go/data/endProcess" + "github.com/stretchr/testify/require" + "github.com/multiversx/mx-chain-go/integrationTests/factory" "github.com/multiversx/mx-chain-go/node" "github.com/multiversx/mx-chain-go/testscommon/goroutines" - "github.com/stretchr/testify/require" ) func TestDataComponents_Create_Close_ShouldWork(t *testing.T) { @@ -36,6 +37,7 @@ func TestDataComponents_Create_Close_ShouldWork(t *testing.T) { require.Nil(t, err) managedNetworkComponents, err := nr.CreateManagedNetworkComponents(managedCoreComponents, managedStatusCoreComponents, managedCryptoComponents) require.Nil(t, err) + managedBootstrapComponents, err := nr.CreateManagedBootstrapComponents(managedStatusCoreComponents, managedCoreComponents, managedCryptoComponents, managedNetworkComponents) require.Nil(t, err) managedDataComponents, err := nr.CreateManagedDataComponents(managedStatusCoreComponents, managedCoreComponents, managedBootstrapComponents, managedCryptoComponents) diff --git a/integrationTests/factory/heartbeatComponents/heartbeatComponents_test.go b/integrationTests/factory/heartbeatComponents/heartbeatComponents_test.go index 1c541f524ff..889c4ff38f8 100644 --- a/integrationTests/factory/heartbeatComponents/heartbeatComponents_test.go +++ b/integrationTests/factory/heartbeatComponents/heartbeatComponents_test.go @@ -6,13 +6,14 @@ import ( "time" "github.com/multiversx/mx-chain-core-go/data/endProcess" + "github.com/stretchr/testify/require" + "github.com/multiversx/mx-chain-go/common/forking" "github.com/multiversx/mx-chain-go/dataRetriever" bootstrapComp "github.com/multiversx/mx-chain-go/factory/bootstrap" "github.com/multiversx/mx-chain-go/integrationTests/factory" "github.com/multiversx/mx-chain-go/node" "github.com/multiversx/mx-chain-go/testscommon/goroutines" - "github.com/stretchr/testify/require" ) // ------------ Test TestHeartbeatComponents -------------------- diff --git a/integrationTests/factory/processComponents/processComponents_test.go b/integrationTests/factory/processComponents/processComponents_test.go index 897a1289d2c..110a8869878 100644 --- a/integrationTests/factory/processComponents/processComponents_test.go +++ b/integrationTests/factory/processComponents/processComponents_test.go @@ -6,13 +6,14 @@ import ( "time" "github.com/multiversx/mx-chain-core-go/data/endProcess" + "github.com/stretchr/testify/require" + "github.com/multiversx/mx-chain-go/common/forking" "github.com/multiversx/mx-chain-go/dataRetriever" bootstrapComp "github.com/multiversx/mx-chain-go/factory/bootstrap" "github.com/multiversx/mx-chain-go/integrationTests/factory" "github.com/multiversx/mx-chain-go/node" "github.com/multiversx/mx-chain-go/testscommon/goroutines" - "github.com/stretchr/testify/require" ) // ------------ Test TestProcessComponents -------------------- diff --git a/integrationTests/factory/stateComponents/stateComponents_test.go b/integrationTests/factory/stateComponents/stateComponents_test.go index 3c942f54e53..ba93bdf8263 100644 --- a/integrationTests/factory/stateComponents/stateComponents_test.go +++ b/integrationTests/factory/stateComponents/stateComponents_test.go @@ -6,10 +6,11 @@ import ( "time" "github.com/multiversx/mx-chain-core-go/data/endProcess" + "github.com/stretchr/testify/require" + "github.com/multiversx/mx-chain-go/integrationTests/factory" "github.com/multiversx/mx-chain-go/node" "github.com/multiversx/mx-chain-go/testscommon/goroutines" - "github.com/stretchr/testify/require" ) func TestStateComponents_Create_Close_ShouldWork(t *testing.T) { diff --git a/integrationTests/factory/statusComponents/statusComponents_test.go b/integrationTests/factory/statusComponents/statusComponents_test.go index 85cfbd155f7..38527da6a41 100644 --- a/integrationTests/factory/statusComponents/statusComponents_test.go +++ b/integrationTests/factory/statusComponents/statusComponents_test.go @@ -6,13 +6,14 @@ import ( "time" "github.com/multiversx/mx-chain-core-go/data/endProcess" + "github.com/stretchr/testify/require" + "github.com/multiversx/mx-chain-go/common/forking" "github.com/multiversx/mx-chain-go/dataRetriever" bootstrapComp "github.com/multiversx/mx-chain-go/factory/bootstrap" "github.com/multiversx/mx-chain-go/integrationTests/factory" "github.com/multiversx/mx-chain-go/node" "github.com/multiversx/mx-chain-go/testscommon/goroutines" - "github.com/stretchr/testify/require" ) // ------------ Test StatusComponents -------------------- diff --git a/node/interface.go b/node/interface.go index 236e7a131e3..05330285fb6 100644 --- a/node/interface.go +++ b/node/interface.go @@ -4,8 +4,9 @@ import ( "io" "github.com/multiversx/mx-chain-core-go/core" - "github.com/multiversx/mx-chain-go/update" vmcommon "github.com/multiversx/mx-chain-vm-common-go" + + "github.com/multiversx/mx-chain-go/update" ) // NetworkShardingCollector defines the updating methods used by the network sharding component diff --git a/node/nodeRunner.go b/node/nodeRunner.go index 1378007ad64..f6fa53a660e 100644 --- a/node/nodeRunner.go +++ b/node/nodeRunner.go @@ -20,6 +20,8 @@ import ( "github.com/multiversx/mx-chain-core-go/core/throttler" "github.com/multiversx/mx-chain-core-go/data/endProcess" outportCore "github.com/multiversx/mx-chain-core-go/data/outport" + logger "github.com/multiversx/mx-chain-logger-go" + "github.com/multiversx/mx-chain-go/api/gin" "github.com/multiversx/mx-chain-go/api/shared" "github.com/multiversx/mx-chain-go/common" @@ -61,7 +63,6 @@ import ( "github.com/multiversx/mx-chain-go/storage/storageunit" trieStatistics "github.com/multiversx/mx-chain-go/trie/statistics" "github.com/multiversx/mx-chain-go/update/trigger" - logger "github.com/multiversx/mx-chain-logger-go" ) type nextOperationForNode int diff --git a/process/block/interceptedBlocks/interceptedBlockHeader.go b/process/block/interceptedBlocks/interceptedBlockHeader.go index 0cdb2cec703..9c009b8bb3f 100644 --- a/process/block/interceptedBlocks/interceptedBlockHeader.go +++ b/process/block/interceptedBlocks/interceptedBlockHeader.go @@ -6,10 +6,11 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/hashing" + logger "github.com/multiversx/mx-chain-logger-go" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/sharding" - logger "github.com/multiversx/mx-chain-logger-go" ) var _ process.HdrValidatorHandler = (*InterceptedHeader)(nil) diff --git a/process/block/interceptedBlocks/interceptedBlockHeader_test.go b/process/block/interceptedBlocks/interceptedBlockHeader_test.go index c2c8bf6d61a..462e040af50 100644 --- a/process/block/interceptedBlocks/interceptedBlockHeader_test.go +++ b/process/block/interceptedBlocks/interceptedBlockHeader_test.go @@ -11,6 +11,9 @@ import ( "github.com/multiversx/mx-chain-core-go/data/block" dataBlock "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/marshal" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/process/block/interceptedBlocks" @@ -18,8 +21,6 @@ import ( "github.com/multiversx/mx-chain-go/testscommon/consensus" "github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock" "github.com/multiversx/mx-chain-go/testscommon/hashingMocks" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) var testMarshalizer = &mock.MarshalizerMock{} @@ -173,7 +174,7 @@ func TestNewInterceptedHeader_MetachainForThisShardShouldWork(t *testing.T) { assert.True(t, inHdr.IsForCurrentShard()) } -//------- CheckValidity +//------- Verify func TestInterceptedHeader_CheckValidityNilPubKeyBitmapShouldErr(t *testing.T) { t.Parallel() diff --git a/process/block/interceptedBlocks/interceptedMetaBlockHeader.go b/process/block/interceptedBlocks/interceptedMetaBlockHeader.go index 050c457598c..1d2917c7cb8 100644 --- a/process/block/interceptedBlocks/interceptedMetaBlockHeader.go +++ b/process/block/interceptedBlocks/interceptedMetaBlockHeader.go @@ -8,10 +8,11 @@ import ( "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/hashing" "github.com/multiversx/mx-chain-core-go/marshal" + logger "github.com/multiversx/mx-chain-logger-go" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/sharding" - logger "github.com/multiversx/mx-chain-logger-go" ) var _ process.HdrValidatorHandler = (*InterceptedMetaHeader)(nil) diff --git a/process/block/interceptedBlocks/interceptedMetaBlockHeader_test.go b/process/block/interceptedBlocks/interceptedMetaBlockHeader_test.go index e952e9fc476..b895a6a81cc 100644 --- a/process/block/interceptedBlocks/interceptedMetaBlockHeader_test.go +++ b/process/block/interceptedBlocks/interceptedMetaBlockHeader_test.go @@ -8,13 +8,14 @@ import ( "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data" dataBlock "github.com/multiversx/mx-chain-core-go/data/block" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/process/block/interceptedBlocks" "github.com/multiversx/mx-chain-go/process/mock" "github.com/multiversx/mx-chain-go/testscommon/consensus" "github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func createDefaultMetaArgument() *interceptedBlocks.ArgInterceptedBlockHeader { diff --git a/process/block/interceptedBlocks/interceptedMiniblock_test.go b/process/block/interceptedBlocks/interceptedMiniblock_test.go index 57d53ec251d..46b489b259d 100644 --- a/process/block/interceptedBlocks/interceptedMiniblock_test.go +++ b/process/block/interceptedBlocks/interceptedMiniblock_test.go @@ -5,10 +5,11 @@ import ( "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/block" + "github.com/stretchr/testify/assert" + "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/process/block/interceptedBlocks" "github.com/multiversx/mx-chain-go/process/mock" - "github.com/stretchr/testify/assert" ) func createDefaultMiniblockArgument() *interceptedBlocks.ArgInterceptedMiniblock { @@ -69,7 +70,7 @@ func TestNewInterceptedMiniblock_ShouldWork(t *testing.T) { assert.Nil(t, err) } -//------- CheckValidity +//------- Verify func TestInterceptedMiniblock_InvalidReceiverShardIdShouldErr(t *testing.T) { t.Parallel() diff --git a/process/errors.go b/process/errors.go index 8edf7342ada..d6d38c75180 100644 --- a/process/errors.go +++ b/process/errors.go @@ -696,6 +696,9 @@ var ErrNilWhiteListHandler = errors.New("nil whitelist handler") // ErrNilPreferredPeersHolder signals that preferred peers holder is nil var ErrNilPreferredPeersHolder = errors.New("nil preferred peers holder") +// ErrNilInterceptedDataVerifier signals that intercepted data verifier is nil +var ErrNilInterceptedDataVerifier = errors.New("nil intercepted data verifier") + // ErrMiniBlocksInWrongOrder signals the miniblocks are in wrong order var ErrMiniBlocksInWrongOrder = errors.New("miniblocks in wrong order, should have been only from me") @@ -1095,6 +1098,9 @@ var ErrInvalidExpiryTimespan = errors.New("invalid expiry timespan") // ErrNilPeerSignatureHandler signals that a nil peer signature handler was provided var ErrNilPeerSignatureHandler = errors.New("nil peer signature handler") +// ErrNilInterceptedDataVerifierFactory signals that a nil intercepted data verifier factory was provided +var ErrNilInterceptedDataVerifierFactory = errors.New("nil intercepted data verifier factory") + // ErrNilPeerAuthenticationCacher signals that a nil peer authentication cacher was provided var ErrNilPeerAuthenticationCacher = errors.New("nil peer authentication cacher") @@ -1251,5 +1257,11 @@ var ErrNilEquivalentProofsPool = errors.New("nil equivalent proofs pool") // ErrNilHeaderProof signals that a nil header proof has been provided var ErrNilHeaderProof = errors.New("nil header proof") +// ErrNilInterceptedDataCache signals that a nil cacher was provided for intercepted data verifier +var ErrNilInterceptedDataCache = errors.New("nil cache for intercepted data") + // ErrFlagNotActive signals that a flag is not active var ErrFlagNotActive = errors.New("flag not active") + +// ErrInvalidInterceptedData signals that an invalid data has been intercepted +var ErrInvalidInterceptedData = errors.New("invalid intercepted data") diff --git a/process/factory/interceptorscontainer/args.go b/process/factory/interceptorscontainer/args.go index 294e66290b3..8e98c7c18ab 100644 --- a/process/factory/interceptorscontainer/args.go +++ b/process/factory/interceptorscontainer/args.go @@ -2,6 +2,7 @@ package interceptorscontainer import ( crypto "github.com/multiversx/mx-chain-crypto-go" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/dataRetriever" "github.com/multiversx/mx-chain-go/heartbeat" @@ -13,34 +14,35 @@ import ( // CommonInterceptorsContainerFactoryArgs holds the arguments needed for the metachain/shard interceptors factories type CommonInterceptorsContainerFactoryArgs struct { - CoreComponents process.CoreComponentsHolder - CryptoComponents process.CryptoComponentsHolder - Accounts state.AccountsAdapter - ShardCoordinator sharding.Coordinator - NodesCoordinator nodesCoordinator.NodesCoordinator - MainMessenger process.TopicHandler - FullArchiveMessenger process.TopicHandler - Store dataRetriever.StorageService - DataPool dataRetriever.PoolsHolder - MaxTxNonceDeltaAllowed int - TxFeeHandler process.FeeHandler - BlockBlackList process.TimeCacher - HeaderSigVerifier process.InterceptedHeaderSigVerifier - HeaderIntegrityVerifier process.HeaderIntegrityVerifier - ValidityAttester process.ValidityAttester - EpochStartTrigger process.EpochStartTriggerHandler - WhiteListHandler process.WhiteListHandler - WhiteListerVerifiedTxs process.WhiteListHandler - AntifloodHandler process.P2PAntifloodHandler - ArgumentsParser process.ArgumentsParser - PreferredPeersHolder process.PreferredPeersHolderHandler - SizeCheckDelta uint32 - RequestHandler process.RequestHandler - PeerSignatureHandler crypto.PeerSignatureHandler - SignaturesHandler process.SignaturesHandler - HeartbeatExpiryTimespanInSec int64 - MainPeerShardMapper process.PeerShardMapper - FullArchivePeerShardMapper process.PeerShardMapper - HardforkTrigger heartbeat.HardforkTrigger - NodeOperationMode common.NodeOperation + CoreComponents process.CoreComponentsHolder + CryptoComponents process.CryptoComponentsHolder + Accounts state.AccountsAdapter + ShardCoordinator sharding.Coordinator + NodesCoordinator nodesCoordinator.NodesCoordinator + MainMessenger process.TopicHandler + FullArchiveMessenger process.TopicHandler + Store dataRetriever.StorageService + DataPool dataRetriever.PoolsHolder + MaxTxNonceDeltaAllowed int + TxFeeHandler process.FeeHandler + BlockBlackList process.TimeCacher + HeaderSigVerifier process.InterceptedHeaderSigVerifier + HeaderIntegrityVerifier process.HeaderIntegrityVerifier + ValidityAttester process.ValidityAttester + EpochStartTrigger process.EpochStartTriggerHandler + WhiteListHandler process.WhiteListHandler + WhiteListerVerifiedTxs process.WhiteListHandler + AntifloodHandler process.P2PAntifloodHandler + ArgumentsParser process.ArgumentsParser + PreferredPeersHolder process.PreferredPeersHolderHandler + SizeCheckDelta uint32 + RequestHandler process.RequestHandler + PeerSignatureHandler crypto.PeerSignatureHandler + SignaturesHandler process.SignaturesHandler + HeartbeatExpiryTimespanInSec int64 + MainPeerShardMapper process.PeerShardMapper + FullArchivePeerShardMapper process.PeerShardMapper + HardforkTrigger heartbeat.HardforkTrigger + NodeOperationMode common.NodeOperation + InterceptedDataVerifierFactory process.InterceptedDataVerifierFactory } diff --git a/process/factory/interceptorscontainer/baseInterceptorsContainerFactory.go b/process/factory/interceptorscontainer/baseInterceptorsContainerFactory.go index aaccb8de44e..b05e496475f 100644 --- a/process/factory/interceptorscontainer/baseInterceptorsContainerFactory.go +++ b/process/factory/interceptorscontainer/baseInterceptorsContainerFactory.go @@ -7,6 +7,7 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/hashing" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/dataRetriever" "github.com/multiversx/mx-chain-go/heartbeat" @@ -32,29 +33,30 @@ const ( ) type baseInterceptorsContainerFactory struct { - mainContainer process.InterceptorsContainer - fullArchiveContainer process.InterceptorsContainer - shardCoordinator sharding.Coordinator - accounts state.AccountsAdapter - store dataRetriever.StorageService - dataPool dataRetriever.PoolsHolder - mainMessenger process.TopicHandler - fullArchiveMessenger process.TopicHandler - nodesCoordinator nodesCoordinator.NodesCoordinator - blockBlackList process.TimeCacher - argInterceptorFactory *interceptorFactory.ArgInterceptedDataFactory - globalThrottler process.InterceptorThrottler - maxTxNonceDeltaAllowed int - antifloodHandler process.P2PAntifloodHandler - whiteListHandler process.WhiteListHandler - whiteListerVerifiedTxs process.WhiteListHandler - preferredPeersHolder process.PreferredPeersHolderHandler - hasher hashing.Hasher - requestHandler process.RequestHandler - mainPeerShardMapper process.PeerShardMapper - fullArchivePeerShardMapper process.PeerShardMapper - hardforkTrigger heartbeat.HardforkTrigger - nodeOperationMode common.NodeOperation + mainContainer process.InterceptorsContainer + fullArchiveContainer process.InterceptorsContainer + shardCoordinator sharding.Coordinator + accounts state.AccountsAdapter + store dataRetriever.StorageService + dataPool dataRetriever.PoolsHolder + mainMessenger process.TopicHandler + fullArchiveMessenger process.TopicHandler + nodesCoordinator nodesCoordinator.NodesCoordinator + blockBlackList process.TimeCacher + argInterceptorFactory *interceptorFactory.ArgInterceptedDataFactory + globalThrottler process.InterceptorThrottler + maxTxNonceDeltaAllowed int + antifloodHandler process.P2PAntifloodHandler + whiteListHandler process.WhiteListHandler + whiteListerVerifiedTxs process.WhiteListHandler + preferredPeersHolder process.PreferredPeersHolderHandler + hasher hashing.Hasher + requestHandler process.RequestHandler + mainPeerShardMapper process.PeerShardMapper + fullArchivePeerShardMapper process.PeerShardMapper + hardforkTrigger heartbeat.HardforkTrigger + nodeOperationMode common.NodeOperation + interceptedDataVerifierFactory process.InterceptedDataVerifierFactory } func checkBaseParams( @@ -286,18 +288,24 @@ func (bicf *baseInterceptorsContainerFactory) createOneTxInterceptor(topic strin return nil, err } + interceptedDataVerifier, err := bicf.interceptedDataVerifierFactory.Create(topic) + if err != nil { + return nil, err + } + internalMarshaller := bicf.argInterceptorFactory.CoreComponents.InternalMarshalizer() interceptor, err := interceptors.NewMultiDataInterceptor( interceptors.ArgMultiDataInterceptor{ - Topic: topic, - Marshalizer: internalMarshaller, - DataFactory: txFactory, - Processor: txProcessor, - Throttler: bicf.globalThrottler, - AntifloodHandler: bicf.antifloodHandler, - WhiteListRequest: bicf.whiteListHandler, - CurrentPeerId: bicf.mainMessenger.ID(), - PreferredPeersHolder: bicf.preferredPeersHolder, + Topic: topic, + Marshalizer: internalMarshaller, + DataFactory: txFactory, + Processor: txProcessor, + Throttler: bicf.globalThrottler, + AntifloodHandler: bicf.antifloodHandler, + WhiteListRequest: bicf.whiteListHandler, + CurrentPeerId: bicf.mainMessenger.ID(), + PreferredPeersHolder: bicf.preferredPeersHolder, + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -329,18 +337,24 @@ func (bicf *baseInterceptorsContainerFactory) createOneUnsignedTxInterceptor(top return nil, err } + interceptedDataVerifier, err := bicf.interceptedDataVerifierFactory.Create(topic) + if err != nil { + return nil, err + } + internalMarshaller := bicf.argInterceptorFactory.CoreComponents.InternalMarshalizer() interceptor, err := interceptors.NewMultiDataInterceptor( interceptors.ArgMultiDataInterceptor{ - Topic: topic, - Marshalizer: internalMarshaller, - DataFactory: txFactory, - Processor: txProcessor, - Throttler: bicf.globalThrottler, - AntifloodHandler: bicf.antifloodHandler, - WhiteListRequest: bicf.whiteListHandler, - CurrentPeerId: bicf.mainMessenger.ID(), - PreferredPeersHolder: bicf.preferredPeersHolder, + Topic: topic, + Marshalizer: internalMarshaller, + DataFactory: txFactory, + Processor: txProcessor, + Throttler: bicf.globalThrottler, + AntifloodHandler: bicf.antifloodHandler, + WhiteListRequest: bicf.whiteListHandler, + CurrentPeerId: bicf.mainMessenger.ID(), + PreferredPeersHolder: bicf.preferredPeersHolder, + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -372,18 +386,24 @@ func (bicf *baseInterceptorsContainerFactory) createOneRewardTxInterceptor(topic return nil, err } + interceptedDataVerifier, err := bicf.interceptedDataVerifierFactory.Create(topic) + if err != nil { + return nil, err + } + internalMarshaller := bicf.argInterceptorFactory.CoreComponents.InternalMarshalizer() interceptor, err := interceptors.NewMultiDataInterceptor( interceptors.ArgMultiDataInterceptor{ - Topic: topic, - Marshalizer: internalMarshaller, - DataFactory: txFactory, - Processor: txProcessor, - Throttler: bicf.globalThrottler, - AntifloodHandler: bicf.antifloodHandler, - WhiteListRequest: bicf.whiteListHandler, - CurrentPeerId: bicf.mainMessenger.ID(), - PreferredPeersHolder: bicf.preferredPeersHolder, + Topic: topic, + Marshalizer: internalMarshaller, + DataFactory: txFactory, + Processor: txProcessor, + Throttler: bicf.globalThrottler, + AntifloodHandler: bicf.antifloodHandler, + WhiteListRequest: bicf.whiteListHandler, + CurrentPeerId: bicf.mainMessenger.ID(), + PreferredPeersHolder: bicf.preferredPeersHolder, + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -415,17 +435,23 @@ func (bicf *baseInterceptorsContainerFactory) generateHeaderInterceptors() error // compose header shard topic, for example: shardBlocks_0_META identifierHdr := factory.ShardBlocksTopic + shardC.CommunicationIdentifier(core.MetachainShardId) + interceptedDataVerifier, err := bicf.interceptedDataVerifierFactory.Create(identifierHdr) + if err != nil { + return err + } + // only one intrashard header topic interceptor, err := interceptors.NewSingleDataInterceptor( interceptors.ArgSingleDataInterceptor{ - Topic: identifierHdr, - DataFactory: hdrFactory, - Processor: hdrProcessor, - Throttler: bicf.globalThrottler, - AntifloodHandler: bicf.antifloodHandler, - WhiteListRequest: bicf.whiteListHandler, - CurrentPeerId: bicf.mainMessenger.ID(), - PreferredPeersHolder: bicf.preferredPeersHolder, + Topic: identifierHdr, + DataFactory: hdrFactory, + Processor: hdrProcessor, + Throttler: bicf.globalThrottler, + AntifloodHandler: bicf.antifloodHandler, + WhiteListRequest: bicf.whiteListHandler, + CurrentPeerId: bicf.mainMessenger.ID(), + PreferredPeersHolder: bicf.preferredPeersHolder, + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -503,17 +529,23 @@ func (bicf *baseInterceptorsContainerFactory) createOneMiniBlocksInterceptor(top return nil, err } + interceptedDataVerifier, err := bicf.interceptedDataVerifierFactory.Create(topic) + if err != nil { + return nil, err + } + interceptor, err := interceptors.NewMultiDataInterceptor( interceptors.ArgMultiDataInterceptor{ - Topic: topic, - Marshalizer: internalMarshaller, - DataFactory: miniblockFactory, - Processor: miniblockProcessor, - Throttler: bicf.globalThrottler, - AntifloodHandler: bicf.antifloodHandler, - WhiteListRequest: bicf.whiteListHandler, - CurrentPeerId: bicf.mainMessenger.ID(), - PreferredPeersHolder: bicf.preferredPeersHolder, + Topic: topic, + Marshalizer: internalMarshaller, + DataFactory: miniblockFactory, + Processor: miniblockProcessor, + Throttler: bicf.globalThrottler, + AntifloodHandler: bicf.antifloodHandler, + WhiteListRequest: bicf.whiteListHandler, + CurrentPeerId: bicf.mainMessenger.ID(), + PreferredPeersHolder: bicf.preferredPeersHolder, + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -542,17 +574,23 @@ func (bicf *baseInterceptorsContainerFactory) generateMetachainHeaderInterceptor return err } + interceptedDataVerifier, err := bicf.interceptedDataVerifierFactory.Create(identifierHdr) + if err != nil { + return err + } + // only one metachain header topic interceptor, err := interceptors.NewSingleDataInterceptor( interceptors.ArgSingleDataInterceptor{ - Topic: identifierHdr, - DataFactory: hdrFactory, - Processor: hdrProcessor, - Throttler: bicf.globalThrottler, - AntifloodHandler: bicf.antifloodHandler, - WhiteListRequest: bicf.whiteListHandler, - CurrentPeerId: bicf.mainMessenger.ID(), - PreferredPeersHolder: bicf.preferredPeersHolder, + Topic: identifierHdr, + DataFactory: hdrFactory, + Processor: hdrProcessor, + Throttler: bicf.globalThrottler, + AntifloodHandler: bicf.antifloodHandler, + WhiteListRequest: bicf.whiteListHandler, + CurrentPeerId: bicf.mainMessenger.ID(), + PreferredPeersHolder: bicf.preferredPeersHolder, + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -578,18 +616,24 @@ func (bicf *baseInterceptorsContainerFactory) createOneTrieNodesInterceptor(topi return nil, err } + interceptedDataVerifier, err := bicf.interceptedDataVerifierFactory.Create(topic) + if err != nil { + return nil, err + } + internalMarshaller := bicf.argInterceptorFactory.CoreComponents.InternalMarshalizer() interceptor, err := interceptors.NewMultiDataInterceptor( interceptors.ArgMultiDataInterceptor{ - Topic: topic, - Marshalizer: internalMarshaller, - DataFactory: trieNodesFactory, - Processor: trieNodesProcessor, - Throttler: bicf.globalThrottler, - AntifloodHandler: bicf.antifloodHandler, - WhiteListRequest: bicf.whiteListHandler, - CurrentPeerId: bicf.mainMessenger.ID(), - PreferredPeersHolder: bicf.preferredPeersHolder, + Topic: topic, + Marshalizer: internalMarshaller, + DataFactory: trieNodesFactory, + Processor: trieNodesProcessor, + Throttler: bicf.globalThrottler, + AntifloodHandler: bicf.antifloodHandler, + WhiteListRequest: bicf.whiteListHandler, + CurrentPeerId: bicf.mainMessenger.ID(), + PreferredPeersHolder: bicf.preferredPeersHolder, + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -670,17 +714,23 @@ func (bicf *baseInterceptorsContainerFactory) generatePeerAuthenticationIntercep return err } + interceptedDataVerifier, err := bicf.interceptedDataVerifierFactory.Create(identifierPeerAuthentication) + if err != nil { + return err + } + mdInterceptor, err := interceptors.NewMultiDataInterceptor( interceptors.ArgMultiDataInterceptor{ - Topic: identifierPeerAuthentication, - Marshalizer: internalMarshaller, - DataFactory: peerAuthenticationFactory, - Processor: peerAuthenticationProcessor, - Throttler: bicf.globalThrottler, - AntifloodHandler: bicf.antifloodHandler, - WhiteListRequest: bicf.whiteListHandler, - PreferredPeersHolder: bicf.preferredPeersHolder, - CurrentPeerId: bicf.mainMessenger.ID(), + Topic: identifierPeerAuthentication, + Marshalizer: internalMarshaller, + DataFactory: peerAuthenticationFactory, + Processor: peerAuthenticationProcessor, + Throttler: bicf.globalThrottler, + AntifloodHandler: bicf.antifloodHandler, + WhiteListRequest: bicf.whiteListHandler, + PreferredPeersHolder: bicf.preferredPeersHolder, + CurrentPeerId: bicf.mainMessenger.ID(), + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -729,16 +779,22 @@ func (bicf *baseInterceptorsContainerFactory) createHeartbeatV2Interceptor( return nil, err } + interceptedDataVerifier, err := bicf.interceptedDataVerifierFactory.Create(identifier) + if err != nil { + return nil, err + } + interceptor, err := interceptors.NewSingleDataInterceptor( interceptors.ArgSingleDataInterceptor{ - Topic: identifier, - DataFactory: heartbeatFactory, - Processor: heartbeatProcessor, - Throttler: bicf.globalThrottler, - AntifloodHandler: bicf.antifloodHandler, - WhiteListRequest: bicf.whiteListHandler, - PreferredPeersHolder: bicf.preferredPeersHolder, - CurrentPeerId: bicf.mainMessenger.ID(), + Topic: identifier, + DataFactory: heartbeatFactory, + Processor: heartbeatProcessor, + Throttler: bicf.globalThrottler, + AntifloodHandler: bicf.antifloodHandler, + WhiteListRequest: bicf.whiteListHandler, + PreferredPeersHolder: bicf.preferredPeersHolder, + CurrentPeerId: bicf.mainMessenger.ID(), + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -778,16 +834,22 @@ func (bicf *baseInterceptorsContainerFactory) createPeerShardInterceptor( return nil, err } + interceptedDataVerifier, err := bicf.interceptedDataVerifierFactory.Create(identifier) + if err != nil { + return nil, err + } + interceptor, err := interceptors.NewSingleDataInterceptor( interceptors.ArgSingleDataInterceptor{ - Topic: identifier, - DataFactory: interceptedPeerShardFactory, - Processor: psiProcessor, - Throttler: bicf.globalThrottler, - AntifloodHandler: bicf.antifloodHandler, - WhiteListRequest: bicf.whiteListHandler, - CurrentPeerId: bicf.mainMessenger.ID(), - PreferredPeersHolder: bicf.preferredPeersHolder, + Topic: identifier, + DataFactory: interceptedPeerShardFactory, + Processor: psiProcessor, + Throttler: bicf.globalThrottler, + AntifloodHandler: bicf.antifloodHandler, + WhiteListRequest: bicf.whiteListHandler, + CurrentPeerId: bicf.mainMessenger.ID(), + PreferredPeersHolder: bicf.preferredPeersHolder, + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -815,17 +877,23 @@ func (bicf *baseInterceptorsContainerFactory) generateValidatorInfoInterceptor() return err } + interceptedDataVerifier, err := bicf.interceptedDataVerifierFactory.Create(identifier) + if err != nil { + return err + } + mdInterceptor, err := interceptors.NewMultiDataInterceptor( interceptors.ArgMultiDataInterceptor{ - Topic: identifier, - Marshalizer: internalMarshaller, - DataFactory: interceptedValidatorInfoFactory, - Processor: validatorInfoProcessor, - Throttler: bicf.globalThrottler, - AntifloodHandler: bicf.antifloodHandler, - WhiteListRequest: bicf.whiteListHandler, - PreferredPeersHolder: bicf.preferredPeersHolder, - CurrentPeerId: bicf.mainMessenger.ID(), + Topic: identifier, + Marshalizer: internalMarshaller, + DataFactory: interceptedValidatorInfoFactory, + Processor: validatorInfoProcessor, + Throttler: bicf.globalThrottler, + AntifloodHandler: bicf.antifloodHandler, + WhiteListRequest: bicf.whiteListHandler, + PreferredPeersHolder: bicf.preferredPeersHolder, + CurrentPeerId: bicf.mainMessenger.ID(), + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -853,16 +921,22 @@ func (bicf *baseInterceptorsContainerFactory) createOneShardEquivalentProofsInte return nil, err } + interceptedDataVerifier, err := bicf.interceptedDataVerifierFactory.Create(topic) + if err != nil { + return nil, err + } + interceptor, err := interceptors.NewSingleDataInterceptor( interceptors.ArgSingleDataInterceptor{ - Topic: topic, - DataFactory: equivalentProofsFactory, - Processor: equivalentProofsProcessor, - Throttler: bicf.globalThrottler, - AntifloodHandler: bicf.antifloodHandler, - WhiteListRequest: bicf.whiteListHandler, - CurrentPeerId: bicf.mainMessenger.ID(), - PreferredPeersHolder: bicf.preferredPeersHolder, + Topic: topic, + DataFactory: equivalentProofsFactory, + Processor: equivalentProofsProcessor, + Throttler: bicf.globalThrottler, + AntifloodHandler: bicf.antifloodHandler, + WhiteListRequest: bicf.whiteListHandler, + CurrentPeerId: bicf.mainMessenger.ID(), + PreferredPeersHolder: bicf.preferredPeersHolder, + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { diff --git a/process/factory/interceptorscontainer/metaInterceptorsContainerFactory.go b/process/factory/interceptorscontainer/metaInterceptorsContainerFactory.go index 3fee1b05430..24b70570e84 100644 --- a/process/factory/interceptorscontainer/metaInterceptorsContainerFactory.go +++ b/process/factory/interceptorscontainer/metaInterceptorsContainerFactory.go @@ -5,6 +5,7 @@ import ( "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/core/throttler" "github.com/multiversx/mx-chain-core-go/marshal" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/process/factory" @@ -80,6 +81,9 @@ func NewMetaInterceptorsContainerFactory( if check.IfNil(args.PeerSignatureHandler) { return nil, process.ErrNilPeerSignatureHandler } + if check.IfNil(args.InterceptedDataVerifierFactory) { + return nil, process.ErrNilInterceptedDataVerifierFactory + } if args.HeartbeatExpiryTimespanInSec < minTimespanDurationInSec { return nil, process.ErrInvalidExpiryTimespan } @@ -103,28 +107,29 @@ func NewMetaInterceptorsContainerFactory( } base := &baseInterceptorsContainerFactory{ - mainContainer: containers.NewInterceptorsContainer(), - fullArchiveContainer: containers.NewInterceptorsContainer(), - shardCoordinator: args.ShardCoordinator, - mainMessenger: args.MainMessenger, - fullArchiveMessenger: args.FullArchiveMessenger, - store: args.Store, - dataPool: args.DataPool, - nodesCoordinator: args.NodesCoordinator, - blockBlackList: args.BlockBlackList, - argInterceptorFactory: argInterceptorFactory, - maxTxNonceDeltaAllowed: args.MaxTxNonceDeltaAllowed, - accounts: args.Accounts, - antifloodHandler: args.AntifloodHandler, - whiteListHandler: args.WhiteListHandler, - whiteListerVerifiedTxs: args.WhiteListerVerifiedTxs, - preferredPeersHolder: args.PreferredPeersHolder, - hasher: args.CoreComponents.Hasher(), - requestHandler: args.RequestHandler, - mainPeerShardMapper: args.MainPeerShardMapper, - fullArchivePeerShardMapper: args.FullArchivePeerShardMapper, - hardforkTrigger: args.HardforkTrigger, - nodeOperationMode: args.NodeOperationMode, + mainContainer: containers.NewInterceptorsContainer(), + fullArchiveContainer: containers.NewInterceptorsContainer(), + shardCoordinator: args.ShardCoordinator, + mainMessenger: args.MainMessenger, + fullArchiveMessenger: args.FullArchiveMessenger, + store: args.Store, + dataPool: args.DataPool, + nodesCoordinator: args.NodesCoordinator, + blockBlackList: args.BlockBlackList, + argInterceptorFactory: argInterceptorFactory, + maxTxNonceDeltaAllowed: args.MaxTxNonceDeltaAllowed, + accounts: args.Accounts, + antifloodHandler: args.AntifloodHandler, + whiteListHandler: args.WhiteListHandler, + whiteListerVerifiedTxs: args.WhiteListerVerifiedTxs, + preferredPeersHolder: args.PreferredPeersHolder, + hasher: args.CoreComponents.Hasher(), + requestHandler: args.RequestHandler, + mainPeerShardMapper: args.MainPeerShardMapper, + fullArchivePeerShardMapper: args.FullArchivePeerShardMapper, + hardforkTrigger: args.HardforkTrigger, + nodeOperationMode: args.NodeOperationMode, + interceptedDataVerifierFactory: args.InterceptedDataVerifierFactory, } icf := &metaInterceptorsContainerFactory{ @@ -267,16 +272,22 @@ func (micf *metaInterceptorsContainerFactory) createOneShardHeaderInterceptor(to return nil, err } + interceptedDataVerifier, err := micf.interceptedDataVerifierFactory.Create(topic) + if err != nil { + return nil, err + } + interceptor, err := processInterceptors.NewSingleDataInterceptor( processInterceptors.ArgSingleDataInterceptor{ - Topic: topic, - DataFactory: hdrFactory, - Processor: hdrProcessor, - Throttler: micf.globalThrottler, - AntifloodHandler: micf.antifloodHandler, - WhiteListRequest: micf.whiteListHandler, - CurrentPeerId: micf.mainMessenger.ID(), - PreferredPeersHolder: micf.preferredPeersHolder, + Topic: topic, + DataFactory: hdrFactory, + Processor: hdrProcessor, + Throttler: micf.globalThrottler, + AntifloodHandler: micf.antifloodHandler, + WhiteListRequest: micf.whiteListHandler, + CurrentPeerId: micf.mainMessenger.ID(), + PreferredPeersHolder: micf.preferredPeersHolder, + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { diff --git a/process/factory/interceptorscontainer/metaInterceptorsContainerFactory_test.go b/process/factory/interceptorscontainer/metaInterceptorsContainerFactory_test.go index c27d0607452..927742f00fa 100644 --- a/process/factory/interceptorscontainer/metaInterceptorsContainerFactory_test.go +++ b/process/factory/interceptorscontainer/metaInterceptorsContainerFactory_test.go @@ -400,6 +400,18 @@ func TestNewMetaInterceptorsContainerFactory_NilPeerSignatureHandler(t *testing. assert.Equal(t, process.ErrNilPeerSignatureHandler, err) } +func TestNewMetaInterceptorsContainerFactory_NilInterceptedDataVerifierFactory(t *testing.T) { + t.Parallel() + + coreComp, cryptoComp := createMockComponentHolders() + args := getArgumentsShard(coreComp, cryptoComp) + args.InterceptedDataVerifierFactory = nil + icf, err := interceptorscontainer.NewMetaInterceptorsContainerFactory(args) + + assert.Nil(t, icf) + assert.Equal(t, process.ErrNilInterceptedDataVerifierFactory, err) +} + func TestNewMetaInterceptorsContainerFactory_InvalidExpiryTimespan(t *testing.T) { t.Parallel() @@ -546,6 +558,7 @@ func testCreateMetaTopicShouldFail(matchStrToErrOnCreate string, matchStrToErrOn } else { args.MainMessenger = createMetaStubTopicHandler(matchStrToErrOnCreate, matchStrToErrOnRegister) } + args.InterceptedDataVerifierFactory = &mock.InterceptedDataVerifierFactoryMock{} icf, _ := interceptorscontainer.NewMetaInterceptorsContainerFactory(args) mainContainer, fullArchiveConatiner, err := icf.Create() @@ -561,6 +574,7 @@ func TestMetaInterceptorsContainerFactory_CreateShouldWork(t *testing.T) { coreComp, cryptoComp := createMockComponentHolders() args := getArgumentsMeta(coreComp, cryptoComp) + args.InterceptedDataVerifierFactory = &mock.InterceptedDataVerifierFactoryMock{} icf, _ := interceptorscontainer.NewMetaInterceptorsContainerFactory(args) mainContainer, fullArchiveContainer, err := icf.Create() @@ -593,6 +607,8 @@ func TestMetaInterceptorsContainerFactory_With4ShardsShouldWork(t *testing.T) { args := getArgumentsMeta(coreComp, cryptoComp) args.ShardCoordinator = shardCoordinator args.NodesCoordinator = nodesCoordinator + args.InterceptedDataVerifierFactory = &mock.InterceptedDataVerifierFactoryMock{} + icf, err := interceptorscontainer.NewMetaInterceptorsContainerFactory(args) require.Nil(t, err) @@ -643,6 +659,7 @@ func TestMetaInterceptorsContainerFactory_With4ShardsShouldWork(t *testing.T) { args.NodeOperationMode = common.FullArchiveMode args.ShardCoordinator = shardCoordinator args.NodesCoordinator = nodesCoordinator + args.InterceptedDataVerifierFactory = &mock.InterceptedDataVerifierFactoryMock{} icf, err := interceptorscontainer.NewMetaInterceptorsContainerFactory(args) require.Nil(t, err) @@ -685,34 +702,35 @@ func getArgumentsMeta( cryptoComp *mock.CryptoComponentsMock, ) interceptorscontainer.CommonInterceptorsContainerFactoryArgs { return interceptorscontainer.CommonInterceptorsContainerFactoryArgs{ - CoreComponents: coreComp, - CryptoComponents: cryptoComp, - Accounts: &stateMock.AccountsStub{}, - ShardCoordinator: mock.NewOneShardCoordinatorMock(), - NodesCoordinator: shardingMocks.NewNodesCoordinatorMock(), - MainMessenger: &mock.TopicHandlerStub{}, - FullArchiveMessenger: &mock.TopicHandlerStub{}, - Store: createMetaStore(), - DataPool: createMetaDataPools(), - MaxTxNonceDeltaAllowed: maxTxNonceDeltaAllowed, - TxFeeHandler: &economicsmocks.EconomicsHandlerStub{}, - BlockBlackList: &testscommon.TimeCacheStub{}, - HeaderSigVerifier: &consensus.HeaderSigVerifierMock{}, - HeaderIntegrityVerifier: &mock.HeaderIntegrityVerifierStub{}, - ValidityAttester: &mock.ValidityAttesterStub{}, - EpochStartTrigger: &mock.EpochStartTriggerStub{}, - WhiteListHandler: &testscommon.WhiteListHandlerStub{}, - WhiteListerVerifiedTxs: &testscommon.WhiteListHandlerStub{}, - AntifloodHandler: &mock.P2PAntifloodHandlerStub{}, - ArgumentsParser: &mock.ArgumentParserMock{}, - PreferredPeersHolder: &p2pmocks.PeersHolderStub{}, - RequestHandler: &testscommon.RequestHandlerStub{}, - PeerSignatureHandler: &mock.PeerSignatureHandlerStub{}, - SignaturesHandler: &mock.SignaturesHandlerStub{}, - HeartbeatExpiryTimespanInSec: 30, - MainPeerShardMapper: &p2pmocks.NetworkShardingCollectorStub{}, - FullArchivePeerShardMapper: &p2pmocks.NetworkShardingCollectorStub{}, - HardforkTrigger: &testscommon.HardforkTriggerStub{}, - NodeOperationMode: common.NormalOperation, + CoreComponents: coreComp, + CryptoComponents: cryptoComp, + Accounts: &stateMock.AccountsStub{}, + ShardCoordinator: mock.NewOneShardCoordinatorMock(), + NodesCoordinator: shardingMocks.NewNodesCoordinatorMock(), + MainMessenger: &mock.TopicHandlerStub{}, + FullArchiveMessenger: &mock.TopicHandlerStub{}, + Store: createMetaStore(), + DataPool: createMetaDataPools(), + MaxTxNonceDeltaAllowed: maxTxNonceDeltaAllowed, + TxFeeHandler: &economicsmocks.EconomicsHandlerStub{}, + BlockBlackList: &testscommon.TimeCacheStub{}, + HeaderSigVerifier: &consensus.HeaderSigVerifierMock{}, + HeaderIntegrityVerifier: &mock.HeaderIntegrityVerifierStub{}, + ValidityAttester: &mock.ValidityAttesterStub{}, + EpochStartTrigger: &mock.EpochStartTriggerStub{}, + WhiteListHandler: &testscommon.WhiteListHandlerStub{}, + WhiteListerVerifiedTxs: &testscommon.WhiteListHandlerStub{}, + AntifloodHandler: &mock.P2PAntifloodHandlerStub{}, + ArgumentsParser: &mock.ArgumentParserMock{}, + PreferredPeersHolder: &p2pmocks.PeersHolderStub{}, + RequestHandler: &testscommon.RequestHandlerStub{}, + PeerSignatureHandler: &mock.PeerSignatureHandlerStub{}, + SignaturesHandler: &mock.SignaturesHandlerStub{}, + HeartbeatExpiryTimespanInSec: 30, + MainPeerShardMapper: &p2pmocks.NetworkShardingCollectorStub{}, + FullArchivePeerShardMapper: &p2pmocks.NetworkShardingCollectorStub{}, + HardforkTrigger: &testscommon.HardforkTriggerStub{}, + NodeOperationMode: common.NormalOperation, + InterceptedDataVerifierFactory: &mock.InterceptedDataVerifierFactoryMock{}, } } diff --git a/process/factory/interceptorscontainer/shardInterceptorsContainerFactory.go b/process/factory/interceptorscontainer/shardInterceptorsContainerFactory.go index c0f3551d472..7acd6d87e59 100644 --- a/process/factory/interceptorscontainer/shardInterceptorsContainerFactory.go +++ b/process/factory/interceptorscontainer/shardInterceptorsContainerFactory.go @@ -5,7 +5,9 @@ import ( "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/core/throttler" "github.com/multiversx/mx-chain-core-go/marshal" + "github.com/multiversx/mx-chain-go/common" + "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/process/factory" "github.com/multiversx/mx-chain-go/process/factory/containers" @@ -79,6 +81,9 @@ func NewShardInterceptorsContainerFactory( if check.IfNil(args.PeerSignatureHandler) { return nil, process.ErrNilPeerSignatureHandler } + if check.IfNil(args.InterceptedDataVerifierFactory) { + return nil, process.ErrNilInterceptedDataVerifierFactory + } if args.HeartbeatExpiryTimespanInSec < minTimespanDurationInSec { return nil, process.ErrInvalidExpiryTimespan } @@ -102,28 +107,29 @@ func NewShardInterceptorsContainerFactory( } base := &baseInterceptorsContainerFactory{ - mainContainer: containers.NewInterceptorsContainer(), - fullArchiveContainer: containers.NewInterceptorsContainer(), - accounts: args.Accounts, - shardCoordinator: args.ShardCoordinator, - mainMessenger: args.MainMessenger, - fullArchiveMessenger: args.FullArchiveMessenger, - store: args.Store, - dataPool: args.DataPool, - nodesCoordinator: args.NodesCoordinator, - argInterceptorFactory: argInterceptorFactory, - blockBlackList: args.BlockBlackList, - maxTxNonceDeltaAllowed: args.MaxTxNonceDeltaAllowed, - antifloodHandler: args.AntifloodHandler, - whiteListHandler: args.WhiteListHandler, - whiteListerVerifiedTxs: args.WhiteListerVerifiedTxs, - preferredPeersHolder: args.PreferredPeersHolder, - hasher: args.CoreComponents.Hasher(), - requestHandler: args.RequestHandler, - mainPeerShardMapper: args.MainPeerShardMapper, - fullArchivePeerShardMapper: args.FullArchivePeerShardMapper, - hardforkTrigger: args.HardforkTrigger, - nodeOperationMode: args.NodeOperationMode, + mainContainer: containers.NewInterceptorsContainer(), + fullArchiveContainer: containers.NewInterceptorsContainer(), + accounts: args.Accounts, + shardCoordinator: args.ShardCoordinator, + mainMessenger: args.MainMessenger, + fullArchiveMessenger: args.FullArchiveMessenger, + store: args.Store, + dataPool: args.DataPool, + nodesCoordinator: args.NodesCoordinator, + argInterceptorFactory: argInterceptorFactory, + blockBlackList: args.BlockBlackList, + maxTxNonceDeltaAllowed: args.MaxTxNonceDeltaAllowed, + antifloodHandler: args.AntifloodHandler, + whiteListHandler: args.WhiteListHandler, + whiteListerVerifiedTxs: args.WhiteListerVerifiedTxs, + preferredPeersHolder: args.PreferredPeersHolder, + hasher: args.CoreComponents.Hasher(), + requestHandler: args.RequestHandler, + mainPeerShardMapper: args.MainPeerShardMapper, + fullArchivePeerShardMapper: args.FullArchivePeerShardMapper, + hardforkTrigger: args.HardforkTrigger, + nodeOperationMode: args.NodeOperationMode, + interceptedDataVerifierFactory: args.InterceptedDataVerifierFactory, } icf := &shardInterceptorsContainerFactory{ diff --git a/process/factory/interceptorscontainer/shardInterceptorsContainerFactory_test.go b/process/factory/interceptorscontainer/shardInterceptorsContainerFactory_test.go index 549f1fdc15a..5d3ff1fcda1 100644 --- a/process/factory/interceptorscontainer/shardInterceptorsContainerFactory_test.go +++ b/process/factory/interceptorscontainer/shardInterceptorsContainerFactory_test.go @@ -356,6 +356,18 @@ func TestNewShardInterceptorsContainerFactory_NilValidityAttesterShouldErr(t *te assert.Equal(t, process.ErrNilValidityAttester, err) } +func TestNewShardInterceptorsContainerFactory_NilInterceptedDataVerifierFactory(t *testing.T) { + t.Parallel() + + coreComp, cryptoComp := createMockComponentHolders() + args := getArgumentsShard(coreComp, cryptoComp) + args.InterceptedDataVerifierFactory = nil + icf, err := interceptorscontainer.NewShardInterceptorsContainerFactory(args) + + assert.Nil(t, icf) + assert.Equal(t, process.ErrNilInterceptedDataVerifierFactory, err) +} + func TestNewShardInterceptorsContainerFactory_InvalidChainIDShouldErr(t *testing.T) { t.Parallel() @@ -497,6 +509,7 @@ func testCreateShardTopicShouldFail(matchStrToErrOnCreate string, matchStrToErrO coreComp, cryptoComp := createMockComponentHolders() args := getArgumentsShard(coreComp, cryptoComp) + args.InterceptedDataVerifierFactory = &mock.InterceptedDataVerifierFactoryMock{} if strings.Contains(t.Name(), "full_archive") { args.NodeOperationMode = common.FullArchiveMode args.FullArchiveMessenger = createShardStubTopicHandler(matchStrToErrOnCreate, matchStrToErrOnRegister) @@ -563,6 +576,7 @@ func TestShardInterceptorsContainerFactory_CreateShouldWork(t *testing.T) { }, } args.WhiteListerVerifiedTxs = &testscommon.WhiteListHandlerStub{} + args.InterceptedDataVerifierFactory = &mock.InterceptedDataVerifierFactoryMock{} icf, _ := interceptorscontainer.NewShardInterceptorsContainerFactory(args) @@ -598,6 +612,7 @@ func TestShardInterceptorsContainerFactory_With4ShardsShouldWork(t *testing.T) { args.ShardCoordinator = shardCoordinator args.NodesCoordinator = nodesCoordinator args.PreferredPeersHolder = &p2pmocks.PeersHolderStub{} + args.InterceptedDataVerifierFactory = &mock.InterceptedDataVerifierFactoryMock{} icf, _ := interceptorscontainer.NewShardInterceptorsContainerFactory(args) @@ -648,6 +663,7 @@ func TestShardInterceptorsContainerFactory_With4ShardsShouldWork(t *testing.T) { args.ShardCoordinator = shardCoordinator args.NodesCoordinator = nodesCoordinator args.PreferredPeersHolder = &p2pmocks.PeersHolderStub{} + args.InterceptedDataVerifierFactory = &mock.InterceptedDataVerifierFactoryMock{} icf, _ := interceptorscontainer.NewShardInterceptorsContainerFactory(args) @@ -712,34 +728,35 @@ func getArgumentsShard( cryptoComp *mock.CryptoComponentsMock, ) interceptorscontainer.CommonInterceptorsContainerFactoryArgs { return interceptorscontainer.CommonInterceptorsContainerFactoryArgs{ - CoreComponents: coreComp, - CryptoComponents: cryptoComp, - Accounts: &stateMock.AccountsStub{}, - ShardCoordinator: mock.NewOneShardCoordinatorMock(), - NodesCoordinator: shardingMocks.NewNodesCoordinatorMock(), - MainMessenger: &mock.TopicHandlerStub{}, - FullArchiveMessenger: &mock.TopicHandlerStub{}, - Store: createShardStore(), - DataPool: createShardDataPools(), - MaxTxNonceDeltaAllowed: maxTxNonceDeltaAllowed, - TxFeeHandler: &economicsmocks.EconomicsHandlerStub{}, - BlockBlackList: &testscommon.TimeCacheStub{}, - HeaderSigVerifier: &consensus.HeaderSigVerifierMock{}, - HeaderIntegrityVerifier: &mock.HeaderIntegrityVerifierStub{}, - SizeCheckDelta: 0, - ValidityAttester: &mock.ValidityAttesterStub{}, - EpochStartTrigger: &mock.EpochStartTriggerStub{}, - AntifloodHandler: &mock.P2PAntifloodHandlerStub{}, - WhiteListHandler: &testscommon.WhiteListHandlerStub{}, - WhiteListerVerifiedTxs: &testscommon.WhiteListHandlerStub{}, - ArgumentsParser: &mock.ArgumentParserMock{}, - PreferredPeersHolder: &p2pmocks.PeersHolderStub{}, - RequestHandler: &testscommon.RequestHandlerStub{}, - PeerSignatureHandler: &mock.PeerSignatureHandlerStub{}, - SignaturesHandler: &mock.SignaturesHandlerStub{}, - HeartbeatExpiryTimespanInSec: 30, - MainPeerShardMapper: &p2pmocks.NetworkShardingCollectorStub{}, - FullArchivePeerShardMapper: &p2pmocks.NetworkShardingCollectorStub{}, - HardforkTrigger: &testscommon.HardforkTriggerStub{}, + CoreComponents: coreComp, + CryptoComponents: cryptoComp, + Accounts: &stateMock.AccountsStub{}, + ShardCoordinator: mock.NewOneShardCoordinatorMock(), + NodesCoordinator: shardingMocks.NewNodesCoordinatorMock(), + MainMessenger: &mock.TopicHandlerStub{}, + FullArchiveMessenger: &mock.TopicHandlerStub{}, + Store: createShardStore(), + DataPool: createShardDataPools(), + MaxTxNonceDeltaAllowed: maxTxNonceDeltaAllowed, + TxFeeHandler: &economicsmocks.EconomicsHandlerStub{}, + BlockBlackList: &testscommon.TimeCacheStub{}, + HeaderSigVerifier: &consensus.HeaderSigVerifierMock{}, + HeaderIntegrityVerifier: &mock.HeaderIntegrityVerifierStub{}, + SizeCheckDelta: 0, + ValidityAttester: &mock.ValidityAttesterStub{}, + EpochStartTrigger: &mock.EpochStartTriggerStub{}, + AntifloodHandler: &mock.P2PAntifloodHandlerStub{}, + WhiteListHandler: &testscommon.WhiteListHandlerStub{}, + WhiteListerVerifiedTxs: &testscommon.WhiteListHandlerStub{}, + ArgumentsParser: &mock.ArgumentParserMock{}, + PreferredPeersHolder: &p2pmocks.PeersHolderStub{}, + RequestHandler: &testscommon.RequestHandlerStub{}, + PeerSignatureHandler: &mock.PeerSignatureHandlerStub{}, + SignaturesHandler: &mock.SignaturesHandlerStub{}, + HeartbeatExpiryTimespanInSec: 30, + MainPeerShardMapper: &p2pmocks.NetworkShardingCollectorStub{}, + FullArchivePeerShardMapper: &p2pmocks.NetworkShardingCollectorStub{}, + HardforkTrigger: &testscommon.HardforkTriggerStub{}, + InterceptedDataVerifierFactory: &mock.InterceptedDataVerifierFactoryMock{}, } } diff --git a/process/interceptors/baseDataInterceptor.go b/process/interceptors/baseDataInterceptor.go index 64efb852238..cec00abd756 100644 --- a/process/interceptors/baseDataInterceptor.go +++ b/process/interceptors/baseDataInterceptor.go @@ -6,19 +6,21 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-go/p2p" "github.com/multiversx/mx-chain-go/process" ) type baseDataInterceptor struct { - throttler process.InterceptorThrottler - antifloodHandler process.P2PAntifloodHandler - topic string - currentPeerId core.PeerID - processor process.InterceptorProcessor - mutDebugHandler sync.RWMutex - debugHandler process.InterceptedDebugger - preferredPeersHolder process.PreferredPeersHolderHandler + throttler process.InterceptorThrottler + antifloodHandler process.P2PAntifloodHandler + topic string + currentPeerId core.PeerID + processor process.InterceptorProcessor + mutDebugHandler sync.RWMutex + debugHandler process.InterceptedDebugger + preferredPeersHolder process.PreferredPeersHolderHandler + interceptedDataVerifier process.InterceptedDataVerifier } func (bdi *baseDataInterceptor) preProcessMesage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error { diff --git a/process/interceptors/factory/interceptedDataVerifierFactory.go b/process/interceptors/factory/interceptedDataVerifierFactory.go new file mode 100644 index 00000000000..2775bbdc61a --- /dev/null +++ b/process/interceptors/factory/interceptedDataVerifierFactory.go @@ -0,0 +1,72 @@ +package factory + +import ( + "fmt" + "sync" + "time" + + "github.com/multiversx/mx-chain-go/process" + "github.com/multiversx/mx-chain-go/process/interceptors" + "github.com/multiversx/mx-chain-go/storage" + "github.com/multiversx/mx-chain-go/storage/cache" +) + +// InterceptedDataVerifierFactoryArgs holds the required arguments for interceptedDataVerifierFactory +type InterceptedDataVerifierFactoryArgs struct { + CacheSpan time.Duration + CacheExpiry time.Duration +} + +// interceptedDataVerifierFactory encapsulates the required arguments to create InterceptedDataVerifier +// Furthermore it will hold all such instances in an internal map. +type interceptedDataVerifierFactory struct { + cacheSpan time.Duration + cacheExpiry time.Duration + + interceptedDataVerifierMap map[string]storage.Cacher + mutex sync.Mutex +} + +// NewInterceptedDataVerifierFactory will create a factory instance that will create instance of InterceptedDataVerifiers +func NewInterceptedDataVerifierFactory(args InterceptedDataVerifierFactoryArgs) *interceptedDataVerifierFactory { + return &interceptedDataVerifierFactory{ + cacheSpan: args.CacheSpan, + cacheExpiry: args.CacheExpiry, + interceptedDataVerifierMap: make(map[string]storage.Cacher), + mutex: sync.Mutex{}, + } +} + +// Create will return an instance of InterceptedDataVerifier +func (idvf *interceptedDataVerifierFactory) Create(topic string) (process.InterceptedDataVerifier, error) { + internalCache, err := cache.NewTimeCacher(cache.ArgTimeCacher{ + DefaultSpan: idvf.cacheSpan, + CacheExpiry: idvf.cacheExpiry, + }) + if err != nil { + return nil, err + } + + idvf.mutex.Lock() + idvf.interceptedDataVerifierMap[topic] = internalCache + idvf.mutex.Unlock() + + return interceptors.NewInterceptedDataVerifier(internalCache) +} + +// Close will close all the sweeping routines created by the cache. +func (idvf *interceptedDataVerifierFactory) Close() error { + for topic, cacher := range idvf.interceptedDataVerifierMap { + err := cacher.Close() + if err != nil { + return fmt.Errorf("failed to close cacher on topic %q: %w", topic, err) + } + } + + return nil +} + +// IsInterfaceNil returns true if there is no value under the interface +func (idvf *interceptedDataVerifierFactory) IsInterfaceNil() bool { + return idvf == nil +} diff --git a/process/interceptors/factory/interceptedDataVerifierFactory_test.go b/process/interceptors/factory/interceptedDataVerifierFactory_test.go new file mode 100644 index 00000000000..45f42ec05fd --- /dev/null +++ b/process/interceptors/factory/interceptedDataVerifierFactory_test.go @@ -0,0 +1,44 @@ +package factory + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func createMockArgInterceptedDataVerifierFactory() InterceptedDataVerifierFactoryArgs { + return InterceptedDataVerifierFactoryArgs{ + CacheSpan: time.Second, + CacheExpiry: time.Second, + } +} + +func TestInterceptedDataVerifierFactory_IsInterfaceNil(t *testing.T) { + t.Parallel() + + var factory *interceptedDataVerifierFactory + require.True(t, factory.IsInterfaceNil()) + + factory = NewInterceptedDataVerifierFactory(createMockArgInterceptedDataVerifierFactory()) + require.False(t, factory.IsInterfaceNil()) +} + +func TestNewInterceptedDataVerifierFactory(t *testing.T) { + t.Parallel() + + factory := NewInterceptedDataVerifierFactory(createMockArgInterceptedDataVerifierFactory()) + require.NotNil(t, factory) +} + +func TestInterceptedDataVerifierFactory_Create(t *testing.T) { + t.Parallel() + + factory := NewInterceptedDataVerifierFactory(createMockArgInterceptedDataVerifierFactory()) + require.NotNil(t, factory) + + interceptedDataVerifier, err := factory.Create("mockTopic") + require.NoError(t, err) + + require.False(t, interceptedDataVerifier.IsInterfaceNil()) +} diff --git a/process/interceptors/interceptedDataVerifier.go b/process/interceptors/interceptedDataVerifier.go new file mode 100644 index 00000000000..0accf41d3fc --- /dev/null +++ b/process/interceptors/interceptedDataVerifier.go @@ -0,0 +1,70 @@ +package interceptors + +import ( + "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-core-go/core/sync" + + "github.com/multiversx/mx-chain-go/process" + "github.com/multiversx/mx-chain-go/storage" +) + +type interceptedDataStatus int8 + +const ( + validInterceptedData interceptedDataStatus = iota + invalidInterceptedData + + interceptedDataStatusBytesSize = 8 +) + +type interceptedDataVerifier struct { + km sync.KeyRWMutexHandler + cache storage.Cacher +} + +// NewInterceptedDataVerifier creates a new instance of intercepted data verifier +func NewInterceptedDataVerifier(cache storage.Cacher) (*interceptedDataVerifier, error) { + if check.IfNil(cache) { + return nil, process.ErrNilInterceptedDataCache + } + + return &interceptedDataVerifier{ + km: sync.NewKeyRWMutex(), + cache: cache, + }, nil +} + +// Verify will check if the intercepted data has been validated before and put in the time cache. +// It will retrieve the status in the cache if it exists, otherwise it will validate it and store the status of the +// validation in the cache. Note that the entries are stored for a set period of time +func (idv *interceptedDataVerifier) Verify(interceptedData process.InterceptedData) error { + if len(interceptedData.Hash()) == 0 { + return interceptedData.CheckValidity() + } + + hash := string(interceptedData.Hash()) + idv.km.Lock(hash) + defer idv.km.Unlock(hash) + + if val, ok := idv.cache.Get(interceptedData.Hash()); ok { + if val == validInterceptedData { + return nil + } + + return process.ErrInvalidInterceptedData + } + + err := interceptedData.CheckValidity() + if err != nil { + idv.cache.Put(interceptedData.Hash(), invalidInterceptedData, interceptedDataStatusBytesSize) + return process.ErrInvalidInterceptedData + } + + idv.cache.Put(interceptedData.Hash(), validInterceptedData, interceptedDataStatusBytesSize) + return nil +} + +// IsInterfaceNil returns true if there is no value under the interface +func (idv *interceptedDataVerifier) IsInterfaceNil() bool { + return idv == nil +} diff --git a/process/interceptors/interceptedDataVerifier_test.go b/process/interceptors/interceptedDataVerifier_test.go new file mode 100644 index 00000000000..8913f5828d8 --- /dev/null +++ b/process/interceptors/interceptedDataVerifier_test.go @@ -0,0 +1,237 @@ +package interceptors + +import ( + "sync" + "testing" + "time" + + "github.com/multiversx/mx-chain-core-go/core/atomic" + "github.com/stretchr/testify/require" + + "github.com/multiversx/mx-chain-go/process" + "github.com/multiversx/mx-chain-go/storage" + "github.com/multiversx/mx-chain-go/storage/cache" + "github.com/multiversx/mx-chain-go/testscommon" +) + +const defaultSpan = 1 * time.Second + +func defaultInterceptedDataVerifier(span time.Duration) *interceptedDataVerifier { + c, _ := cache.NewTimeCacher(cache.ArgTimeCacher{ + DefaultSpan: span, + CacheExpiry: span, + }) + + verifier, _ := NewInterceptedDataVerifier(c) + return verifier +} + +func TestNewInterceptedDataVerifier(t *testing.T) { + t.Parallel() + + var c storage.Cacher + verifier, err := NewInterceptedDataVerifier(c) + require.Equal(t, process.ErrNilInterceptedDataCache, err) + require.Nil(t, verifier) +} + +func TestInterceptedDataVerifier_IsInterfaceNil(t *testing.T) { + t.Parallel() + + var verifier *interceptedDataVerifier + require.True(t, verifier.IsInterfaceNil()) + + verifier = defaultInterceptedDataVerifier(defaultSpan) + require.False(t, verifier.IsInterfaceNil()) +} + +func TestInterceptedDataVerifier_EmptyHash(t *testing.T) { + t.Parallel() + + var checkValidityCounter int + verifier := defaultInterceptedDataVerifier(defaultSpan) + interceptedData := &testscommon.InterceptedDataStub{ + CheckValidityCalled: func() error { + checkValidityCounter++ + return nil + }, + IsForCurrentShardCalled: func() bool { + return false + }, + HashCalled: func() []byte { + return nil + }, + } + + err := verifier.Verify(interceptedData) + require.NoError(t, err) + require.Equal(t, 1, checkValidityCounter) + + err = verifier.Verify(interceptedData) + require.NoError(t, err) + require.Equal(t, 2, checkValidityCounter) +} + +func TestInterceptedDataVerifier_CheckValidityShouldWork(t *testing.T) { + t.Parallel() + + checkValidityCounter := atomic.Counter{} + + interceptedData := &testscommon.InterceptedDataStub{ + CheckValidityCalled: func() error { + checkValidityCounter.Add(1) + return nil + }, + IsForCurrentShardCalled: func() bool { + return false + }, + HashCalled: func() []byte { + return []byte("hash") + }, + } + + verifier := defaultInterceptedDataVerifier(defaultSpan) + + err := verifier.Verify(interceptedData) + require.NoError(t, err) + + errCount := atomic.Counter{} + wg := sync.WaitGroup{} + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + err := verifier.Verify(interceptedData) + if err != nil { + errCount.Add(1) + } + }() + } + wg.Wait() + + require.Equal(t, int64(0), errCount.Get()) + require.Equal(t, int64(1), checkValidityCounter.Get()) +} + +func TestInterceptedDataVerifier_CheckValidityShouldNotWork(t *testing.T) { + t.Parallel() + + checkValidityCounter := atomic.Counter{} + + interceptedData := &testscommon.InterceptedDataStub{ + CheckValidityCalled: func() error { + checkValidityCounter.Add(1) + return process.ErrInvalidInterceptedData + }, + IsForCurrentShardCalled: func() bool { + return false + }, + HashCalled: func() []byte { + return []byte("hash") + }, + } + + verifier := defaultInterceptedDataVerifier(defaultSpan) + + err := verifier.Verify(interceptedData) + require.Equal(t, process.ErrInvalidInterceptedData, err) + + errCount := atomic.Counter{} + wg := sync.WaitGroup{} + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + err := verifier.Verify(interceptedData) + if err != nil { + errCount.Add(1) + } + }() + } + wg.Wait() + + require.Equal(t, int64(100), errCount.Get()) + require.Equal(t, int64(1), checkValidityCounter.Get()) +} + +func TestInterceptedDataVerifier_CheckExpiryTime(t *testing.T) { + t.Parallel() + + t.Run("expiry on valid data", func(t *testing.T) { + expiryTestDuration := defaultSpan * 2 + + checkValidityCounter := atomic.Counter{} + + interceptedData := &testscommon.InterceptedDataStub{ + CheckValidityCalled: func() error { + checkValidityCounter.Add(1) + return nil + }, + IsForCurrentShardCalled: func() bool { + return false + }, + HashCalled: func() []byte { + return []byte("hash") + }, + } + + verifier := defaultInterceptedDataVerifier(expiryTestDuration) + + // First retrieval, check validity is reached. + err := verifier.Verify(interceptedData) + require.NoError(t, err) + require.Equal(t, int64(1), checkValidityCounter.Get()) + + // Second retrieval should be from the cache. + err = verifier.Verify(interceptedData) + require.NoError(t, err) + require.Equal(t, int64(1), checkValidityCounter.Get()) + + // Wait for the cache expiry + <-time.After(expiryTestDuration + 100*time.Millisecond) + + // Third retrieval should reach validity check again. + err = verifier.Verify(interceptedData) + require.NoError(t, err) + require.Equal(t, int64(2), checkValidityCounter.Get()) + }) + + t.Run("expiry on invalid data", func(t *testing.T) { + expiryTestDuration := defaultSpan * 2 + + checkValidityCounter := atomic.Counter{} + + interceptedData := &testscommon.InterceptedDataStub{ + CheckValidityCalled: func() error { + checkValidityCounter.Add(1) + return process.ErrInvalidInterceptedData + }, + IsForCurrentShardCalled: func() bool { + return false + }, + HashCalled: func() []byte { + return []byte("hash") + }, + } + + verifier := defaultInterceptedDataVerifier(expiryTestDuration) + + // First retrieval, check validity is reached. + err := verifier.Verify(interceptedData) + require.Equal(t, process.ErrInvalidInterceptedData, err) + require.Equal(t, int64(1), checkValidityCounter.Get()) + + // Second retrieval should be from the cache. + err = verifier.Verify(interceptedData) + require.Equal(t, process.ErrInvalidInterceptedData, err) + require.Equal(t, int64(1), checkValidityCounter.Get()) + + // Wait for the cache expiry + <-time.After(expiryTestDuration + 100*time.Millisecond) + + // Third retrieval should reach validity check again. + err = verifier.Verify(interceptedData) + require.Equal(t, process.ErrInvalidInterceptedData, err) + require.Equal(t, int64(2), checkValidityCounter.Get()) + }) +} diff --git a/process/interceptors/multiDataInterceptor.go b/process/interceptors/multiDataInterceptor.go index 9e0197ea741..923c9b360e9 100644 --- a/process/interceptors/multiDataInterceptor.go +++ b/process/interceptors/multiDataInterceptor.go @@ -7,27 +7,30 @@ import ( "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/batch" "github.com/multiversx/mx-chain-core-go/marshal" + logger "github.com/multiversx/mx-chain-logger-go" + "github.com/pkg/errors" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/debug/handler" "github.com/multiversx/mx-chain-go/p2p" "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/process/interceptors/disabled" - logger "github.com/multiversx/mx-chain-logger-go" ) var log = logger.GetOrCreate("process/interceptors") // ArgMultiDataInterceptor is the argument for the multi-data interceptor type ArgMultiDataInterceptor struct { - Topic string - Marshalizer marshal.Marshalizer - DataFactory process.InterceptedDataFactory - Processor process.InterceptorProcessor - Throttler process.InterceptorThrottler - AntifloodHandler process.P2PAntifloodHandler - WhiteListRequest process.WhiteListHandler - PreferredPeersHolder process.PreferredPeersHolderHandler - CurrentPeerId core.PeerID + Topic string + Marshalizer marshal.Marshalizer + DataFactory process.InterceptedDataFactory + Processor process.InterceptorProcessor + Throttler process.InterceptorThrottler + AntifloodHandler process.P2PAntifloodHandler + WhiteListRequest process.WhiteListHandler + PreferredPeersHolder process.PreferredPeersHolderHandler + CurrentPeerId core.PeerID + InterceptedDataVerifier process.InterceptedDataVerifier } // MultiDataInterceptor is used for intercepting packed multi data @@ -66,19 +69,23 @@ func NewMultiDataInterceptor(arg ArgMultiDataInterceptor) (*MultiDataInterceptor if check.IfNil(arg.PreferredPeersHolder) { return nil, process.ErrNilPreferredPeersHolder } + if check.IfNil(arg.InterceptedDataVerifier) { + return nil, process.ErrNilInterceptedDataVerifier + } if len(arg.CurrentPeerId) == 0 { return nil, process.ErrEmptyPeerID } multiDataIntercept := &MultiDataInterceptor{ baseDataInterceptor: &baseDataInterceptor{ - throttler: arg.Throttler, - antifloodHandler: arg.AntifloodHandler, - topic: arg.Topic, - currentPeerId: arg.CurrentPeerId, - processor: arg.Processor, - preferredPeersHolder: arg.PreferredPeersHolder, - debugHandler: handler.NewDisabledInterceptorDebugHandler(), + throttler: arg.Throttler, + antifloodHandler: arg.AntifloodHandler, + topic: arg.Topic, + currentPeerId: arg.CurrentPeerId, + processor: arg.Processor, + preferredPeersHolder: arg.PreferredPeersHolder, + debugHandler: handler.NewDisabledInterceptorDebugHandler(), + interceptedDataVerifier: arg.InterceptedDataVerifier, }, marshalizer: arg.Marshalizer, factory: arg.DataFactory, @@ -153,6 +160,7 @@ func (mdi *MultiDataInterceptor) ProcessReceivedMessage(message p2p.MessageP2P, var interceptedData process.InterceptedData interceptedData, err = mdi.interceptedData(dataBuff, message.Peer(), fromConnectedPeer) listInterceptedData[index] = interceptedData + if err != nil { mdi.throttler.EndProcessing() return err @@ -207,11 +215,11 @@ func (mdi *MultiDataInterceptor) interceptedData(dataBuff []byte, originator cor mdi.receivedDebugInterceptedData(interceptedData) - err = interceptedData.CheckValidity() + err = mdi.interceptedDataVerifier.Verify(interceptedData) if err != nil { mdi.processDebugInterceptedData(interceptedData, err) - isWrongVersion := err == process.ErrInvalidTransactionVersion || err == process.ErrInvalidChainID + isWrongVersion := errors.Is(err, process.ErrInvalidTransactionVersion) || errors.Is(err, process.ErrInvalidChainID) if isWrongVersion { // this situation is so severe that we need to black list de peers reason := "wrong version of received intercepted data, topic " + mdi.topic + ", error " + err.Error() diff --git a/process/interceptors/multiDataInterceptor_test.go b/process/interceptors/multiDataInterceptor_test.go index 6ca244409b7..ede867dba07 100644 --- a/process/interceptors/multiDataInterceptor_test.go +++ b/process/interceptors/multiDataInterceptor_test.go @@ -10,28 +10,30 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/batch" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/process/interceptors" "github.com/multiversx/mx-chain-go/process/mock" "github.com/multiversx/mx-chain-go/testscommon" "github.com/multiversx/mx-chain-go/testscommon/p2pmocks" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) var fromConnectedPeerId = core.PeerID("from connected peer Id") func createMockArgMultiDataInterceptor() interceptors.ArgMultiDataInterceptor { return interceptors.ArgMultiDataInterceptor{ - Topic: "test topic", - Marshalizer: &mock.MarshalizerMock{}, - DataFactory: &mock.InterceptedDataFactoryStub{}, - Processor: &mock.InterceptorProcessorStub{}, - Throttler: createMockThrottler(), - AntifloodHandler: &mock.P2PAntifloodHandlerStub{}, - WhiteListRequest: &testscommon.WhiteListHandlerStub{}, - PreferredPeersHolder: &p2pmocks.PeersHolderStub{}, - CurrentPeerId: "pid", + Topic: "test topic", + Marshalizer: &mock.MarshalizerMock{}, + DataFactory: &mock.InterceptedDataFactoryStub{}, + Processor: &mock.InterceptorProcessorStub{}, + Throttler: createMockThrottler(), + AntifloodHandler: &mock.P2PAntifloodHandlerStub{}, + WhiteListRequest: &testscommon.WhiteListHandlerStub{}, + PreferredPeersHolder: &p2pmocks.PeersHolderStub{}, + CurrentPeerId: "pid", + InterceptedDataVerifier: &mock.InterceptedDataVerifierMock{}, } } @@ -68,6 +70,17 @@ func TestNewMultiDataInterceptor_NilInterceptedDataFactoryShouldErr(t *testing.T assert.Equal(t, process.ErrNilInterceptedDataFactory, err) } +func TestNewMultiDataInterceptor_NilInterceptedDataVerifierShouldErr(t *testing.T) { + t.Parallel() + + arg := createMockArgMultiDataInterceptor() + arg.InterceptedDataVerifier = nil + mdi, err := interceptors.NewMultiDataInterceptor(arg) + + assert.True(t, check.IfNil(mdi)) + assert.Equal(t, process.ErrNilInterceptedDataVerifier, err) +} + func TestNewMultiDataInterceptor_NilInterceptedDataProcessorShouldErr(t *testing.T) { t.Parallel() @@ -282,6 +295,7 @@ func TestMultiDataInterceptor_ProcessReceivedPartiallyCorrectDataShouldErr(t *te IsForCurrentShardCalled: func() bool { return true }, + HashCalled: func() []byte { return []byte("hash") }, } arg := createMockArgMultiDataInterceptor() arg.DataFactory = &mock.InterceptedDataFactoryStub{ @@ -354,6 +368,11 @@ func testProcessReceiveMessageMultiData(t *testing.T, isForCurrentShard bool, ex } arg.Processor = createMockInterceptorStub(&checkCalledNum, &processCalledNum) arg.Throttler = throttler + arg.InterceptedDataVerifier = &mock.InterceptedDataVerifierMock{ + VerifyCalled: func(interceptedData process.InterceptedData) error { + return interceptedData.CheckValidity() + }, + } mdi, _ := interceptors.NewMultiDataInterceptor(arg) dataField, _ := marshalizer.Marshal(&batch.Batch{Data: buffData}) @@ -570,6 +589,9 @@ func processReceivedMessageMultiDataInvalidVersion(t *testing.T, expectedErr err checkCalledNum := int32(0) processCalledNum := int32(0) interceptedData := &testscommon.InterceptedDataStub{ + HashCalled: func() []byte { + return []byte("hash") + }, CheckValidityCalled: func() error { return expectedErr }, @@ -603,6 +625,11 @@ func processReceivedMessageMultiDataInvalidVersion(t *testing.T, expectedErr err return true }, } + arg.InterceptedDataVerifier = &mock.InterceptedDataVerifierMock{ + VerifyCalled: func(interceptedData process.InterceptedData) error { + return interceptedData.CheckValidity() + }, + } mdi, _ := interceptors.NewMultiDataInterceptor(arg) dataField, _ := marshalizer.Marshal(&batch.Batch{Data: buffData}) @@ -658,6 +685,9 @@ func TestMultiDataInterceptor_ProcessReceivedMessageIsOriginatorNotOkButWhiteLis IsForCurrentShardCalled: func() bool { return false }, + HashCalled: func() []byte { + return []byte("hash") + }, } whiteListHandler := &testscommon.WhiteListHandlerStub{ diff --git a/process/interceptors/singleDataInterceptor.go b/process/interceptors/singleDataInterceptor.go index 84f3296acd7..7e5a4257fd6 100644 --- a/process/interceptors/singleDataInterceptor.go +++ b/process/interceptors/singleDataInterceptor.go @@ -1,8 +1,11 @@ package interceptors import ( + "errors" + "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/debug/handler" "github.com/multiversx/mx-chain-go/p2p" @@ -11,14 +14,15 @@ import ( // ArgSingleDataInterceptor is the argument for the single-data interceptor type ArgSingleDataInterceptor struct { - Topic string - DataFactory process.InterceptedDataFactory - Processor process.InterceptorProcessor - Throttler process.InterceptorThrottler - AntifloodHandler process.P2PAntifloodHandler - WhiteListRequest process.WhiteListHandler - PreferredPeersHolder process.PreferredPeersHolderHandler - CurrentPeerId core.PeerID + Topic string + DataFactory process.InterceptedDataFactory + Processor process.InterceptorProcessor + Throttler process.InterceptorThrottler + AntifloodHandler process.P2PAntifloodHandler + WhiteListRequest process.WhiteListHandler + PreferredPeersHolder process.PreferredPeersHolderHandler + CurrentPeerId core.PeerID + InterceptedDataVerifier process.InterceptedDataVerifier } // SingleDataInterceptor is used for intercepting packed multi data @@ -51,19 +55,23 @@ func NewSingleDataInterceptor(arg ArgSingleDataInterceptor) (*SingleDataIntercep if check.IfNil(arg.PreferredPeersHolder) { return nil, process.ErrNilPreferredPeersHolder } + if check.IfNil(arg.InterceptedDataVerifier) { + return nil, process.ErrNilInterceptedDataVerifier + } if len(arg.CurrentPeerId) == 0 { return nil, process.ErrEmptyPeerID } singleDataIntercept := &SingleDataInterceptor{ baseDataInterceptor: &baseDataInterceptor{ - throttler: arg.Throttler, - antifloodHandler: arg.AntifloodHandler, - topic: arg.Topic, - currentPeerId: arg.CurrentPeerId, - processor: arg.Processor, - preferredPeersHolder: arg.PreferredPeersHolder, - debugHandler: handler.NewDisabledInterceptorDebugHandler(), + throttler: arg.Throttler, + antifloodHandler: arg.AntifloodHandler, + topic: arg.Topic, + currentPeerId: arg.CurrentPeerId, + processor: arg.Processor, + preferredPeersHolder: arg.PreferredPeersHolder, + debugHandler: handler.NewDisabledInterceptorDebugHandler(), + interceptedDataVerifier: arg.InterceptedDataVerifier, }, factory: arg.DataFactory, whiteListRequest: arg.WhiteListRequest, @@ -93,13 +101,12 @@ func (sdi *SingleDataInterceptor) ProcessReceivedMessage(message p2p.MessageP2P, } sdi.receivedDebugInterceptedData(interceptedData) - - err = interceptedData.CheckValidity() + err = sdi.interceptedDataVerifier.Verify(interceptedData) if err != nil { sdi.throttler.EndProcessing() sdi.processDebugInterceptedData(interceptedData, err) - isWrongVersion := err == process.ErrInvalidTransactionVersion || err == process.ErrInvalidChainID + isWrongVersion := errors.Is(err, process.ErrInvalidTransactionVersion) || errors.Is(err, process.ErrInvalidChainID) if isWrongVersion { // this situation is so severe that we need to black list de peers reason := "wrong version of received intercepted data, topic " + sdi.topic + ", error " + err.Error() diff --git a/process/interceptors/singleDataInterceptor_test.go b/process/interceptors/singleDataInterceptor_test.go index 515c2a8724c..9b1fad0a840 100644 --- a/process/interceptors/singleDataInterceptor_test.go +++ b/process/interceptors/singleDataInterceptor_test.go @@ -8,25 +8,27 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/process/interceptors" "github.com/multiversx/mx-chain-go/process/mock" "github.com/multiversx/mx-chain-go/testscommon" "github.com/multiversx/mx-chain-go/testscommon/p2pmocks" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func createMockArgSingleDataInterceptor() interceptors.ArgSingleDataInterceptor { return interceptors.ArgSingleDataInterceptor{ - Topic: "test topic", - DataFactory: &mock.InterceptedDataFactoryStub{}, - Processor: &mock.InterceptorProcessorStub{}, - Throttler: createMockThrottler(), - AntifloodHandler: &mock.P2PAntifloodHandlerStub{}, - WhiteListRequest: &testscommon.WhiteListHandlerStub{}, - PreferredPeersHolder: &p2pmocks.PeersHolderStub{}, - CurrentPeerId: "pid", + Topic: "test topic", + DataFactory: &mock.InterceptedDataFactoryStub{}, + Processor: &mock.InterceptorProcessorStub{}, + Throttler: createMockThrottler(), + AntifloodHandler: &mock.P2PAntifloodHandlerStub{}, + WhiteListRequest: &testscommon.WhiteListHandlerStub{}, + PreferredPeersHolder: &p2pmocks.PeersHolderStub{}, + CurrentPeerId: "pid", + InterceptedDataVerifier: createMockInterceptedDataVerifier(), } } @@ -57,6 +59,14 @@ func createMockThrottler() *mock.InterceptorThrottlerStub { } } +func createMockInterceptedDataVerifier() *mock.InterceptedDataVerifierMock { + return &mock.InterceptedDataVerifierMock{ + VerifyCalled: func(interceptedData process.InterceptedData) error { + return interceptedData.CheckValidity() + }, + } +} + func TestNewSingleDataInterceptor_EmptyTopicShouldErr(t *testing.T) { t.Parallel() @@ -145,6 +155,17 @@ func TestNewSingleDataInterceptor_EmptyPeerIDShouldErr(t *testing.T) { assert.Equal(t, process.ErrEmptyPeerID, err) } +func TestNewSingleDataInterceptor_NilInterceptedDataVerifierShouldErr(t *testing.T) { + t.Parallel() + + arg := createMockArgMultiDataInterceptor() + arg.InterceptedDataVerifier = nil + mdi, err := interceptors.NewMultiDataInterceptor(arg) + + assert.True(t, check.IfNil(mdi)) + assert.Equal(t, process.ErrNilInterceptedDataVerifier, err) +} + func TestNewSingleDataInterceptor(t *testing.T) { t.Parallel() diff --git a/process/interface.go b/process/interface.go index 2d1ff18e22a..99693655a83 100644 --- a/process/interface.go +++ b/process/interface.go @@ -1401,3 +1401,16 @@ type SentSignaturesTracker interface { ResetCountersForManagedBlockSigner(signerPk []byte) IsInterfaceNil() bool } + +// InterceptedDataVerifier defines a component able to verify intercepted data validity +type InterceptedDataVerifier interface { + Verify(interceptedData InterceptedData) error + IsInterfaceNil() bool +} + +// InterceptedDataVerifierFactory defines a component that is able to create intercepted data verifiers +type InterceptedDataVerifierFactory interface { + Create(topic string) (InterceptedDataVerifier, error) + Close() error + IsInterfaceNil() bool +} diff --git a/process/mock/interceptedDataVerifierFactoryMock.go b/process/mock/interceptedDataVerifierFactoryMock.go new file mode 100644 index 00000000000..245be014b15 --- /dev/null +++ b/process/mock/interceptedDataVerifierFactoryMock.go @@ -0,0 +1,29 @@ +package mock + +import ( + "github.com/multiversx/mx-chain-go/process" +) + +// InterceptedDataVerifierFactoryMock - +type InterceptedDataVerifierFactoryMock struct { + CreateCalled func(topic string) (process.InterceptedDataVerifier, error) +} + +// Create - +func (idvfs *InterceptedDataVerifierFactoryMock) Create(topic string) (process.InterceptedDataVerifier, error) { + if idvfs.CreateCalled != nil { + return idvfs.CreateCalled(topic) + } + + return &InterceptedDataVerifierMock{}, nil +} + +// Close - +func (idvfs *InterceptedDataVerifierFactoryMock) Close() error { + return nil +} + +// IsInterfaceNil - +func (idvfs *InterceptedDataVerifierFactoryMock) IsInterfaceNil() bool { + return idvfs == nil +} diff --git a/process/mock/interceptedDataVerifierMock.go b/process/mock/interceptedDataVerifierMock.go index c8d4d14392b..6668a6ea625 100644 --- a/process/mock/interceptedDataVerifierMock.go +++ b/process/mock/interceptedDataVerifierMock.go @@ -1,17 +1,24 @@ package mock -import "github.com/multiversx/mx-chain-go/process" +import ( + "github.com/multiversx/mx-chain-go/process" +) // InterceptedDataVerifierMock - type InterceptedDataVerifierMock struct { + VerifyCalled func(interceptedData process.InterceptedData) error } -// IsForCurrentShard - -func (i *InterceptedDataVerifierMock) IsForCurrentShard(_ process.InterceptedData) bool { - return true +// Verify - +func (idv *InterceptedDataVerifierMock) Verify(interceptedData process.InterceptedData) error { + if idv.VerifyCalled != nil { + return idv.VerifyCalled(interceptedData) + } + + return nil } -// IsInterfaceNil returns true if underlying object is -func (i *InterceptedDataVerifierMock) IsInterfaceNil() bool { - return i == nil +// IsInterfaceNil - +func (idv *InterceptedDataVerifierMock) IsInterfaceNil() bool { + return idv == nil } diff --git a/process/unsigned/interceptedUnsignedTransaction_test.go b/process/unsigned/interceptedUnsignedTransaction_test.go index b0c00e4982e..102b76c0975 100644 --- a/process/unsigned/interceptedUnsignedTransaction_test.go +++ b/process/unsigned/interceptedUnsignedTransaction_test.go @@ -11,13 +11,14 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" + logger "github.com/multiversx/mx-chain-logger-go" + "github.com/stretchr/testify/assert" + "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/process/mock" "github.com/multiversx/mx-chain-go/process/unsigned" "github.com/multiversx/mx-chain-go/testscommon" "github.com/multiversx/mx-chain-go/testscommon/hashingMocks" - logger "github.com/multiversx/mx-chain-logger-go" - "github.com/stretchr/testify/assert" ) var senderShard = uint32(2) @@ -170,7 +171,7 @@ func TestNewInterceptedUnsignedTransaction_ShouldWork(t *testing.T) { assert.Nil(t, err) } -// ------- CheckValidity +// ------- Verify func TestInterceptedUnsignedTransaction_CheckValidityNilTxHashShouldErr(t *testing.T) { t.Parallel() diff --git a/sharding/nodesSetup.go b/sharding/nodesSetup.go index 26e8bee3351..32f9b1dbc92 100644 --- a/sharding/nodesSetup.go +++ b/sharding/nodesSetup.go @@ -6,6 +6,7 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-go/config" "github.com/multiversx/mx-chain-go/sharding/nodesCoordinator" ) diff --git a/testscommon/components/components.go b/testscommon/components/components.go index 6d33ad04fa0..6e630b9050d 100644 --- a/testscommon/components/components.go +++ b/testscommon/components/components.go @@ -8,6 +8,10 @@ import ( "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/endProcess" "github.com/multiversx/mx-chain-core-go/data/outport" + logger "github.com/multiversx/mx-chain-logger-go" + wasmConfig "github.com/multiversx/mx-chain-vm-go/config" + "github.com/stretchr/testify/require" + "github.com/multiversx/mx-chain-go/common" commonFactory "github.com/multiversx/mx-chain-go/common/factory" "github.com/multiversx/mx-chain-go/config" @@ -41,9 +45,6 @@ import ( statusHandlerMock "github.com/multiversx/mx-chain-go/testscommon/statusHandler" "github.com/multiversx/mx-chain-go/testscommon/storage" "github.com/multiversx/mx-chain-go/trie" - logger "github.com/multiversx/mx-chain-logger-go" - wasmConfig "github.com/multiversx/mx-chain-vm-go/config" - "github.com/stretchr/testify/require" ) var log = logger.GetOrCreate("componentsMock") diff --git a/testscommon/generalConfig.go b/testscommon/generalConfig.go index 515c64518b4..f5777cfae6b 100644 --- a/testscommon/generalConfig.go +++ b/testscommon/generalConfig.go @@ -441,6 +441,10 @@ func GetGeneralConfig() config.Config { ResourceStats: config.ResourceStatsConfig{ RefreshIntervalInSec: 1, }, + InterceptedDataVerifier: config.InterceptedDataVerifierConfig{ + CacheSpanInSec: 1, + CacheExpiryInSec: 1, + }, } } diff --git a/update/factory/exportHandlerFactory.go b/update/factory/exportHandlerFactory.go index c13f25f3f5a..0cda7a5d2e0 100644 --- a/update/factory/exportHandlerFactory.go +++ b/update/factory/exportHandlerFactory.go @@ -8,6 +8,8 @@ import ( "time" "github.com/multiversx/mx-chain-core-go/core/check" + logger "github.com/multiversx/mx-chain-logger-go" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/config" "github.com/multiversx/mx-chain-go/dataRetriever" @@ -30,7 +32,6 @@ import ( "github.com/multiversx/mx-chain-go/update/genesis" "github.com/multiversx/mx-chain-go/update/storing" "github.com/multiversx/mx-chain-go/update/sync" - logger "github.com/multiversx/mx-chain-logger-go" ) var log = logger.GetOrCreate("update/factory") @@ -69,6 +70,7 @@ type ArgsExporter struct { TrieSyncerVersion int CheckNodesOnDisk bool NodeOperationMode common.NodeOperation + InterceptedDataVerifierFactory process.InterceptedDataVerifierFactory } type exportHandlerFactory struct { @@ -108,6 +110,7 @@ type exportHandlerFactory struct { trieSyncerVersion int checkNodesOnDisk bool nodeOperationMode common.NodeOperation + interceptedDataVerifierFactory process.InterceptedDataVerifierFactory } // NewExportHandlerFactory creates an exporter factory @@ -266,6 +269,7 @@ func NewExportHandlerFactory(args ArgsExporter) (*exportHandlerFactory, error) { checkNodesOnDisk: args.CheckNodesOnDisk, statusCoreComponents: args.StatusCoreComponents, nodeOperationMode: args.NodeOperationMode, + interceptedDataVerifierFactory: args.InterceptedDataVerifierFactory, } return e, nil @@ -588,6 +592,7 @@ func (e *exportHandlerFactory) createInterceptors() error { FullArchiveInterceptorsContainer: e.fullArchiveInterceptorsContainer, AntifloodHandler: e.networkComponents.InputAntiFloodHandler(), NodeOperationMode: e.nodeOperationMode, + InterceptedDataVerifierFactory: e.interceptedDataVerifierFactory, } fullSyncInterceptors, err := NewFullSyncInterceptorsContainerFactory(argsInterceptors) if err != nil { diff --git a/update/factory/fullSyncInterceptors.go b/update/factory/fullSyncInterceptors.go index 0fe0298c4d6..037155226c9 100644 --- a/update/factory/fullSyncInterceptors.go +++ b/update/factory/fullSyncInterceptors.go @@ -7,6 +7,7 @@ import ( "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/core/throttler" "github.com/multiversx/mx-chain-core-go/marshal" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/dataRetriever" "github.com/multiversx/mx-chain-go/process" @@ -29,25 +30,26 @@ const numGoRoutines = 2000 // fullSyncInterceptorsContainerFactory will handle the creation the interceptors container for shards type fullSyncInterceptorsContainerFactory struct { - mainContainer process.InterceptorsContainer - fullArchiveContainer process.InterceptorsContainer - shardCoordinator sharding.Coordinator - accounts state.AccountsAdapter - store dataRetriever.StorageService - dataPool dataRetriever.PoolsHolder - mainMessenger process.TopicHandler - fullArchiveMessenger process.TopicHandler - nodesCoordinator nodesCoordinator.NodesCoordinator - blockBlackList process.TimeCacher - argInterceptorFactory *interceptorFactory.ArgInterceptedDataFactory - globalThrottler process.InterceptorThrottler - maxTxNonceDeltaAllowed int - addressPubkeyConv core.PubkeyConverter - whiteListHandler update.WhiteListHandler - whiteListerVerifiedTxs update.WhiteListHandler - antifloodHandler process.P2PAntifloodHandler - preferredPeersHolder update.PreferredPeersHolderHandler - nodeOperationMode common.NodeOperation + mainContainer process.InterceptorsContainer + fullArchiveContainer process.InterceptorsContainer + shardCoordinator sharding.Coordinator + accounts state.AccountsAdapter + store dataRetriever.StorageService + dataPool dataRetriever.PoolsHolder + mainMessenger process.TopicHandler + fullArchiveMessenger process.TopicHandler + nodesCoordinator nodesCoordinator.NodesCoordinator + blockBlackList process.TimeCacher + argInterceptorFactory *interceptorFactory.ArgInterceptedDataFactory + globalThrottler process.InterceptorThrottler + maxTxNonceDeltaAllowed int + addressPubkeyConv core.PubkeyConverter + whiteListHandler update.WhiteListHandler + whiteListerVerifiedTxs update.WhiteListHandler + antifloodHandler process.P2PAntifloodHandler + preferredPeersHolder update.PreferredPeersHolderHandler + nodeOperationMode common.NodeOperation + interceptedDataVerifierFactory process.InterceptedDataVerifierFactory } // ArgsNewFullSyncInterceptorsContainerFactory holds the arguments needed for fullSyncInterceptorsContainerFactory @@ -75,6 +77,7 @@ type ArgsNewFullSyncInterceptorsContainerFactory struct { FullArchiveInterceptorsContainer process.InterceptorsContainer AntifloodHandler process.P2PAntifloodHandler NodeOperationMode common.NodeOperation + InterceptedDataVerifierFactory process.InterceptedDataVerifierFactory } // NewFullSyncInterceptorsContainerFactory is responsible for creating a new interceptors factory object @@ -132,6 +135,9 @@ func NewFullSyncInterceptorsContainerFactory( if check.IfNil(args.AntifloodHandler) { return nil, process.ErrNilAntifloodHandler } + if check.IfNil(args.InterceptedDataVerifierFactory) { + return nil, process.ErrNilInterceptedDataVerifierFactory + } argInterceptorFactory := &interceptorFactory.ArgInterceptedDataFactory{ CoreComponents: args.CoreComponents, @@ -164,8 +170,9 @@ func NewFullSyncInterceptorsContainerFactory( whiteListerVerifiedTxs: args.WhiteListerVerifiedTxs, antifloodHandler: args.AntifloodHandler, //TODO: inject the real peers holder once we have the peers mapping before epoch bootstrap finishes - preferredPeersHolder: disabled.NewPreferredPeersHolder(), - nodeOperationMode: args.NodeOperationMode, + preferredPeersHolder: disabled.NewPreferredPeersHolder(), + nodeOperationMode: args.NodeOperationMode, + interceptedDataVerifierFactory: args.InterceptedDataVerifierFactory, } icf.globalThrottler, err = throttler.NewNumGoRoutinesThrottler(numGoRoutines) @@ -349,15 +356,21 @@ func (ficf *fullSyncInterceptorsContainerFactory) createOneShardHeaderIntercepto return nil, err } + interceptedDataVerifier, err := ficf.interceptedDataVerifierFactory.Create(topic) + if err != nil { + return nil, err + } + interceptor, err := interceptors.NewSingleDataInterceptor( interceptors.ArgSingleDataInterceptor{ - Topic: topic, - DataFactory: hdrFactory, - Processor: hdrProcessor, - Throttler: ficf.globalThrottler, - AntifloodHandler: ficf.antifloodHandler, - WhiteListRequest: ficf.whiteListHandler, - CurrentPeerId: ficf.mainMessenger.ID(), + Topic: topic, + DataFactory: hdrFactory, + Processor: hdrProcessor, + Throttler: ficf.globalThrottler, + AntifloodHandler: ficf.antifloodHandler, + WhiteListRequest: ficf.whiteListHandler, + CurrentPeerId: ficf.mainMessenger.ID(), + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -551,17 +564,23 @@ func (ficf *fullSyncInterceptorsContainerFactory) createOneTxInterceptor(topic s return nil, err } + interceptedDataVerifier, err := ficf.interceptedDataVerifierFactory.Create(topic) + if err != nil { + return nil, err + } + interceptor, err := interceptors.NewMultiDataInterceptor( interceptors.ArgMultiDataInterceptor{ - Topic: topic, - Marshalizer: ficf.argInterceptorFactory.CoreComponents.InternalMarshalizer(), - DataFactory: txFactory, - Processor: txProcessor, - Throttler: ficf.globalThrottler, - AntifloodHandler: ficf.antifloodHandler, - WhiteListRequest: ficf.whiteListHandler, - CurrentPeerId: ficf.mainMessenger.ID(), - PreferredPeersHolder: ficf.preferredPeersHolder, + Topic: topic, + Marshalizer: ficf.argInterceptorFactory.CoreComponents.InternalMarshalizer(), + DataFactory: txFactory, + Processor: txProcessor, + Throttler: ficf.globalThrottler, + AntifloodHandler: ficf.antifloodHandler, + WhiteListRequest: ficf.whiteListHandler, + CurrentPeerId: ficf.mainMessenger.ID(), + PreferredPeersHolder: ficf.preferredPeersHolder, + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -586,17 +605,23 @@ func (ficf *fullSyncInterceptorsContainerFactory) createOneUnsignedTxInterceptor return nil, err } + interceptedDataVerifier, err := ficf.interceptedDataVerifierFactory.Create(topic) + if err != nil { + return nil, err + } + interceptor, err := interceptors.NewMultiDataInterceptor( interceptors.ArgMultiDataInterceptor{ - Topic: topic, - Marshalizer: ficf.argInterceptorFactory.CoreComponents.InternalMarshalizer(), - DataFactory: txFactory, - Processor: txProcessor, - Throttler: ficf.globalThrottler, - AntifloodHandler: ficf.antifloodHandler, - WhiteListRequest: ficf.whiteListHandler, - CurrentPeerId: ficf.mainMessenger.ID(), - PreferredPeersHolder: ficf.preferredPeersHolder, + Topic: topic, + Marshalizer: ficf.argInterceptorFactory.CoreComponents.InternalMarshalizer(), + DataFactory: txFactory, + Processor: txProcessor, + Throttler: ficf.globalThrottler, + AntifloodHandler: ficf.antifloodHandler, + WhiteListRequest: ficf.whiteListHandler, + CurrentPeerId: ficf.mainMessenger.ID(), + PreferredPeersHolder: ficf.preferredPeersHolder, + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -621,17 +646,23 @@ func (ficf *fullSyncInterceptorsContainerFactory) createOneRewardTxInterceptor(t return nil, err } + interceptedDataVerifier, err := ficf.interceptedDataVerifierFactory.Create(topic) + if err != nil { + return nil, err + } + interceptor, err := interceptors.NewMultiDataInterceptor( interceptors.ArgMultiDataInterceptor{ - Topic: topic, - Marshalizer: ficf.argInterceptorFactory.CoreComponents.InternalMarshalizer(), - DataFactory: txFactory, - Processor: txProcessor, - Throttler: ficf.globalThrottler, - AntifloodHandler: ficf.antifloodHandler, - WhiteListRequest: ficf.whiteListHandler, - CurrentPeerId: ficf.mainMessenger.ID(), - PreferredPeersHolder: ficf.preferredPeersHolder, + Topic: topic, + Marshalizer: ficf.argInterceptorFactory.CoreComponents.InternalMarshalizer(), + DataFactory: txFactory, + Processor: txProcessor, + Throttler: ficf.globalThrottler, + AntifloodHandler: ficf.antifloodHandler, + WhiteListRequest: ficf.whiteListHandler, + CurrentPeerId: ficf.mainMessenger.ID(), + PreferredPeersHolder: ficf.preferredPeersHolder, + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -694,16 +725,22 @@ func (ficf *fullSyncInterceptorsContainerFactory) createOneMiniBlocksInterceptor return nil, err } + interceptedDataVerifier, err := ficf.interceptedDataVerifierFactory.Create(topic) + if err != nil { + return nil, err + } + interceptor, err := interceptors.NewSingleDataInterceptor( interceptors.ArgSingleDataInterceptor{ - Topic: topic, - DataFactory: txFactory, - Processor: txBlockBodyProcessor, - Throttler: ficf.globalThrottler, - AntifloodHandler: ficf.antifloodHandler, - WhiteListRequest: ficf.whiteListHandler, - CurrentPeerId: ficf.mainMessenger.ID(), - PreferredPeersHolder: ficf.preferredPeersHolder, + Topic: topic, + DataFactory: txFactory, + Processor: txBlockBodyProcessor, + Throttler: ficf.globalThrottler, + AntifloodHandler: ficf.antifloodHandler, + WhiteListRequest: ficf.whiteListHandler, + CurrentPeerId: ficf.mainMessenger.ID(), + PreferredPeersHolder: ficf.preferredPeersHolder, + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -733,17 +770,23 @@ func (ficf *fullSyncInterceptorsContainerFactory) generateMetachainHeaderInterce return err } + interceptedDataVerifier, err := ficf.interceptedDataVerifierFactory.Create(identifierHdr) + if err != nil { + return err + } + //only one metachain header topic interceptor, err := interceptors.NewSingleDataInterceptor( interceptors.ArgSingleDataInterceptor{ - Topic: identifierHdr, - DataFactory: hdrFactory, - Processor: hdrProcessor, - Throttler: ficf.globalThrottler, - AntifloodHandler: ficf.antifloodHandler, - WhiteListRequest: ficf.whiteListHandler, - CurrentPeerId: ficf.mainMessenger.ID(), - PreferredPeersHolder: ficf.preferredPeersHolder, + Topic: identifierHdr, + DataFactory: hdrFactory, + Processor: hdrProcessor, + Throttler: ficf.globalThrottler, + AntifloodHandler: ficf.antifloodHandler, + WhiteListRequest: ficf.whiteListHandler, + CurrentPeerId: ficf.mainMessenger.ID(), + PreferredPeersHolder: ficf.preferredPeersHolder, + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -769,17 +812,23 @@ func (ficf *fullSyncInterceptorsContainerFactory) createOneTrieNodesInterceptor( return nil, err } + interceptedDataVerifier, err := ficf.interceptedDataVerifierFactory.Create(topic) + if err != nil { + return nil, err + } + interceptor, err := interceptors.NewMultiDataInterceptor( interceptors.ArgMultiDataInterceptor{ - Topic: topic, - Marshalizer: ficf.argInterceptorFactory.CoreComponents.InternalMarshalizer(), - DataFactory: trieNodesFactory, - Processor: trieNodesProcessor, - Throttler: ficf.globalThrottler, - AntifloodHandler: ficf.antifloodHandler, - WhiteListRequest: ficf.whiteListHandler, - CurrentPeerId: ficf.mainMessenger.ID(), - PreferredPeersHolder: ficf.preferredPeersHolder, + Topic: topic, + Marshalizer: ficf.argInterceptorFactory.CoreComponents.InternalMarshalizer(), + DataFactory: trieNodesFactory, + Processor: trieNodesProcessor, + Throttler: ficf.globalThrottler, + AntifloodHandler: ficf.antifloodHandler, + WhiteListRequest: ficf.whiteListHandler, + CurrentPeerId: ficf.mainMessenger.ID(), + PreferredPeersHolder: ficf.preferredPeersHolder, + InterceptedDataVerifier: interceptedDataVerifier, }, ) if err != nil { @@ -811,7 +860,6 @@ func (ficf *fullSyncInterceptorsContainerFactory) generateRewardTxInterceptors() if err != nil { return err } - keys[int(idx)] = identifierScr interceptorSlice[int(idx)] = interceptor }