From f5b94d7408e44694402062ab164974cb33b1872c Mon Sep 17 00:00:00 2001 From: bxq2011hust Date: Thu, 9 May 2024 10:40:18 +0800 Subject: [PATCH] add asyncRemoveExpiredNonce of ledger and add log.rotate_time_point --- bcos-framework/bcos-framework/ledger/Ledger.h | 8 ++ .../bcos-framework/ledger/LedgerInterface.h | 6 ++ .../testutils/faker/FakeLedger.h | 1 + bcos-ledger/src/libledger/Ledger.cpp | 32 ++++++++ bcos-ledger/src/libledger/Ledger.h | 8 +- bcos-ledger/src/libledger/LedgerImpl.h | 4 +- bcos-ledger/src/libledger/LedgerMethods.cpp | 7 ++ bcos-ledger/src/libledger/LedgerMethods.h | 3 + .../bcos-pbft/pbft/storage/LedgerStorage.h | 1 - bcos-scheduler/src/SchedulerImpl.cpp | 6 +- .../client/LedgerServiceClient.h | 5 ++ .../bcos-utilities/BoostLogInitializer.cpp | 17 ++++- .../bcos-utilities/BoostLogInitializer.h | 1 + fisco-bcos-air/AirNodeInitializer.cpp | 5 ++ fisco-bcos-air/AirNodeInitializer.h | 1 + fisco-bcos-air/main.cpp | 9 +++ .../main/ExecutorServiceApp.cpp | 3 +- .../main/SchedulerServiceApp.cpp | 3 +- libinitializer/CommandHelper.cpp | 9 ++- libinitializer/CommandHelper.h | 7 ++ libinitializer/Initializer.cpp | 73 +++++++++++++++---- libinitializer/Initializer.h | 3 + libinitializer/LedgerInitializer.h | 4 +- libinitializer/SchedulerInitializer.h | 4 +- tools/archive-tool/archiveTool.cpp | 8 +- .../BaselineScheduler.h | 5 +- 26 files changed, 195 insertions(+), 38 deletions(-) diff --git a/bcos-framework/bcos-framework/ledger/Ledger.h b/bcos-framework/bcos-framework/ledger/Ledger.h index 778b5f94cf..ee31576fee 100644 --- a/bcos-framework/bcos-framework/ledger/Ledger.h +++ b/bcos-framework/bcos-framework/ledger/Ledger.h @@ -40,6 +40,14 @@ inline constexpr struct StoreTransactionsAndReceipts } } storeTransactionsAndReceipts{}; +inline constexpr struct RemoveExpiredNonce +{ + task::Task operator()(auto& ledger, protocol::BlockNumber expiredNumber) const + { + co_await tag_invoke(*this, ledger, expiredNumber); + } +} removeExpiredNonce{}; + inline constexpr struct GetBlockData { task::Task operator()( diff --git a/bcos-framework/bcos-framework/ledger/LedgerInterface.h b/bcos-framework/bcos-framework/ledger/LedgerInterface.h index 0ad7fa4442..3977d483a8 100644 --- a/bcos-framework/bcos-framework/ledger/LedgerInterface.h +++ b/bcos-framework/bcos-framework/ledger/LedgerInterface.h @@ -167,6 +167,12 @@ class LedgerInterface virtual void asyncPreStoreBlockTxs(bcos::protocol::ConstTransactionsPtr _blockTxs, bcos::protocol::Block::ConstPtr block, std::function _callback) = 0; + + /** + * @brief remove expired tx nonces + * @param blockNumber the current block number + */ + virtual void removeExpiredNonce(protocol::BlockNumber blockNumber, bool sync) = 0; }; } // namespace bcos::ledger diff --git a/bcos-framework/bcos-framework/testutils/faker/FakeLedger.h b/bcos-framework/bcos-framework/testutils/faker/FakeLedger.h index 0894d7a4a0..edb26f6c79 100644 --- a/bcos-framework/bcos-framework/testutils/faker/FakeLedger.h +++ b/bcos-framework/bcos-framework/testutils/faker/FakeLedger.h @@ -309,6 +309,7 @@ class FakeLedger : public LedgerInterface, public std::enable_shared_from_this (protocol::BlockNumber)m_blockLimit ? blockNumber - m_blockLimit : 0; + if (expiredNumber > 0) + { + LEDGER_LOG(DEBUG) << "removeExpiredNonce" << LOG_KV("number", blockNumber) + << LOG_KV("expiredNumber", expiredNumber); + auto deleteExpiredNonces = [expiredNumber, storage = m_storage]() { + Entry deletedEntry; + deletedEntry.setStatus(Entry::DELETED); + auto blockNumberStr = boost::lexical_cast(expiredNumber); + storage->asyncSetRow(SYS_BLOCK_NUMBER_2_NONCES, blockNumberStr, std::move(deletedEntry), + [](auto&& error) { + if (error) + { + LEDGER_LOG(WARNING) << LOG_DESC("removeExpiredNonce failed") + << LOG_KV("message", error->errorMessage()); + } + }); + }; + if (sync) + { + deleteExpiredNonces(); + } + else + { + m_threadPool->enqueue(deleteExpiredNonces); + } + } +} + void Ledger::asyncGetNodeListByType(const std::string_view& _type, std::function _onGetConfig) { diff --git a/bcos-ledger/src/libledger/Ledger.h b/bcos-ledger/src/libledger/Ledger.h index 8836596cc2..c18388aeda 100644 --- a/bcos-ledger/src/libledger/Ledger.h +++ b/bcos-ledger/src/libledger/Ledger.h @@ -43,10 +43,12 @@ class Ledger : public LedgerInterface boost::compute::detail::lru_cache>>; Ledger(bcos::protocol::BlockFactory::Ptr _blockFactory, - bcos::storage::StorageInterface::Ptr _storage, int merkleTreeCacheSize = 100) + bcos::storage::StorageInterface::Ptr _storage, size_t _blockLimit, + int merkleTreeCacheSize = 100) : m_blockFactory(std::move(_blockFactory)), m_storage(std::move(_storage)), - m_threadPool(std::make_shared("WriteReceipts", 1)), + m_threadPool(std::make_shared("ledgerWrite", 2)), + m_blockLimit(_blockLimit), m_merkleTreeCacheSize(merkleTreeCacheSize), m_txProofMerkleCache(m_merkleTreeCacheSize), m_receiptProofMerkleCache(m_merkleTreeCacheSize) @@ -99,6 +101,7 @@ class Ledger : public LedgerInterface std::function>)> _onGetList) override; + void removeExpiredNonce(protocol::BlockNumber blockNumber, bool sync = false) override; void asyncGetNodeListByType(const std::string_view& _type, std::function _onGetConfig) override; @@ -161,6 +164,7 @@ class Ledger : public LedgerInterface mutable RecursiveMutex m_mutex; std::shared_ptr m_threadPool; + size_t m_blockLimit; // Maintain merkle trees of 100 blocks int m_merkleTreeCacheSize; diff --git a/bcos-ledger/src/libledger/LedgerImpl.h b/bcos-ledger/src/libledger/LedgerImpl.h index da8f4cf370..cbcd14843e 100644 --- a/bcos-ledger/src/libledger/LedgerImpl.h +++ b/bcos-ledger/src/libledger/LedgerImpl.h @@ -46,8 +46,8 @@ class LedgerImpl : public bcos::concepts::ledger::LedgerBase bcos::ledger::tag_invoke( co_return; } +bcos::task::Task bcos::ledger::tag_invoke(ledger::tag_t /*unused*/, + LedgerInterface& ledger, protocol::BlockNumber blockNumber) +{ + ledger.removeExpiredNonce(blockNumber, false); + co_return; +} + bcos::task::Task bcos::ledger::tag_invoke( ledger::tag_t /*unused*/, LedgerInterface& ledger, protocol::BlockNumber blockNumber, int32_t blockFlag) diff --git a/bcos-ledger/src/libledger/LedgerMethods.h b/bcos-ledger/src/libledger/LedgerMethods.h index 1eaee39b97..3faa0a744b 100644 --- a/bcos-ledger/src/libledger/LedgerMethods.h +++ b/bcos-ledger/src/libledger/LedgerMethods.h @@ -54,6 +54,9 @@ inline task::Task tag_invoke(ledger::tag_t /*unused*/, Ledg task::Task tag_invoke(ledger::tag_t, LedgerInterface& ledger, bcos::protocol::ConstTransactionsPtr blockTxs, bcos::protocol::Block::ConstPtr block); +task::Task tag_invoke(ledger::tag_t, LedgerInterface& ledger, + protocol::BlockNumber expiredNumber); + task::Task tag_invoke(ledger::tag_t /*unused*/, LedgerInterface& ledger, protocol::BlockNumber blockNumber, int32_t blockFlag); diff --git a/bcos-pbft/bcos-pbft/pbft/storage/LedgerStorage.h b/bcos-pbft/bcos-pbft/pbft/storage/LedgerStorage.h index cc531b84c5..cc284b6ecb 100644 --- a/bcos-pbft/bcos-pbft/pbft/storage/LedgerStorage.h +++ b/bcos-pbft/bcos-pbft/pbft/storage/LedgerStorage.h @@ -95,7 +95,6 @@ class LedgerStorage : public PBFTStorage, public std::enable_shared_from_this m_storage; bcos::protocol::BlockFactory::Ptr m_blockFactory; diff --git a/bcos-scheduler/src/SchedulerImpl.cpp b/bcos-scheduler/src/SchedulerImpl.cpp index 6be1063847..687e86efc3 100644 --- a/bcos-scheduler/src/SchedulerImpl.cpp +++ b/bcos-scheduler/src/SchedulerImpl.cpp @@ -665,9 +665,9 @@ void SchedulerImpl::commitBlock(bcos::protocol::BlockHeader::Ptr header, self->m_ledgerConfig = ledgerConfig; commitLock->unlock(); // just unlock here - - // Note: blockNumber = 0, means system deploy, and tx is not existed in txpool. - // So it should not exec tx notifier + self->m_ledger->removeExpiredNonce(blockNumber, false); + // Note: blockNumber = 0, means system deploy, and tx is not existed in + // txpool. So it should not exec tx notifier if (self->m_txNotifier && blockNumber != 0) { SCHEDULER_LOG(DEBUG) << "Start notify block result: " << blockNumber; diff --git a/bcos-tars-protocol/bcos-tars-protocol/client/LedgerServiceClient.h b/bcos-tars-protocol/bcos-tars-protocol/client/LedgerServiceClient.h index fe683f99d9..784feeeb98 100644 --- a/bcos-tars-protocol/bcos-tars-protocol/client/LedgerServiceClient.h +++ b/bcos-tars-protocol/bcos-tars-protocol/client/LedgerServiceClient.h @@ -115,6 +115,11 @@ class LedgerServiceClient : public bcos::ledger::LedgerInterface BCOS_LOG(ERROR) << LOG_DESC("unimplement method asyncGetNonceList"); } + void removeExpiredNonce(bcos::protocol::BlockNumber blockNumber, bool sync) override + { + BCOS_LOG(ERROR) << LOG_DESC("unimplement method asyncGetNonceList"); + } + private: bcostars::LedgerServicePrx m_prx; bcos::protocol::BlockFactory::Ptr m_blockFactory; diff --git a/bcos-utilities/bcos-utilities/BoostLogInitializer.cpp b/bcos-utilities/bcos-utilities/BoostLogInitializer.cpp index 4091dfc7f2..73056e35fe 100644 --- a/bcos-utilities/bcos-utilities/BoostLogInitializer.cpp +++ b/bcos-utilities/bcos-utilities/BoostLogInitializer.cpp @@ -24,6 +24,7 @@ #include "bcos-utilities/BoostLog.h" #include #include +#include #include #include #include @@ -173,6 +174,18 @@ void BoostLogInitializer::initLog(boost::property_tree::ptree const& _pt, m_archivePath = _pt.get("log.archive_path", ""); m_rotateFileNamePattern = _pt.get("log.rotate_name_pattern", "log_%Y%m%d%H.%M.log"); + auto rotateTimePoint = _pt.get("log.rotate_time_point", "0:0:0"); + std::vector rotateTimePointVec; + boost::split(rotateTimePointVec, rotateTimePoint, boost::is_any_of(":")); + if (rotateTimePointVec.size() != 3) + { + throw std::runtime_error( + "log.rotate_time_point must be in format of HH:MM:SS, current is " + rotateTimePoint); + } + for (size_t i = 0; i < rotateTimePointVec.size(); i++) + { + m_rotateTimePoint[i] = std::stoi(rotateTimePointVec[i]); + } m_rotateSize = _pt.get("log.max_log_file_size", 1024) * MB_IN_BYTES; if (m_rotateSize < 100 * MB_IN_BYTES) { @@ -274,8 +287,8 @@ boost::shared_ptr BoostLogInitializer::initLo boost::shared_ptr sink(new sink_t()); sink->locked_backend()->enable_final_rotation(false); sink->locked_backend()->set_open_mode(std::ios::app); - sink->locked_backend()->set_time_based_rotation( - boost::log::sinks::file::rotation_at_time_point(0, 0, 0)); + sink->locked_backend()->set_time_based_rotation(boost::log::sinks::file::rotation_at_time_point( + m_rotateTimePoint[0], m_rotateTimePoint[1], m_rotateTimePoint[2])); sink->locked_backend()->set_file_name_pattern(_logPath + "/" + m_logNamePattern); sink->locked_backend()->set_target_file_name_pattern(_logPath + "/" + m_rotateFileNamePattern); /// set rotation size MB diff --git a/bcos-utilities/bcos-utilities/BoostLogInitializer.h b/bcos-utilities/bcos-utilities/BoostLogInitializer.h index 90e408f149..d4c5cd816a 100644 --- a/bcos-utilities/bcos-utilities/BoostLogInitializer.h +++ b/bcos-utilities/bcos-utilities/BoostLogInitializer.h @@ -154,5 +154,6 @@ class BoostLogInitializer bool m_autoFlush = true; bool m_enableLog = true; std::atomic_bool m_running = {false}; + std::vector m_rotateTimePoint = {0, 0, 0}; }; } // namespace bcos diff --git a/fisco-bcos-air/AirNodeInitializer.cpp b/fisco-bcos-air/AirNodeInitializer.cpp index 6db4c32f0f..3ae464e930 100644 --- a/fisco-bcos-air/AirNodeInitializer.cpp +++ b/fisco-bcos-air/AirNodeInitializer.cpp @@ -191,3 +191,8 @@ void AirNodeInitializer::stop() exit(-1); } } + +void AirNodeInitializer::prune() +{ + m_nodeInitializer->prune(); +} diff --git a/fisco-bcos-air/AirNodeInitializer.h b/fisco-bcos-air/AirNodeInitializer.h index fa23da0a20..d5bec5e514 100644 --- a/fisco-bcos-air/AirNodeInitializer.h +++ b/fisco-bcos-air/AirNodeInitializer.h @@ -44,6 +44,7 @@ class AirNodeInitializer virtual void init(std::string const& _configFilePath, std::string const& _genesisFile); virtual void start(); virtual void stop(); + virtual void prune(); protected: virtual void initAirNode(std::string const& _configFilePath, std::string const& _genesisFile, diff --git a/fisco-bcos-air/main.cpp b/fisco-bcos-air/main.cpp index a00d0cb7d8..bb04a73af0 100644 --- a/fisco-bcos-air/main.cpp +++ b/fisco-bcos-air/main.cpp @@ -57,6 +57,15 @@ int main(int argc, const char* argv[]) { auto param = bcos::initializer::initAirNodeCommandLine(argc, argv, false); initializer->init(param.configFilePath, param.genesisFilePath); + if (param.op == bcos::initializer::Params::operation::Prune) + { + std::cout << "[" << bcos::getCurrentDateTime() << "] "; + std::cout << "prune the node data..." << std::endl; + initializer->prune(); + std::cout << "[" << bcos::getCurrentDateTime() << "] "; + std::cout << "prune the node data success." << std::endl; + return 0; + } bcos::initializer::showNodeVersionMetric(); bcos::initializer::printVersion(); diff --git a/fisco-bcos-tars-service/ExecutorService/main/ExecutorServiceApp.cpp b/fisco-bcos-tars-service/ExecutorService/main/ExecutorServiceApp.cpp index 2e2d948064..eea2d5cfd0 100644 --- a/fisco-bcos-tars-service/ExecutorService/main/ExecutorServiceApp.cpp +++ b/fisco-bcos-tars-service/ExecutorService/main/ExecutorServiceApp.cpp @@ -150,7 +150,8 @@ void ExecutorServiceApp::createAndInitExecutor() std::make_shared(m_nodeConfig->keyPageSize()); auto blockFactory = m_protocolInitializer->blockFactory(); - auto ledger = std::make_shared(blockFactory, storage); + auto ledger = + std::make_shared(blockFactory, storage, m_nodeConfig->blockLimit()); auto executorFactory = std::make_shared(ledger, m_txpool, cacheFactory, storage, executionMessageFactory, stateStorageFactory, diff --git a/fisco-bcos-tars-service/SchedulerService/main/SchedulerServiceApp.cpp b/fisco-bcos-tars-service/SchedulerService/main/SchedulerServiceApp.cpp index 2d37304c92..4b594bbc2a 100644 --- a/fisco-bcos-tars-service/SchedulerService/main/SchedulerServiceApp.cpp +++ b/fisco-bcos-tars-service/SchedulerService/main/SchedulerServiceApp.cpp @@ -150,7 +150,8 @@ void SchedulerServiceApp::createScheduler() auto blockFactory = m_protocolInitializer->blockFactory(); auto ledger = std::make_shared(blockFactory, StorageInitializer::build(m_nodeConfig->pdAddrs(), getLogPath(), m_nodeConfig->pdCaPath(), - m_nodeConfig->pdCertPath(), m_nodeConfig->pdKeyPath())); + m_nodeConfig->pdCertPath(), m_nodeConfig->pdKeyPath()), + m_nodeConfig->blockLimit()); auto executionMessageFactory = std::make_shared(); auto executorManager = std::make_shared( diff --git a/libinitializer/CommandHelper.cpp b/libinitializer/CommandHelper.cpp index cdddad6b01..c5972f1d0e 100644 --- a/libinitializer/CommandHelper.cpp +++ b/libinitializer/CommandHelper.cpp @@ -81,7 +81,7 @@ bcos::initializer::Params bcos::initializer::initAirNodeCommandLine( boost::program_options::value()->default_value("./config.ini"), "config file path, eg. config.ini")("genesis,g", boost::program_options::value()->default_value("./config.genesis"), - "genesis config file path, eg. genesis.ini"); + "genesis config file path, eg. genesis.ini")("prune,p", "prune the node data"); if (_autoSendTx) { @@ -148,5 +148,10 @@ bcos::initializer::Params bcos::initializer::initAirNodeCommandLine( txSpeed = vm["txSpeed"].as(); } } - return bcos::initializer::Params{configPath, genesisFilePath, txSpeed}; + auto op = Params::operation::None; + if (vm.count("prune")) + { + op = Params::operation::Prune; + } + return bcos::initializer::Params{configPath, genesisFilePath, txSpeed, op}; } \ No newline at end of file diff --git a/libinitializer/CommandHelper.h b/libinitializer/CommandHelper.h index 68690e9d42..14c532854a 100644 --- a/libinitializer/CommandHelper.h +++ b/libinitializer/CommandHelper.h @@ -33,6 +33,13 @@ struct Params std::string configFilePath; std::string genesisFilePath; float txSpeed; + enum class operation + { + None, + Prune, + Snapshot, + SnapshotWithoutTxAndReceipt, + } op; }; Params initAirNodeCommandLine(int argc, const char* argv[], bool _autoSendTx); } // namespace initializer diff --git a/libinitializer/Initializer.cpp b/libinitializer/Initializer.cpp index f774ff8efe..57aaf2dd89 100644 --- a/libinitializer/Initializer.cpp +++ b/libinitializer/Initializer.cpp @@ -149,7 +149,7 @@ void Initializer::init(bcos::protocol::NodeArchitectureType _nodeArchType, INITIALIZER_LOG(INFO) << LOG_DESC("initNode") << LOG_KV("storagePath", storagePath) << LOG_KV("storageType", m_nodeConfig->storageType()) << LOG_KV("consensusStoragePath", consensusStoragePath); - bcos::storage::TransactionalStorageInterface::Ptr storage = nullptr; + bcos::storage::TransactionalStorageInterface::Ptr schedulerStorage = nullptr; bcos::storage::TransactionalStorageInterface::Ptr consensusStorage = nullptr; bcos::storage::TransactionalStorageInterface::Ptr airExecutorStorage = nullptr; @@ -165,18 +165,18 @@ void Initializer::init(bcos::protocol::NodeArchitectureType _nodeArchType, option.enable_blob_files = m_nodeConfig->enableRocksDBBlob(); // m_protocolInitializer->dataEncryption() will return nullptr when storage_security = false - storage = + m_storage = StorageInitializer::build(storagePath, option, m_protocolInitializer->dataEncryption(), m_nodeConfig->keyPageSize(), m_nodeConfig->enableStatistics()); - schedulerStorage = storage; + schedulerStorage = m_storage; consensusStorage = StorageInitializer::build( consensusStoragePath, option, m_protocolInitializer->dataEncryption(), 0); - airExecutorStorage = storage; + airExecutorStorage = m_storage; } #ifdef WITH_TIKV else if (boost::iequals(m_nodeConfig->storageType(), "TiKV")) { - storage = StorageInitializer::build(m_nodeConfig->pdAddrs(), _logPath, + m_storage = StorageInitializer::build(m_nodeConfig->pdAddrs(), _logPath, m_nodeConfig->pdCaPath(), m_nodeConfig->pdCertPath(), m_nodeConfig->pdKeyPath()); if (_nodeArchType == bcos::protocol::NodeArchitectureType::MAX) { // TODO: in max node, scheduler will use storage to commit but the ledger only use @@ -184,8 +184,8 @@ void Initializer::init(bcos::protocol::NodeArchitectureType _nodeArchType, // scheduler is committing block schedulerStorage = StorageInitializer::build(m_nodeConfig->pdAddrs(), _logPath, m_nodeConfig->pdCaPath(), m_nodeConfig->pdCertPath(), m_nodeConfig->pdKeyPath()); - consensusStorage = storage; - airExecutorStorage = storage; + consensusStorage = m_storage; + airExecutorStorage = m_storage; } else { // in AIR/PRO node, scheduler and executor in one process so need different storage @@ -205,7 +205,7 @@ void Initializer::init(bcos::protocol::NodeArchitectureType _nodeArchType, // build ledger auto ledger = - LedgerInitializer::build(m_protocolInitializer->blockFactory(), storage, m_nodeConfig); + LedgerInitializer::build(m_protocolInitializer->blockFactory(), m_storage, m_nodeConfig); m_ledger = ledger; bcos::protocol::ExecutionMessageFactory::Ptr executionMessageFactory = nullptr; @@ -237,7 +237,7 @@ void Initializer::init(bcos::protocol::NodeArchitectureType _nodeArchType, bcos::executor::GlobalHashImpl::g_hashImpl = m_protocolInitializer->cryptoSuite()->hashImpl(); // using Hasher = std::remove_cvref_t; - auto existsRocksDB = std::dynamic_pointer_cast(storage); + auto existsRocksDB = std::dynamic_pointer_cast(m_storage); auto baselineSchedulerConfig = m_nodeConfig->baselineSchedulerConfig(); task::syncWait(transaction_scheduler::BaselineSchedulerInitializer::checkRequirements( @@ -290,7 +290,7 @@ void Initializer::init(bcos::protocol::NodeArchitectureType _nodeArchType, if (m_nodeConfig->enableLRUCacheStorage()) { cacheFactory = std::make_shared( - storage, m_nodeConfig->cacheSize()); + m_storage, m_nodeConfig->cacheSize()); INITIALIZER_LOG(INFO) << "initNode: enableLRUCacheStorage, size: " << m_nodeConfig->cacheSize(); } @@ -398,16 +398,17 @@ void Initializer::init(bcos::protocol::NodeArchitectureType _nodeArchType, { INITIALIZER_LOG(INFO) << LOG_BADGE("create archive service"); m_archiveService = std::make_shared( - storage, ledger, m_nodeConfig->archiveListenIP(), m_nodeConfig->archiveListenPort()); + m_storage, ledger, m_nodeConfig->archiveListenIP(), m_nodeConfig->archiveListenPort()); } #ifdef WITH_LIGHTNODE - bcos::storage::StorageImpl storageWrapper(storage); + bcos::storage::StorageImpl storageWrapper(m_storage); auto hasher = m_protocolInitializer->cryptoSuite()->hashImpl()->hasher(); using Hasher = std::remove_cvref_t; auto lightNodeLedger = std::make_shared>(hasher.clone(), - std::move(storageWrapper), m_protocolInitializer->blockFactory(), storage); + std::move(storageWrapper), m_protocolInitializer->blockFactory(), m_storage, + m_nodeConfig->blockLimit()); lightNodeLedger->setKeyPageSize(m_nodeConfig->keyPageSize()); auto txpool = m_txpoolInitializer->txpool(); @@ -607,3 +608,49 @@ void Initializer::stop() exit(-1); } } + + +void Initializer::prune() +{ + auto blockLimit = (protocol::BlockNumber)m_nodeConfig->blockLimit(); + bcos::protocol::BlockNumber currentBlockNumber = 0; + auto ledger = m_ledger; + ledger->asyncGetBlockNumber( + [¤tBlockNumber](Error::Ptr error, bcos::protocol::BlockNumber _blockNumber) { + if (error) + { + INITIALIZER_LOG(ERROR) + << LOG_DESC("get block number failed") << LOG_DESC(error->errorMessage()); + return; + } + currentBlockNumber = _blockNumber; + }); + if (currentBlockNumber <= blockLimit) + { + return; + } + auto endBlockNumber = currentBlockNumber - blockLimit; + for (bcos::protocol::BlockNumber i = blockLimit + 1; i < endBlockNumber; i++) + { + ledger->removeExpiredNonce(i, true); + if (i % 1000 == 0 || i == endBlockNumber) + { + std::cout << "removed nonces of block " << i << std::endl; + } + } + // rocksDB compaction + if (boost::iequals("rocksdb", m_nodeConfig->storageType())) + { + auto storage = std::dynamic_pointer_cast(m_storage); + auto& rocksDB = storage->rocksDB(); + auto startKey = rocksdb::Slice(bcos::storage::toDBKey( + bcos::ledger::SYS_BLOCK_NUMBER_2_NONCES, std::to_string(blockLimit))); + auto endKey = rocksdb::Slice(bcos::storage::toDBKey( + bcos::ledger::SYS_BLOCK_NUMBER_2_NONCES, std::to_string(endBlockNumber))); + auto status = rocksDB.CompactRange(rocksdb::CompactRangeOptions(), &startKey, &endKey); + if (!status.ok()) + { + std::cerr << LOG_DESC("rocksDB compact range failed") << LOG_DESC(status.ToString()); + } + } +} diff --git a/libinitializer/Initializer.h b/libinitializer/Initializer.h index c061bb2f8e..b11d6d6fc2 100644 --- a/libinitializer/Initializer.h +++ b/libinitializer/Initializer.h @@ -53,6 +53,7 @@ class Initializer virtual void start(); virtual void stop(); + virtual void prune(); bcos::tool::NodeConfig::Ptr nodeConfig() { return m_nodeConfig; } ProtocolInitializer::Ptr protocolInitializer() { return m_protocolInitializer; } @@ -82,6 +83,7 @@ class Initializer /// NOTE: this should be last called void initSysContract(); + bcos::storage::TransactionalStorageInterface::Ptr storage() { return m_storage; } private: bcos::tool::NodeConfig::Ptr m_nodeConfig; @@ -100,6 +102,7 @@ class Initializer std::string const c_consensusStorageDBName = "consensus_log"; std::string const c_fileSeparator = "/"; std::shared_ptr m_archiveService = nullptr; + bcos::storage::TransactionalStorageInterface::Ptr m_storage = nullptr; std::function()> m_baselineSchedulerHolder; std::function)> diff --git a/libinitializer/LedgerInitializer.h b/libinitializer/LedgerInitializer.h index 560104766a..38e92d574e 100644 --- a/libinitializer/LedgerInitializer.h +++ b/libinitializer/LedgerInitializer.h @@ -43,14 +43,14 @@ class LedgerInitializer ledger = std::make_shared>( bcos::crypto::hasher::openssl::OpenSSL_SM3_Hasher{}, std::move(storageWrapper), - blockFactory, storage); + blockFactory, storage, nodeConfig->blockLimit()); } else { ledger = std::make_shared>( bcos::crypto::hasher::openssl::OpenSSL_Keccak256_Hasher{}, - std::move(storageWrapper), blockFactory, storage); + std::move(storageWrapper), blockFactory, storage, nodeConfig->blockLimit()); } ledger->buildGenesisBlock(nodeConfig->genesisConfig(), *nodeConfig->ledgerConfig()); diff --git a/libinitializer/SchedulerInitializer.h b/libinitializer/SchedulerInitializer.h index 7b0e20435d..5c6b738a9b 100644 --- a/libinitializer/SchedulerInitializer.h +++ b/libinitializer/SchedulerInitializer.h @@ -43,8 +43,8 @@ class SchedulerInitializer bcos::protocol::ExecutionMessageFactory::Ptr executionMessageFactory, bcos::protocol::BlockFactory::Ptr blockFactory, bcos::txpool::TxPoolInterface::Ptr txPool, bcos::protocol::TransactionSubmitResultFactory::Ptr transactionSubmitResultFactory, - crypto::Hash::Ptr hashImpl, bool isAuthCheck, bool isWasm, bool isSerialExecute, - int64_t schedulerSeq) + crypto::Hash::Ptr hashImpl, size_t blockLimit, bool isAuthCheck, bool isWasm, + bool isSerialExecute, int64_t schedulerSeq) { bcos::scheduler::SchedulerFactory factory(std::move(executorManager), std::move(_ledger), std::move(storage), std::move(executionMessageFactory), std::move(blockFactory), diff --git a/tools/archive-tool/archiveTool.cpp b/tools/archive-tool/archiveTool.cpp index fb9bedb671..600932f55c 100644 --- a/tools/archive-tool/archiveTool.cpp +++ b/tools/archive-tool/archiveTool.cpp @@ -344,8 +344,8 @@ void reimportBlocks(auto archiveStorage, TransactionalStorageInterface::Ptr loca // create factory auto protocolInitializer = std::make_shared(); protocolInitializer->init(nodeConfig); - auto ledger = - std::make_shared(protocolInitializer->blockFactory(), localStorage); + auto ledger = std::make_shared( + protocolInitializer->blockFactory(), localStorage, nodeConfig->blockLimit()); auto blockFactory = protocolInitializer->blockFactory(); auto transactionFactory = blockFactory->transactionFactory(); auto receiptFactory = blockFactory->receiptFactory(); @@ -641,8 +641,8 @@ int main(int argc, const char* argv[]) createBackendStorage(nodeConfig, logInitializer->logPath(), !isArchive, secondaryPath); auto protocolInitializer = std::make_shared(); protocolInitializer->init(nodeConfig); - auto ledger = - std::make_shared(protocolInitializer->blockFactory(), localStorage); + auto ledger = std::make_shared( + protocolInitializer->blockFactory(), localStorage, nodeConfig->blockLimit()); std::promise promise; ledger->asyncGetBlockNumber( [&promise](const Error::Ptr& error, bcos::protocol::BlockNumber number) { diff --git a/transaction-scheduler/bcos-transaction-scheduler/BaselineScheduler.h b/transaction-scheduler/bcos-transaction-scheduler/BaselineScheduler.h index 251369c9f8..7633d4fe25 100644 --- a/transaction-scheduler/bcos-transaction-scheduler/BaselineScheduler.h +++ b/transaction-scheduler/bcos-transaction-scheduler/BaselineScheduler.h @@ -112,8 +112,8 @@ task::Task calculateStateRoot( h256 m_hash; std::reference_wrapper m_hashes; - XORHash(decltype(hashes) const& hashes) : m_hashes(hashes){}; - XORHash(XORHash& source, tbb::split /*unused*/) : m_hashes(source.m_hashes){}; + XORHash(decltype(hashes) const& hashes) : m_hashes(hashes) {}; + XORHash(XORHash& source, tbb::split /*unused*/) : m_hashes(source.m_hashes) {}; void operator()(const tbb::blocked_range& range) { for (size_t i = range.begin(); i != range.end(); ++i) @@ -607,7 +607,6 @@ class BaselineScheduler : public scheduler::SchedulerInterface view, *blockHeader, *transaction, 0, emptyLedgerConfig, task::syncWait); } - callback(nullptr, std::move(receipt)); }(this, std::move(transaction), std::move(callback))); }