From 852ffa3919d96419b617dc8f3edff83a9f9d8c60 Mon Sep 17 00:00:00 2001 From: XingQiang Bai Date: Wed, 31 Jul 2024 17:29:30 +0800 Subject: [PATCH] add storage.sync_archived_blocks (#4556) --- .../bcos-framework/ledger/LedgerInterface.h | 5 + .../testutils/faker/FakeLedger.h | 5 + bcos-ledger/src/libledger/Ledger.cpp | 31 +++++ bcos-ledger/src/libledger/Ledger.h | 2 + bcos-sync/bcos-sync/BlockSync.cpp | 129 ++++++++++++++++++ bcos-sync/bcos-sync/BlockSync.h | 8 +- bcos-sync/bcos-sync/BlockSyncConfig.h | 9 +- bcos-sync/bcos-sync/BlockSyncFactory.cpp | 8 +- bcos-sync/bcos-sync/BlockSyncFactory.h | 6 +- .../client/LedgerServiceClient.h | 7 + bcos-tool/bcos-tool/NodeConfig.cpp | 1 + bcos-tool/bcos-tool/NodeConfig.h | 2 + bcos-utilities/bcos-utilities/Worker.cpp | 2 + libinitializer/Initializer.cpp | 1 - libinitializer/PBFTInitializer.cpp | 3 +- 15 files changed, 209 insertions(+), 10 deletions(-) diff --git a/bcos-framework/bcos-framework/ledger/LedgerInterface.h b/bcos-framework/bcos-framework/ledger/LedgerInterface.h index 1bb3aa8518..8d035a65f5 100644 --- a/bcos-framework/bcos-framework/ledger/LedgerInterface.h +++ b/bcos-framework/bcos-framework/ledger/LedgerInterface.h @@ -136,6 +136,11 @@ class LedgerInterface virtual void asyncGetCurrentStateByKey(std::string_view const& _key, std::function&&)> _callback) = 0; + virtual Error::Ptr setCurrentStateByKey( + std::string_view const& _key, bcos::storage::Entry entry) + { + return nullptr; + } /** * @brief async get system config by table key * @param _key the key of row, you can checkout all key in LedgerTypeDef.h diff --git a/bcos-framework/bcos-framework/testutils/faker/FakeLedger.h b/bcos-framework/bcos-framework/testutils/faker/FakeLedger.h index 1c7a68406d..661a4b3d75 100644 --- a/bcos-framework/bcos-framework/testutils/faker/FakeLedger.h +++ b/bcos-framework/bcos-framework/testutils/faker/FakeLedger.h @@ -247,6 +247,11 @@ class FakeLedger : public LedgerInterface, public std::enable_shared_from_this _onGetConfig) override diff --git a/bcos-ledger/src/libledger/Ledger.cpp b/bcos-ledger/src/libledger/Ledger.cpp index ca19e4422c..61ca8cf0a9 100644 --- a/bcos-ledger/src/libledger/Ledger.cpp +++ b/bcos-ledger/src/libledger/Ledger.cpp @@ -543,6 +543,29 @@ void Ledger::asyncGetBlockDataByNumber(bcos::protocol::BlockNumber _blockNumber, return; } + if ((_blockFlag & TRANSACTIONS) != 0 || (_blockFlag & RECEIPTS) != 0) + { + protocol::BlockNumber archivedBlockNumber = 0; + std::promise>> statePromise; + asyncGetCurrentStateByKey(ledger::SYS_KEY_ARCHIVED_NUMBER, + [&statePromise](Error::Ptr&& err, std::optional&& entry) { + statePromise.set_value(std::make_pair(std::move(err), std::move(entry))); + }); + auto archiveRet = statePromise.get_future().get(); + if (!archiveRet.first && archiveRet.second.has_value()) + { + archivedBlockNumber = boost::lexical_cast(archiveRet.second->get()); + } + if (_blockNumber < archivedBlockNumber) + { + LEDGER_LOG(INFO) << "GetBlockDataByNumber, block number is larger than archived number"; + _onGetBlock(BCOS_ERROR_PTR(LedgerError::ErrorArgument, + "Wrong argument, this block's transactions and receipts are archived"), + nullptr); + return; + } + } + std::list> fetchers; auto block = m_blockFactory->createBlock(); auto total = std::make_shared(0); @@ -2335,3 +2358,11 @@ void Ledger::asyncGetCurrentStateByKey(std::string_view const& _key, }); }); } + +Error::Ptr Ledger::setCurrentStateByKey(std::string_view const& _key, bcos::storage::Entry entry) +{ + std::promise setPromise; + m_stateStorage->asyncSetRow(ledger::SYS_CURRENT_STATE, _key, std::move(entry), + [&](Error::UniquePtr err) { setPromise.set_value(std::move(err)); }); + return setPromise.get_future().get(); +} diff --git a/bcos-ledger/src/libledger/Ledger.h b/bcos-ledger/src/libledger/Ledger.h index 6d8f987d2b..07e97d0ac9 100644 --- a/bcos-ledger/src/libledger/Ledger.h +++ b/bcos-ledger/src/libledger/Ledger.h @@ -112,6 +112,8 @@ class Ledger : public LedgerInterface void asyncGetCurrentStateByKey(std::string_view const& _key, std::function&&)> _callback) override; + Error::Ptr setCurrentStateByKey( + std::string_view const& _key, bcos::storage::Entry entry) override; task::Task> getStorageAt(std::string_view _address, std::string_view _key, protocol::BlockNumber _blockNumber) override; diff --git a/bcos-sync/bcos-sync/BlockSync.cpp b/bcos-sync/bcos-sync/BlockSync.cpp index dc5df88d61..5efae06de3 100644 --- a/bcos-sync/bcos-sync/BlockSync.cpp +++ b/bcos-sync/bcos-sync/BlockSync.cpp @@ -19,10 +19,13 @@ * @date 2021-05-24 */ #include "bcos-sync/BlockSync.h" +#include "bcos-framework/ledger/LedgerTypeDef.h" #include "bcos-framework/protocol/CommonError.h" +#include "bcos-framework/protocol/ProtocolTypeDef.h" #include #include #include +#include using namespace bcos; using namespace bcos::sync; @@ -226,6 +229,12 @@ void BlockSync::executeWorker() // send block-download-request to peers if this node is behind others tryToRequestBlocks(); + + if (m_config->syncArchivedBlockBody()) + { + syncArchivedBlockBody(); + verifyAndCommitArchivedBlock(); + } } catch (std::exception const& e) { @@ -455,7 +464,33 @@ void BlockSync::onPeerStatus(NodeIDPtr _nodeID, BlockSyncMsgInterface::Ptr _sync void BlockSync::onPeerBlocks(NodeIDPtr _nodeID, BlockSyncMsgInterface::Ptr _syncMsg) { auto number = _syncMsg->number(); + auto archivedNumber = m_config->archiveBlockNumber(); auto blockMsg = m_config->msgFactory()->createBlocksMsg(std::move(_syncMsg)); + if (number < archivedNumber) + { + size_t downloadedBlockCount = 0; + { + ReadGuard lock(x_archivedBlockQueue); + downloadedBlockCount = m_archivedBlockQueue.size(); + } + if (downloadedBlockCount >= m_config->maxDownloadingBlockQueueSize()) + { + BLKSYNC_LOG(WARNING) << LOG_BADGE("Download") << LOG_BADGE("BlockSync") + << LOG_DESC("archivedBlockQueue is full") + << LOG_KV("queueSize", downloadedBlockCount); + return; + } + auto block = m_config->blockFactory()->createBlock(blockMsg->blockData(0), true, true); + BLKSYNC_LOG(DEBUG) << LOG_BADGE("Download") << BLOCK_NUMBER(number) + << LOG_BADGE("BlockSync") + << LOG_DESC("Receive peer block packet(archived)") + << LOG_KV("peer", _nodeID->shortHex()); + { + WriteGuard lock(x_archivedBlockQueue); + m_archivedBlockQueue.push(block); + } + return; + } BLKSYNC_LOG(DEBUG) << LOG_BADGE("Download") << BLOCK_NUMBER(number) << LOG_BADGE("BlockSync") << LOG_DESC("Receive peer block packet") << LOG_KV("peer", _nodeID->shortHex()); @@ -1019,3 +1054,97 @@ std::vector BlockSync::getPeerStatus() }); return statuses; } + +void BlockSync::syncArchivedBlockBody() +{ + BlockNumber topBlockNumber = 0; + { + ReadGuard lock(x_archivedBlockQueue); + topBlockNumber = m_archivedBlockQueue.top()->blockHeader()->number(); + } + auto from = topBlockNumber; + auto archivedBlockNumber = m_config->archiveBlockNumber(); + // use 1/4 of the maxDownloadingBlockQueueSize to limit the memory usage + auto maxCount = m_config->maxDownloadingBlockQueueSize() / 4; + if (archivedBlockNumber - from > (BlockNumber)maxCount) + { + from = archivedBlockNumber - (BlockNumber)maxCount; + } + requestBlocks(from, archivedBlockNumber - 1); +} + +void BlockSync::verifyAndCommitArchivedBlock() +{ + BlockNumber topBlockNumber = 0; + { + ReadGuard lock(x_archivedBlockQueue); + topBlockNumber = m_archivedBlockQueue.top()->blockHeader()->number(); + } + auto archivedBlockNumber = m_config->archiveBlockNumber(); + if (topBlockNumber != archivedBlockNumber - 1) + { + return; + } + // verify the tx root and receipt root + bcos::protocol::Block::Ptr block = nullptr; + { + WriteGuard lock(x_archivedBlockQueue); + block = m_archivedBlockQueue.top(); + // m_archivedBlockQueue.pop(); + } + + std::promise blockHeaderFuture; + m_config->ledger()->asyncGetBlockDataByNumber(topBlockNumber, bcos::ledger::HEADER, + [&blockHeaderFuture](const Error::Ptr& error, const Block::Ptr& block) { + if (error) + { + blockHeaderFuture.set_value(nullptr); + } + else + { + blockHeaderFuture.set_value(block->blockHeader()); + } + }); + auto localBlockHeader = blockHeaderFuture.get_future().get(); + if (!localBlockHeader) + { + BLKSYNC_LOG(ERROR) << LOG_DESC("BlockSync get local block header failed") + << LOG_KV("number", topBlockNumber) + << LOG_KV("reason", "get local block header failed"); + return; + } + + auto transactionRoot = + block->calculateTransactionRoot(*m_config->blockFactory()->cryptoSuite()->hashImpl()); + auto receiptRoot = + block->calculateReceiptRoot(*m_config->blockFactory()->cryptoSuite()->hashImpl()); + if (transactionRoot != localBlockHeader->txsRoot() || + receiptRoot != localBlockHeader->receiptsRoot()) + { + BLKSYNC_LOG(ERROR) << LOG_DESC("BlockSync verify archived block failed") + << LOG_KV("number", topBlockNumber) + << LOG_KV("transactionRoot", *toHexString(transactionRoot)) + << LOG_KV("receiptRoot", *toHexString(receiptRoot)) + << LOG_KV("reason", "transactionRoot or receiptRoot not match"); + WriteGuard lock(x_archivedBlockQueue); + m_archivedBlockQueue.pop(); + return; + } + auto err = m_config->ledger()->storeTransactionsAndReceipts(nullptr, block); + if (err) + { + BLKSYNC_LOG(ERROR) << LOG_DESC("BlockSync commit archived block failed") + << LOG_KV("number", topBlockNumber) + << LOG_KV("reason", "storeTransactionsAndReceipts failed"); + return; + } + BLKSYNC_LOG(INFO) << LOG_DESC("BlockSync commit archived block success") + << LOG_KV("number", topBlockNumber); + // update the archived number + m_config->ledger()->setCurrentStateByKey(bcos::ledger::SYS_KEY_ARCHIVED_NUMBER, + bcos::storage::Entry(std::to_string(topBlockNumber))); + { + WriteGuard lock(x_archivedBlockQueue); + m_archivedBlockQueue.pop(); + } +} diff --git a/bcos-sync/bcos-sync/BlockSync.h b/bcos-sync/bcos-sync/BlockSync.h index 16aae731a9..82b8ece831 100644 --- a/bcos-sync/bcos-sync/BlockSync.h +++ b/bcos-sync/bcos-sync/BlockSync.h @@ -133,12 +133,15 @@ class BlockSync : public BlockSyncInterface, // update SyncTreeTopology node info virtual void updateTreeTopologyNodeInfo(); -protected: + virtual void syncArchivedBlockBody(); + virtual void verifyAndCommitArchivedBlock(); + void requestBlocks(bcos::protocol::BlockNumber _from, bcos::protocol::BlockNumber _to); void fetchAndSendBlock( bcos::crypto::PublicPtr const& _peer, bcos::protocol::BlockNumber _number); void printSyncInfo(); +protected: BlockSyncConfig::Ptr m_config; SyncPeerStatus::Ptr m_syncStatus; DownloadingQueue::Ptr m_downloadingQueue; @@ -162,6 +165,9 @@ class BlockSync : public BlockSyncInterface, std::atomic_bool m_masterNode = {false}; bool m_allowFreeNode = false; + mutable SharedMutex x_archivedBlockQueue; + BlockQueue m_archivedBlockQueue; + SyncTreeTopology::Ptr m_syncTreeTopology{nullptr}; }; } // namespace bcos::sync diff --git a/bcos-sync/bcos-sync/BlockSyncConfig.h b/bcos-sync/bcos-sync/BlockSyncConfig.h index 126a8f81be..b256efff12 100644 --- a/bcos-sync/bcos-sync/BlockSyncConfig.h +++ b/bcos-sync/bcos-sync/BlockSyncConfig.h @@ -45,7 +45,8 @@ class BlockSyncConfig : public SyncConfig bcos::scheduler::SchedulerInterface::Ptr _scheduler, bcos::consensus::ConsensusInterface::Ptr _consensus, BlockSyncMsgFactory::Ptr _msgFactory, bcos::tool::NodeTimeMaintenance::Ptr _nodeTimeMaintenance, - bool _enableSendBlockStatusByTree = false, std::uint32_t _syncTreeWidth = 3) + bool _enableSendBlockStatusByTree = false, std::uint32_t _syncTreeWidth = 3, + bool _syncArchivedBlockBody = false) : SyncConfig(std::move(_nodeId)), m_ledger(std::move(_ledger)), m_txpool(std::move(_txpool)), @@ -57,7 +58,8 @@ class BlockSyncConfig : public SyncConfig m_msgFactory(std::move(_msgFactory)), m_nodeTimeMaintenance(std::move(_nodeTimeMaintenance)), m_enableSendBlockStatusByTree(_enableSendBlockStatusByTree), - m_syncTreeWidth(_syncTreeWidth) + m_syncTreeWidth(_syncTreeWidth), + m_syncArchivedBlockBody(_syncArchivedBlockBody) {} ~BlockSyncConfig() override = default; @@ -140,6 +142,7 @@ class BlockSyncConfig : public SyncConfig } bool masterNode() const { return m_masterNode; } + bool syncArchivedBlockBody() const { return m_syncArchivedBlockBody; } bcos::protocol::BlockNumber archiveBlockNumber() const; @@ -205,5 +208,7 @@ class BlockSyncConfig : public SyncConfig bool m_enableSendBlockStatusByTree = false; std::uint32_t m_syncTreeWidth; + + bool m_syncArchivedBlockBody = false; }; } // namespace bcos::sync diff --git a/bcos-sync/bcos-sync/BlockSyncFactory.cpp b/bcos-sync/bcos-sync/BlockSyncFactory.cpp index af378cc472..9036242a33 100644 --- a/bcos-sync/bcos-sync/BlockSyncFactory.cpp +++ b/bcos-sync/bcos-sync/BlockSyncFactory.cpp @@ -32,7 +32,7 @@ BlockSyncFactory::BlockSyncFactory(bcos::crypto::PublicPtr _nodeId, bcos::scheduler::SchedulerInterface::Ptr _scheduler, bcos::consensus::ConsensusInterface::Ptr _consensus, bcos::tool::NodeTimeMaintenance::Ptr _nodeTimeMaintenance, bool _enableSendBlockStatusByTree, - std::uint32_t _syncTreeWidth) + std::uint32_t _syncTreeWidth, bool _syncArchivedBlockBody) : m_nodeId(std::move(_nodeId)), m_blockFactory(std::move(_blockFactory)), m_txResultFactory(std::move(_txResultFactory)), @@ -43,7 +43,8 @@ BlockSyncFactory::BlockSyncFactory(bcos::crypto::PublicPtr _nodeId, m_consensus(std::move(_consensus)), m_nodeTimeMaintenance(std::move(_nodeTimeMaintenance)), m_enableSendBlockStatusByTree(_enableSendBlockStatusByTree), - m_syncTreeWidth(_syncTreeWidth) + m_syncTreeWidth(_syncTreeWidth), + m_syncArchivedBlockBody(_syncArchivedBlockBody) {} BlockSync::Ptr BlockSyncFactory::createBlockSync() @@ -51,6 +52,7 @@ BlockSync::Ptr BlockSyncFactory::createBlockSync() auto msgFactory = std::make_shared(); auto syncConfig = std::make_shared(m_nodeId, m_ledger, m_txpool, m_blockFactory, m_txResultFactory, m_frontService, m_scheduler, m_consensus, msgFactory, - m_nodeTimeMaintenance, m_enableSendBlockStatusByTree, m_syncTreeWidth); + m_nodeTimeMaintenance, m_enableSendBlockStatusByTree, m_syncTreeWidth, + m_syncArchivedBlockBody); return std::make_shared(syncConfig); } diff --git a/bcos-sync/bcos-sync/BlockSyncFactory.h b/bcos-sync/bcos-sync/BlockSyncFactory.h index 3fba4ef823..094b2e0748 100644 --- a/bcos-sync/bcos-sync/BlockSyncFactory.h +++ b/bcos-sync/bcos-sync/BlockSyncFactory.h @@ -37,12 +37,13 @@ class BlockSyncFactory bcos::scheduler::SchedulerInterface::Ptr _scheduler, bcos::consensus::ConsensusInterface::Ptr _consensus, bcos::tool::NodeTimeMaintenance::Ptr _nodeTimeMaintenance, - bool enableSendBlockStatusByTree = false, std::uint32_t syncTreeWidth = 3); + bool enableSendBlockStatusByTree = false, std::uint32_t syncTreeWidth = 3, + bool _syncArchivedBlockBody = false); virtual ~BlockSyncFactory() = default; virtual BlockSync::Ptr createBlockSync(); -protected: +private: bcos::crypto::PublicPtr m_nodeId; bcos::protocol::BlockFactory::Ptr m_blockFactory; bcos::protocol::TransactionSubmitResultFactory::Ptr m_txResultFactory; @@ -54,5 +55,6 @@ class BlockSyncFactory bcos::tool::NodeTimeMaintenance::Ptr m_nodeTimeMaintenance; bool m_enableSendBlockStatusByTree; std::int64_t m_syncTreeWidth; + bool m_syncArchivedBlockBody = false; }; } // namespace bcos::sync \ No newline at end of file diff --git a/bcos-tars-protocol/bcos-tars-protocol/client/LedgerServiceClient.h b/bcos-tars-protocol/bcos-tars-protocol/client/LedgerServiceClient.h index 784feeeb98..5f2e20d5f9 100644 --- a/bcos-tars-protocol/bcos-tars-protocol/client/LedgerServiceClient.h +++ b/bcos-tars-protocol/bcos-tars-protocol/client/LedgerServiceClient.h @@ -98,6 +98,13 @@ class LedgerServiceClient : public bcos::ledger::LedgerInterface BCOS_LOG(ERROR) << LOG_DESC("unimplemented method asyncGetCurrentStateByKey"); } + bcos::Error::Ptr setCurrentStateByKey( + std::string_view const& _key, bcos::storage::Entry entry) override + { + BCOS_LOG(ERROR) << LOG_DESC("unimplemented method setCurrentStateByKey"); + return nullptr; + } + void asyncGetSystemConfigByKey(std::string_view const& _key, std::function _onGetConfig) override; diff --git a/bcos-tool/bcos-tool/NodeConfig.cpp b/bcos-tool/bcos-tool/NodeConfig.cpp index 6c73f29aa9..9d7b2820da 100644 --- a/bcos-tool/bcos-tool/NodeConfig.cpp +++ b/bcos-tool/bcos-tool/NodeConfig.cpp @@ -714,6 +714,7 @@ void NodeConfig::loadStorageConfig(boost::property_tree::ptree const& _pt) m_pdCertPath = _pt.get("storage.pd_ssl_cert_path", ""); m_pdKeyPath = _pt.get("storage.pd_ssl_key_path", ""); m_enableArchive = _pt.get("storage.enable_archive", false); + m_syncArchivedBlocks = _pt.get("storage.sync_archived_blocks", false); m_enableSeparateBlockAndState = _pt.get("storage.enable_separate_block_state", false); if (boost::iequals(m_storageType, bcos::storage::TiKV)) { diff --git a/bcos-tool/bcos-tool/NodeConfig.h b/bcos-tool/bcos-tool/NodeConfig.h index 726d65995b..1deab6d95a 100644 --- a/bcos-tool/bcos-tool/NodeConfig.h +++ b/bcos-tool/bcos-tool/NodeConfig.h @@ -143,6 +143,7 @@ class NodeConfig std::string const& storageDBName() const { return m_storageDBName; } std::string const& stateDBName() const { return m_stateDBName; } bool enableArchive() const { return m_enableArchive; } + bool syncArchivedBlocks() const { return m_syncArchivedBlocks; } bool enableSeparateBlockAndState() const { return m_enableSeparateBlockAndState; } std::string const& archiveListenIP() const { return m_archiveListenIP; } uint16_t archiveListenPort() const { return m_archiveListenPort; } @@ -401,6 +402,7 @@ class NodeConfig bool m_enableRocksDBBlob = false; bool m_enableArchive = false; + bool m_syncArchivedBlocks = false; bool m_enableSeparateBlockAndState = false; std::string m_stateDBPath; std::string m_blockDBPath; diff --git a/bcos-utilities/bcos-utilities/Worker.cpp b/bcos-utilities/bcos-utilities/Worker.cpp index 1eac77b527..081156c75d 100644 --- a/bcos-utilities/bcos-utilities/Worker.cpp +++ b/bcos-utilities/bcos-utilities/Worker.cpp @@ -123,7 +123,9 @@ void Worker::terminate() if (m_workerThread) { if (m_workerState.exchange(WorkerState::Killing) == WorkerState::Killing) + { return; // Somebody else is doing this + } l.unlock(); m_workerStateNotifier.notify_all(); m_workerThread->join(); diff --git a/libinitializer/Initializer.cpp b/libinitializer/Initializer.cpp index 0d8d554914..e25af8718a 100644 --- a/libinitializer/Initializer.cpp +++ b/libinitializer/Initializer.cpp @@ -1060,7 +1060,6 @@ bcos::Error::Ptr Initializer::importSnapshotToRocksDB( return BCOS_ERROR_PTR(-1, metaFilePath.string() + " does not exist"); } auto tomlTable = toml::parse_file(metaFilePath.string()); - std::cout << tomlTable << std::endl; auto snapshotWithTxAndReceipts = tomlTable["snapshot"]["withTxAndReceipts"].value(); auto snapshotBlockNumber = tomlTable["snapshot"]["blockNumber"].value(); auto stateSstCount = tomlTable["snapshot"]["stateSstCount"].value(); diff --git a/libinitializer/PBFTInitializer.cpp b/libinitializer/PBFTInitializer.cpp index b8eae331e6..87886320fa 100644 --- a/libinitializer/PBFTInitializer.cpp +++ b/libinitializer/PBFTInitializer.cpp @@ -453,7 +453,8 @@ void PBFTInitializer::createSync() auto blockSyncFactory = std::make_shared(keyPair->publicKey(), m_protocolInitializer->blockFactory(), m_protocolInitializer->txResultFactory(), m_ledger, m_txpool, m_frontService, m_scheduler, m_pbft, m_nodeTimeMaintenance, - m_nodeConfig->enableSendBlockStatusByTree(), m_nodeConfig->treeWidth()); + m_nodeConfig->enableSendBlockStatusByTree(), m_nodeConfig->treeWidth(), + m_nodeConfig->syncArchivedBlocks()); m_blockSync = blockSyncFactory->createBlockSync(); m_blockSync->setFaultyNodeBlockDelta(m_nodeConfig->pipelineSize()); m_blockSync->setAllowFreeNodeSync(m_nodeConfig->allowFreeNodeSync());