Skip to content

Commit

Permalink
Merge pull request #5795 from multiversx/fix-import-db-resource-leak
Browse files Browse the repository at this point in the history
Fix import db resource leak
  • Loading branch information
iulianpascalau authored Jan 16, 2024
2 parents 229ae2e + ea525d8 commit 023a194
Show file tree
Hide file tree
Showing 17 changed files with 148 additions and 577 deletions.
1 change: 0 additions & 1 deletion cmd/node/CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ GLOBAL OPTIONS:
--import-db value This flag, if set, will make the node start the import process using the provided data path. Will re-check and re-process everything
--import-db-no-sig-check This flag, if set, will cause the signature checks on headers to be skipped. Can be used only if the import-db was previously set
--import-db-save-epoch-root-hash This flag, if set, will export the trie snapshots at every new epoch
--import-db-start-epoch value This flag will specify the start in epoch value in import-db process (default: 0)
--redundancy-level value This flag specifies the level of redundancy used by the current instance for the node (-1 = disabled, 0 = main instance (default), 1 = first backup, 2 = second backup, etc.) (default: 0)
--full-archive Boolean option for setting an observer as full archive, which will sync the entire database of its shard
--mem-ballast value Flag that specifies the number of MegaBytes to be used as a memory ballast for Garbage Collector optimization. If set to 0 (or not set at all), the feature will be disabled. This flag should be used only for well-monitored nodes and by advanced users, as a too high memory ballast could lead to Out Of Memory panics. The memory ballast should not be higher than 20-25% of the machine's available RAM (default: 0)
Expand Down
13 changes: 1 addition & 12 deletions cmd/node/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,12 +331,6 @@ var (
Name: "import-db-save-epoch-root-hash",
Usage: "This flag, if set, will export the trie snapshots at every new epoch",
}
// importDbStartInEpoch defines a flag for an optional flag that can specify the start in epoch value when executing the import-db process
importDbStartInEpoch = cli.Uint64Flag{
Name: "import-db-start-epoch",
Value: 0,
Usage: "This flag will specify the start in epoch value in import-db process",
}
// redundancyLevel defines a flag that specifies the level of redundancy used by the current instance for the node (-1 = disabled, 0 = main instance (default), 1 = first backup, 2 = second backup, etc.)
redundancyLevel = cli.Int64Flag{
Name: "redundancy-level",
Expand Down Expand Up @@ -461,7 +455,6 @@ func getFlags() []cli.Flag {
importDbDirectory,
importDbNoSigCheck,
importDbSaveEpochRootHash,
importDbStartInEpoch,
redundancyLevel,
fullArchive,
memBallast,
Expand Down Expand Up @@ -557,7 +550,6 @@ func applyFlags(ctx *cli.Context, cfgs *config.Configs, flagsConfig *config.Cont
ImportDBWorkingDir: importDbDirectoryValue,
ImportDbNoSigCheckFlag: ctx.GlobalBool(importDbNoSigCheck.Name),
ImportDbSaveTrieEpochRootHash: ctx.GlobalBool(importDbSaveEpochRootHash.Name),
ImportDBStartInEpoch: uint32(ctx.GlobalUint64(importDbStartInEpoch.Name)),
}
cfgs.FlagsConfig = flagsConfig
cfgs.ImportDbConfig = importDBConfigs
Expand Down Expand Up @@ -715,9 +707,7 @@ func processConfigImportDBMode(log logger.Logger, configs *config.Configs) error
return err
}

if importDbFlags.ImportDBStartInEpoch == 0 {
generalConfigs.GeneralSettings.StartInEpochEnabled = false
}
generalConfigs.GeneralSettings.StartInEpochEnabled = false

// We need to increment "NumActivePersisters" in order to make the storage resolvers work (since they open 2 epochs in advance)
generalConfigs.StoragePruning.NumActivePersisters++
Expand All @@ -736,7 +726,6 @@ func processConfigImportDBMode(log logger.Logger, configs *config.Configs) error
"fullArchiveP2P.ThresholdMinConnectedPeers", fullArchiveP2PConfigs.Node.ThresholdMinConnectedPeers,
"no sig check", importDbFlags.ImportDbNoSigCheckFlag,
"import save trie epoch root hash", importDbFlags.ImportDbSaveTrieEpochRootHash,
"import DB start in epoch", importDbFlags.ImportDBStartInEpoch,
"import DB shard ID", importDbFlags.ImportDBTargetShardID,
"kad dht discoverer", "off",
)
Expand Down
1 change: 0 additions & 1 deletion config/contextFlagsConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type ContextFlagsConfig struct {
// ImportDbConfig will hold the import-db parameters
type ImportDbConfig struct {
IsImportDBMode bool
ImportDBStartInEpoch uint32
ImportDBTargetShardID uint32
ImportDBWorkingDir string
ImportDbNoSigCheckFlag bool
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package storagerequesterscontainer

import (
"fmt"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-go/dataRetriever"
"github.com/multiversx/mx-chain-go/dataRetriever/factory/containers"
storagerequesters "github.com/multiversx/mx-chain-go/dataRetriever/storageRequesters"
Expand Down Expand Up @@ -76,11 +73,6 @@ func (mrcf *metaRequestersContainerFactory) Create() (dataRetriever.RequestersCo
return nil, err
}

err = mrcf.generateTrieNodesRequesters()
if err != nil {
return nil, err
}

return mrcf.container, nil
}

Expand Down Expand Up @@ -178,80 +170,6 @@ func (mrcf *metaRequestersContainerFactory) createMetaChainHeaderRequester() (da
return requester, nil
}

// generateTrieNodesRequesters creates storage-backed trie node requesters for
// the user accounts trie and the peer (validator) accounts trie, registering
// both in the container under their metachain communication topics.
func (mrcf *metaRequestersContainerFactory) generateTrieNodesRequesters() error {
	keys := make([]string, 0)
	requestersSlice := make([]dataRetriever.Requester, 0)

	// user accounts trie requester
	userAccountsRequester, userAccountsTopic, err := mrcf.createTrieNodesRequester(
		dataRetriever.UserAccountsUnit,
		factory.AccountTrieNodesTopic,
		"user accounts",
	)
	if err != nil {
		return err
	}
	requestersSlice = append(requestersSlice, userAccountsRequester)
	keys = append(keys, userAccountsTopic)

	// peer accounts trie requester
	peerAccountsRequester, peerAccountsTopic, err := mrcf.createTrieNodesRequester(
		dataRetriever.PeerAccountsUnit,
		factory.ValidatorTrieNodesTopic,
		"peer accounts",
	)
	if err != nil {
		return err
	}
	requestersSlice = append(requestersSlice, peerAccountsRequester)
	keys = append(keys, peerAccountsTopic)

	return mrcf.container.AddMultiple(keys, requestersSlice)
}

// createTrieNodesRequester builds one trie node requester for the given
// storage unit and topic prefix, returning the requester together with the
// full response topic it should be registered under. The label only
// contextualizes wrapped errors (e.g. "user accounts").
func (mrcf *metaRequestersContainerFactory) createTrieNodesRequester(
	unit dataRetriever.UnitType,
	topicPrefix string,
	label string,
) (dataRetriever.Requester, string, error) {
	storer, err := mrcf.store.GetStorer(unit)
	if err != nil {
		return nil, "", err
	}

	// on the metachain, trie node topics are metachain-to-metachain
	identifierTrieNodes := topicPrefix + core.CommunicationIdentifierBetweenShards(core.MetachainShardId, core.MetachainShardId)
	storageManager, dataTrie, err := mrcf.newImportDBTrieStorage(
		storer,
		unit,
		mrcf.enableEpochsHandler,
		mrcf.stateStatsHandler,
	)
	if err != nil {
		return nil, "", fmt.Errorf("%w while creating %s data trie storage getter", err, label)
	}

	arg := storagerequesters.ArgTrieRequester{
		Messenger:                mrcf.messenger,
		ResponseTopicName:        identifierTrieNodes,
		Marshalizer:              mrcf.marshalizer,
		TrieDataGetter:           dataTrie,
		TrieStorageManager:       storageManager,
		ManualEpochStartNotifier: mrcf.manualEpochStartNotifier,
		ChanGracefullyClose:      mrcf.chanGracefullyClose,
		DelayBeforeGracefulClose: defaultBeforeGracefulClose,
	}
	requester, err := storagerequesters.NewTrieNodeRequester(arg)
	if err != nil {
		return nil, "", fmt.Errorf("%w while creating %s trie node requester", err, label)
	}

	return requester, identifierTrieNodes, nil
}

func (mrcf *metaRequestersContainerFactory) generateRewardsRequesters(
topic string,
unit dataRetriever.UnitType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,10 @@ func TestMetaRequestersContainerFactory_With4ShardsShouldWork(t *testing.T) {
numRequestersUnsigned := noOfShards + 1
numRequestersRewards := noOfShards
numRequestersTxs := noOfShards + 1
numRequestersTrieNodes := 2
numPeerAuthentication := 1
numValidatorInfo := 1
totalRequesters := numRequestersShardHeadersForMetachain + numRequesterMetablocks + numRequestersMiniBlocks +
numRequestersUnsigned + numRequestersTxs + numRequestersTrieNodes + numRequestersRewards + numPeerAuthentication +
numRequestersUnsigned + numRequestersTxs + numRequestersRewards + numPeerAuthentication +
numValidatorInfo

assert.Equal(t, totalRequesters, container.Len())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package storagerequesterscontainer

import (
"fmt"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-go/dataRetriever"
"github.com/multiversx/mx-chain-go/dataRetriever/factory/containers"
Expand Down Expand Up @@ -76,11 +74,6 @@ func (srcf *shardRequestersContainerFactory) Create() (dataRetriever.RequestersC
return nil, err
}

err = srcf.generateTrieNodesRequesters()
if err != nil {
return nil, err
}

return srcf.container, nil
}

Expand Down Expand Up @@ -151,48 +144,6 @@ func (srcf *shardRequestersContainerFactory) generateMetablockHeaderRequesters()
return srcf.container.Add(identifierHdr, requester)
}

// generateTrieNodesRequesters registers a storage-backed requester for the
// user accounts trie nodes topic shared with the metachain.
func (srcf *shardRequestersContainerFactory) generateTrieNodesRequesters() error {
	coordinator := srcf.shardCoordinator

	accountsStorer, err := srcf.store.GetStorer(dataRetriever.UserAccountsUnit)
	if err != nil {
		return err
	}

	// trie nodes are exchanged on the cross-shard topic towards the metachain
	trieNodesTopic := factory.AccountTrieNodesTopic + coordinator.CommunicationIdentifier(core.MetachainShardId)
	storageManager, accountsDataTrie, err := srcf.newImportDBTrieStorage(
		accountsStorer,
		dataRetriever.UserAccountsUnit,
		srcf.enableEpochsHandler,
		srcf.stateStatsHandler,
	)
	if err != nil {
		return fmt.Errorf("%w while creating user accounts data trie storage getter", err)
	}

	requesterArgs := storagerequesters.ArgTrieRequester{
		Messenger:                srcf.messenger,
		ResponseTopicName:        trieNodesTopic,
		Marshalizer:              srcf.marshalizer,
		TrieDataGetter:           accountsDataTrie,
		TrieStorageManager:       storageManager,
		ManualEpochStartNotifier: srcf.manualEpochStartNotifier,
		ChanGracefullyClose:      srcf.chanGracefullyClose,
		DelayBeforeGracefulClose: defaultBeforeGracefulClose,
	}
	trieRequester, err := storagerequesters.NewTrieNodeRequester(requesterArgs)
	if err != nil {
		return fmt.Errorf("%w while creating user accounts trie node requester", err)
	}

	return srcf.container.AddMultiple(
		[]string{trieNodesTopic},
		[]dataRetriever.Requester{trieRequester},
	)
}

func (srcf *shardRequestersContainerFactory) generateRewardRequester(
topic string,
unit dataRetriever.UnitType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,10 @@ func TestShardRequestersContainerFactory_With4ShardsShouldWork(t *testing.T) {
numRequesterHeaders := 1
numRequesterMiniBlocks := noOfShards + 2
numRequesterMetaBlockHeaders := 1
numRequesterTrieNodes := 1
numPeerAuthentication := 1
numValidatorInfo := 1
totalRequesters := numRequesterTxs + numRequesterHeaders + numRequesterMiniBlocks +
numRequesterMetaBlockHeaders + numRequesterSCRs + numRequesterRewardTxs + numRequesterTrieNodes +
numRequesterMetaBlockHeaders + numRequesterSCRs + numRequesterRewardTxs +
numPeerAuthentication + numValidatorInfo

assert.Equal(t, totalRequesters, container.Len())
Expand Down
Loading

0 comments on commit 023a194

Please sign in to comment.