Skip to content

Commit

Permalink
add asyncRemoveExpiredNonce of ledger and add log.rotate_time_point
Browse files Browse the repository at this point in the history
  • Loading branch information
bxq2011hust committed May 14, 2024
1 parent 89bfa03 commit 965952f
Show file tree
Hide file tree
Showing 26 changed files with 195 additions and 38 deletions.
8 changes: 8 additions & 0 deletions bcos-framework/bcos-framework/ledger/Ledger.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ inline constexpr struct StoreTransactionsAndReceipts
}
} storeTransactionsAndReceipts{};

inline constexpr struct RemoveExpiredNonce
{
task::Task<void> operator()(auto& ledger, protocol::BlockNumber expiredNumber) const
{
co_await tag_invoke(*this, ledger, expiredNumber);
}
} removeExpiredNonce{};

inline constexpr struct GetBlockData
{
task::Task<protocol::Block::Ptr> operator()(
Expand Down
6 changes: 6 additions & 0 deletions bcos-framework/bcos-framework/ledger/LedgerInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ class LedgerInterface
virtual void asyncPreStoreBlockTxs(bcos::protocol::ConstTransactionsPtr _blockTxs,
bcos::protocol::Block::ConstPtr block,
std::function<void(Error::UniquePtr&&)> _callback) = 0;

/**
* @brief remove expired tx nonces
* @param blockNumber the current block number
*/
virtual void removeExpiredNonce(protocol::BlockNumber blockNumber, bool sync) = 0;
};

} // namespace bcos::ledger
1 change: 1 addition & 0 deletions bcos-framework/bcos-framework/testutils/faker/FakeLedger.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ class FakeLedger : public LedgerInterface, public std::enable_shared_from_this<F
}
_onGetList(nullptr, nonceList);
}
void removeExpiredNonce(protocol::BlockNumber blockNumber, bool sync) override {}

void setStatus(bool _normal) { m_statusNormal = _normal; }
void setTotalTxCount(size_t _totalTxCount) { m_totalTxCount = _totalTxCount; }
Expand Down
32 changes: 32 additions & 0 deletions bcos-ledger/src/libledger/Ledger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,38 @@ void Ledger::asyncGetNonceList(bcos::protocol::BlockNumber _startNumber, int64_t
});
}

void Ledger::removeExpiredNonce(protocol::BlockNumber blockNumber, bool sync)
{
auto expiredNumber =
blockNumber > (protocol::BlockNumber)m_blockLimit ? blockNumber - m_blockLimit : 0;
if (expiredNumber > 0)
{
LEDGER_LOG(DEBUG) << "removeExpiredNonce" << LOG_KV("number", blockNumber)
<< LOG_KV("expiredNumber", expiredNumber);
auto deleteExpiredNonces = [expiredNumber, storage = m_storage]() {
Entry deletedEntry;
deletedEntry.setStatus(Entry::DELETED);
auto blockNumberStr = boost::lexical_cast<std::string>(expiredNumber);
storage->asyncSetRow(SYS_BLOCK_NUMBER_2_NONCES, blockNumberStr, std::move(deletedEntry),
[](auto&& error) {
if (error)
{
LEDGER_LOG(WARNING) << LOG_DESC("removeExpiredNonce failed")
<< LOG_KV("message", error->errorMessage());
}
});
};
if (sync)
{
deleteExpiredNonces();
}
else
{
m_threadPool->enqueue(deleteExpiredNonces);
}
}
}

void Ledger::asyncGetNodeListByType(const std::string_view& _type,
std::function<void(Error::Ptr, consensus::ConsensusNodeListPtr)> _onGetConfig)
{
Expand Down
8 changes: 6 additions & 2 deletions bcos-ledger/src/libledger/Ledger.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ class Ledger : public LedgerInterface
boost::compute::detail::lru_cache<int64_t, std::shared_ptr<std::vector<h256>>>;

Ledger(bcos::protocol::BlockFactory::Ptr _blockFactory,
bcos::storage::StorageInterface::Ptr _storage, int merkleTreeCacheSize = 100)
bcos::storage::StorageInterface::Ptr _storage, size_t _blockLimit,
int merkleTreeCacheSize = 100)
: m_blockFactory(std::move(_blockFactory)),
m_storage(std::move(_storage)),
m_threadPool(std::make_shared<ThreadPool>("WriteReceipts", 1)),
m_threadPool(std::make_shared<ThreadPool>("ledgerWrite", 2)),
m_blockLimit(_blockLimit),
m_merkleTreeCacheSize(merkleTreeCacheSize),
m_txProofMerkleCache(m_merkleTreeCacheSize),
m_receiptProofMerkleCache(m_merkleTreeCacheSize)
Expand Down Expand Up @@ -99,6 +101,7 @@ class Ledger : public LedgerInterface
std::function<void(
Error::Ptr, std::shared_ptr<std::map<protocol::BlockNumber, protocol::NonceListPtr>>)>
_onGetList) override;
void removeExpiredNonce(protocol::BlockNumber blockNumber, bool sync = false) override;

void asyncGetNodeListByType(const std::string_view& _type,
std::function<void(Error::Ptr, consensus::ConsensusNodeListPtr)> _onGetConfig) override;
Expand Down Expand Up @@ -161,6 +164,7 @@ class Ledger : public LedgerInterface

mutable RecursiveMutex m_mutex;
std::shared_ptr<bcos::ThreadPool> m_threadPool;
size_t m_blockLimit;

// Maintain merkle trees of 100 blocks
int m_merkleTreeCacheSize;
Expand Down
4 changes: 2 additions & 2 deletions bcos-ledger/src/libledger/LedgerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class LedgerImpl : public bcos::concepts::ledger::LedgerBase<LedgerImpl<Hasher,

public:
LedgerImpl(Hasher hasher, Storage storage, bcos::protocol::BlockFactory::Ptr blockFactory,
bcos::storage::StorageInterface::Ptr storageInterface)
: Ledger(std::move(blockFactory), storageInterface),
bcos::storage::StorageInterface::Ptr storageInterface, size_t blockLimit)
: Ledger(std::move(blockFactory), storageInterface, blockLimit),
m_hasher(std::move(hasher)),
m_backupStorage(storageInterface),
m_storage{std::move(storage)},
Expand Down
7 changes: 7 additions & 0 deletions bcos-ledger/src/libledger/LedgerMethods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ bcos::task::Task<void> bcos::ledger::tag_invoke(
co_return;
}

bcos::task::Task<void> bcos::ledger::tag_invoke(ledger::tag_t<removeExpiredNonce> /*unused*/,
LedgerInterface& ledger, protocol::BlockNumber blockNumber)
{
ledger.removeExpiredNonce(blockNumber, false);
co_return;
}

bcos::task::Task<bcos::protocol::Block::Ptr> bcos::ledger::tag_invoke(
ledger::tag_t<getBlockData> /*unused*/, LedgerInterface& ledger,
protocol::BlockNumber blockNumber, int32_t blockFlag)
Expand Down
3 changes: 3 additions & 0 deletions bcos-ledger/src/libledger/LedgerMethods.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ inline task::Task<void> tag_invoke(ledger::tag_t<prewriteBlock> /*unused*/, Ledg
task::Task<void> tag_invoke(ledger::tag_t<storeTransactionsAndReceipts>, LedgerInterface& ledger,
bcos::protocol::ConstTransactionsPtr blockTxs, bcos::protocol::Block::ConstPtr block);

task::Task<void> tag_invoke(ledger::tag_t<removeExpiredNonce>, LedgerInterface& ledger,
protocol::BlockNumber expiredNumber);

task::Task<protocol::Block::Ptr> tag_invoke(ledger::tag_t<getBlockData> /*unused*/,
LedgerInterface& ledger, protocol::BlockNumber blockNumber, int32_t blockFlag);

Expand Down
1 change: 0 additions & 1 deletion bcos-pbft/bcos-pbft/pbft/storage/LedgerStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ class LedgerStorage : public PBFTStorage, public std::enable_shared_from_this<Le
bcos::protocol::BlockHeader::Ptr _blockHeader,
bcos::ledger::LedgerConfig::Ptr _ledgerConfig);

protected:
bcos::scheduler::SchedulerInterface::Ptr m_scheduler;
std::shared_ptr<bcos::storage::KVStorageHelper> m_storage;
bcos::protocol::BlockFactory::Ptr m_blockFactory;
Expand Down
6 changes: 3 additions & 3 deletions bcos-scheduler/src/SchedulerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,9 +665,9 @@ void SchedulerImpl::commitBlock(bcos::protocol::BlockHeader::Ptr header,
self->m_ledgerConfig = ledgerConfig;
commitLock->unlock(); // just unlock here


// Note: blockNumber = 0, means system deploy, and tx is not existed in txpool.
// So it should not exec tx notifier
self->m_ledger->removeExpiredNonce(blockNumber, false);
// Note: blockNumber = 0, means system deploy, and tx is not existed in
// txpool. So it should not exec tx notifier
if (self->m_txNotifier && blockNumber != 0)
{
SCHEDULER_LOG(DEBUG) << "Start notify block result: " << blockNumber;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ class LedgerServiceClient : public bcos::ledger::LedgerInterface
BCOS_LOG(ERROR) << LOG_DESC("unimplement method asyncGetNonceList");
}

void removeExpiredNonce(bcos::protocol::BlockNumber blockNumber, bool sync) override
{
BCOS_LOG(ERROR) << LOG_DESC("unimplement method asyncGetNonceList");
}

private:
bcostars::LedgerServicePrx m_prx;
bcos::protocol::BlockFactory::Ptr m_blockFactory;
Expand Down
17 changes: 15 additions & 2 deletions bcos-utilities/bcos-utilities/BoostLogInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "bcos-utilities/BoostLog.h"
#include <bcos-framework/bcos-framework/protocol/GlobalConfig.h>
#include <bcos-utilities/RateCollector.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/core/null_deleter.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/log/core/core.hpp>
Expand Down Expand Up @@ -173,6 +174,18 @@ void BoostLogInitializer::initLog(boost::property_tree::ptree const& _pt,
m_archivePath = _pt.get<std::string>("log.archive_path", "");
m_rotateFileNamePattern =
_pt.get<std::string>("log.rotate_name_pattern", "log_%Y%m%d%H.%M.log");
auto rotateTimePoint = _pt.get<std::string>("log.rotate_time_point", "0:0:0");
std::vector<std::string> rotateTimePointVec;
boost::split(rotateTimePointVec, rotateTimePoint, boost::is_any_of(":"));
if (rotateTimePointVec.size() != 3)
{
throw std::runtime_error(
"log.rotate_time_point must be in format of HH:MM:SS, current is " + rotateTimePoint);
}
for (size_t i = 0; i < rotateTimePointVec.size(); i++)
{
m_rotateTimePoint[i] = std::stoi(rotateTimePointVec[i]);
}
m_rotateSize = _pt.get<uint64_t>("log.max_log_file_size", 1024) * MB_IN_BYTES;
if (m_rotateSize < 100 * MB_IN_BYTES)
{
Expand Down Expand Up @@ -274,8 +287,8 @@ boost::shared_ptr<bcos::BoostLogInitializer::sink_t> BoostLogInitializer::initLo
boost::shared_ptr<sink_t> sink(new sink_t());
sink->locked_backend()->enable_final_rotation(false);
sink->locked_backend()->set_open_mode(std::ios::app);
sink->locked_backend()->set_time_based_rotation(
boost::log::sinks::file::rotation_at_time_point(0, 0, 0));
sink->locked_backend()->set_time_based_rotation(boost::log::sinks::file::rotation_at_time_point(
m_rotateTimePoint[0], m_rotateTimePoint[1], m_rotateTimePoint[2]));
sink->locked_backend()->set_file_name_pattern(_logPath + "/" + m_logNamePattern);
sink->locked_backend()->set_target_file_name_pattern(_logPath + "/" + m_rotateFileNamePattern);
/// set rotation size MB
Expand Down
1 change: 1 addition & 0 deletions bcos-utilities/bcos-utilities/BoostLogInitializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,5 +154,6 @@ class BoostLogInitializer
bool m_autoFlush = true;
bool m_enableLog = true;
std::atomic_bool m_running = {false};
std::vector<int> m_rotateTimePoint = {0, 0, 0};
};
} // namespace bcos
5 changes: 5 additions & 0 deletions fisco-bcos-air/AirNodeInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,8 @@ void AirNodeInitializer::stop()
exit(-1);
}
}

void AirNodeInitializer::prune()
{
m_nodeInitializer->prune();
}
1 change: 1 addition & 0 deletions fisco-bcos-air/AirNodeInitializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class AirNodeInitializer
virtual void init(std::string const& _configFilePath, std::string const& _genesisFile);
virtual void start();
virtual void stop();
virtual void prune();

protected:
virtual void initAirNode(std::string const& _configFilePath, std::string const& _genesisFile,
Expand Down
9 changes: 9 additions & 0 deletions fisco-bcos-air/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ int main(int argc, const char* argv[])
{
auto param = bcos::initializer::initAirNodeCommandLine(argc, argv, false);
initializer->init(param.configFilePath, param.genesisFilePath);
if (param.op == bcos::initializer::Params::operation::Prune)
{
std::cout << "[" << bcos::getCurrentDateTime() << "] ";
std::cout << "prune the node data..." << std::endl;
initializer->prune();
std::cout << "[" << bcos::getCurrentDateTime() << "] ";
std::cout << "prune the node data success." << std::endl;
return 0;
}
bcos::initializer::showNodeVersionMetric();

bcos::initializer::printVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ void ExecutorServiceApp::createAndInitExecutor()
std::make_shared<bcos::storage::StateStorageFactory>(m_nodeConfig->keyPageSize());

auto blockFactory = m_protocolInitializer->blockFactory();
auto ledger = std::make_shared<bcos::ledger::Ledger>(blockFactory, storage);
auto ledger =
std::make_shared<bcos::ledger::Ledger>(blockFactory, storage, m_nodeConfig->blockLimit());

auto executorFactory = std::make_shared<bcos::executor::TransactionExecutorFactory>(ledger,
m_txpool, cacheFactory, storage, executionMessageFactory, stateStorageFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ void SchedulerServiceApp::createScheduler()
auto blockFactory = m_protocolInitializer->blockFactory();
auto ledger = std::make_shared<bcos::ledger::Ledger>(blockFactory,
StorageInitializer::build(m_nodeConfig->pdAddrs(), getLogPath(), m_nodeConfig->pdCaPath(),
m_nodeConfig->pdCertPath(), m_nodeConfig->pdKeyPath()));
m_nodeConfig->pdCertPath(), m_nodeConfig->pdKeyPath()),
m_nodeConfig->blockLimit());
auto executionMessageFactory =
std::make_shared<bcostars::protocol::ExecutionMessageFactoryImpl>();
auto executorManager = std::make_shared<bcos::scheduler::RemoteExecutorManager>(
Expand Down
9 changes: 7 additions & 2 deletions libinitializer/CommandHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ bcos::initializer::Params bcos::initializer::initAirNodeCommandLine(
boost::program_options::value<std::string>()->default_value("./config.ini"),
"config file path, eg. config.ini")("genesis,g",
boost::program_options::value<std::string>()->default_value("./config.genesis"),
"genesis config file path, eg. genesis.ini");
"genesis config file path, eg. genesis.ini")("prune,p", "prune the node data");

if (_autoSendTx)
{
Expand Down Expand Up @@ -148,5 +148,10 @@ bcos::initializer::Params bcos::initializer::initAirNodeCommandLine(
txSpeed = vm["txSpeed"].as<float>();
}
}
return bcos::initializer::Params{configPath, genesisFilePath, txSpeed};
auto op = Params::operation::None;
if (vm.count("prune"))
{
op = Params::operation::Prune;
}
return bcos::initializer::Params{configPath, genesisFilePath, txSpeed, op};
}
7 changes: 7 additions & 0 deletions libinitializer/CommandHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ struct Params
std::string configFilePath;
std::string genesisFilePath;
float txSpeed;
enum class operation
{
None,
Prune,
Snapshot,
SnapshotWithoutTxAndReceipt,
} op;
};
Params initAirNodeCommandLine(int argc, const char* argv[], bool _autoSendTx);
} // namespace initializer
Expand Down
Loading

0 comments on commit 965952f

Please sign in to comment.