From e7601c66d6d2f498b75951c2d0c8f423bf4ccadb Mon Sep 17 00:00:00 2001 From: bxq2011hust Date: Mon, 12 Aug 2024 09:34:16 +0800 Subject: [PATCH] fix archiveTool and archive block sync --- .../bcos-framework/protocol/Block.h | 4 + bcos-ledger/src/libledger/Ledger.cpp | 2 +- bcos-sync/bcos-sync/BlockSync.cpp | 229 ++++++++++++------ bcos-sync/bcos-sync/BlockSync.h | 26 +- .../interfaces/BlockRequestInterface.h | 2 + .../bcos-sync/protocol/PB/BlockRequestImpl.h | 3 + .../bcos-sync/protocol/proto/BlockSync.proto | 1 + .../bcos-sync/state/DownloadRequestQueue.cpp | 22 +- .../bcos-sync/state/DownloadRequestQueue.h | 20 +- tools/archive-tool/archiveTool.cpp | 4 +- 10 files changed, 214 insertions(+), 99 deletions(-) diff --git a/bcos-framework/bcos-framework/protocol/Block.h b/bcos-framework/bcos-framework/protocol/Block.h index 6fbded2f61..7ada87a532 100644 --- a/bcos-framework/bcos-framework/protocol/Block.h +++ b/bcos-framework/bcos-framework/protocol/Block.h @@ -113,6 +113,10 @@ class Block }) | RANGES::to()); } + bool operator<(const Block& block) const + { + return blockHeaderConst()->number() < block.blockHeaderConst()->number(); + } }; using Blocks = std::vector; using BlocksPtr = std::shared_ptr; diff --git a/bcos-ledger/src/libledger/Ledger.cpp b/bcos-ledger/src/libledger/Ledger.cpp index 61ca8cf0a9..7b9092020f 100644 --- a/bcos-ledger/src/libledger/Ledger.cpp +++ b/bcos-ledger/src/libledger/Ledger.cpp @@ -1533,7 +1533,7 @@ void Ledger::asyncBatchGetReceipts(std::shared_ptr> has { if (!entry.has_value()) { - LEDGER_LOG(DEBUG) << "Get receipt with empty entry: " << (*hashes)[i]; + LEDGER_LOG(DEBUG) << "Get receipt with empty entry: " << toHex((*hashes)[i]); callback(BCOS_ERROR_PTR( LedgerError::GetStorageError, "Batch get transaction failed"), std::vector()); diff --git a/bcos-sync/bcos-sync/BlockSync.cpp b/bcos-sync/bcos-sync/BlockSync.cpp index 5efae06de3..06f09d994a 100644 --- a/bcos-sync/bcos-sync/BlockSync.cpp +++ b/bcos-sync/bcos-sync/BlockSync.cpp @@ -232,8 +232,13 @@ void BlockSync::executeWorker() if (m_config->syncArchivedBlockBody()) { - syncArchivedBlockBody(); - verifyAndCommitArchivedBlock(); + auto archivedBlockNumber = m_config->archiveBlockNumber(); + if (archivedBlockNumber == 0) + { + return; + } + syncArchivedBlockBody(archivedBlockNumber); + verifyAndCommitArchivedBlock(archivedBlockNumber); } } catch (std::exception const& e) @@ -468,22 +473,36 @@ void BlockSync::onPeerBlocks(NodeIDPtr _nodeID, BlockSyncMsgInterface::Ptr _sync auto blockMsg = m_config->msgFactory()->createBlocksMsg(std::move(_syncMsg)); if (number < archivedNumber) { + bcos::protocol::BlockNumber topNumber = 0; size_t downloadedBlockCount = 0; { ReadGuard lock(x_archivedBlockQueue); downloadedBlockCount = m_archivedBlockQueue.size(); + if (!m_archivedBlockQueue.empty()) + { + topNumber = m_archivedBlockQueue.top()->blockHeader()->number(); + if (topNumber == number) + { + return; + } + } } - if (downloadedBlockCount >= m_config->maxDownloadingBlockQueueSize()) + if (downloadedBlockCount >= m_config->maxDownloadingBlockQueueSize() && + number != archivedNumber - 1) { BLKSYNC_LOG(WARNING) << LOG_BADGE("Download") << LOG_BADGE("BlockSync") << LOG_DESC("archivedBlockQueue is full") - << LOG_KV("queueSize", downloadedBlockCount); + << LOG_KV("queueSize", downloadedBlockCount) + << LOG_KV("receivedBlockNumber", number) + << LOG_KV("topArchivedQueue", topNumber) + << LOG_KV("archivedBlockNumber", archivedNumber); return; } auto block = m_config->blockFactory()->createBlock(blockMsg->blockData(0), true, true); BLKSYNC_LOG(DEBUG) << LOG_BADGE("Download") << BLOCK_NUMBER(number) - << LOG_BADGE("BlockSync") << LOG_DESC("Receive peer block packet(archived)") + << LOG_KV("topArchivedQueue", topNumber) + << LOG_KV("archivedBlockNumber", archivedNumber) << LOG_KV("peer", _nodeID->shortHex()); { WriteGuard lock(x_archivedBlockQueue); @@ -505,6 +524,7 @@ void BlockSync::onPeerBlocksRequest(NodeIDPtr _nodeID, BlockSyncMsgInterface::Pt << LOG_DESC("Receive block request") << LOG_KV("peer", _nodeID->shortHex()) << LOG_KV("from", blockRequest->number()) << LOG_KV("size", blockRequest->size()) + << LOG_KV("flag", blockRequest->blockDataFlag()) << LOG_KV("interval", blockRequest->blockInterval()); auto peerStatus = m_syncStatus->peerStatus(_nodeID); if (!peerStatus && (m_config->existsInGroup(_nodeID) || m_allowFreeNode)) @@ -522,8 +542,8 @@ void BlockSync::onPeerBlocksRequest(NodeIDPtr _nodeID, BlockSyncMsgInterface::Pt } if (peerStatus) { - peerStatus->downloadRequests()->push( - blockRequest->number(), blockRequest->size(), blockRequest->blockInterval()); + peerStatus->downloadRequests()->push(blockRequest->number(), blockRequest->size(), + blockRequest->blockDataFlag(), blockRequest->blockInterval()); m_signalled.notify_all(); return; } @@ -587,10 +607,11 @@ void BlockSync::tryToRequestBlocks() { return; } - requestBlocks(currentNumber, requestToNumber); + requestBlocks( + currentNumber, requestToNumber, bcos::ledger::TRANSACTIONS | bcos::ledger::HEADER); } -void BlockSync::requestBlocks(BlockNumber _from, BlockNumber _to) +void BlockSync::requestBlocks(BlockNumber _from, BlockNumber _to, int32_t blockDataFlag) { BLKSYNC_LOG(INFO) << LOG_BADGE("Download") << LOG_BADGE("requestBlocks") << LOG_KV("from", _from) << LOG_KV("to", _to); @@ -607,62 +628,63 @@ void BlockSync::requestBlocks(BlockNumber _from, BlockNumber _to) { bool findPeer = false; // shard: [from, to] - m_syncStatus->foreachPeerRandom([this, &_from, &shard, &interval, &blockSizePerShard, &_to, - &findPeer, &shardNumber](PeerStatus::Ptr _p) { - if (_p->number() < m_config->knownHighestNumber()) - { - // Only send request to nodes which are not syncing(has max number) - return true; - } - // BlockNumber from = _from + 1 + shard * blockSizePerShard; - // BlockNumber to = std::min((BlockNumber)(from + blockSizePerShard - 1), _to); + m_syncStatus->foreachPeerRandom( + [this, &_from, &shard, &interval, &blockSizePerShard, &_to, &findPeer, &shardNumber, + blockDataFlag](PeerStatus::Ptr _p) { + if (_p->number() < m_config->knownHighestNumber()) + { + // Only send request to nodes which are not syncing(has max number) + return true; + } + // BlockNumber from = _from + 1 + shard * blockSizePerShard; + // BlockNumber to = std::min((BlockNumber)(from + blockSizePerShard - 1), _to); - /// example: _from=0, interval=3, blockSizePerShard=4, _to=unlimited, then loop twice - /// peer0: [1, 4, 7, 10], [13, 16, 19, 22] - /// peer1: [2, 5, 8, 11], [14, 17, 20, 23] - /// peer2: [3, 6, 9, 12], [15, 18, 21, 24] - BlockNumber from = _from + 1 + (shard % interval) + - (shard / interval) * (blockSizePerShard * interval); - BlockNumber to = - std::min((BlockNumber)(from + (blockSizePerShard - 1) * interval), _to); - BlockNumber size = (to - from) / interval + 1; - if (_p->number() < to || _p->archivedBlockNumber() >= from) - { - return true; // to next peer - } - // found a peer - findPeer = true; - auto blockRequest = m_config->msgFactory()->createBlockRequest(); - if (size <= 1) [[unlikely]] - { - blockRequest->setNumber(from); - blockRequest->setSize(to - from + 1); - } - else [[likely]] - { - blockRequest->setNumber(from); - blockRequest->setSize(size); - blockRequest->setBlockInterval(interval); - } - auto encodedData = blockRequest->encode(); - m_config->frontService()->asyncSendMessageByNodeID( - ModuleID::BlockSync, _p->nodeId(), ref(*encodedData), 0, nullptr); + /// example: _from=0, interval=3, blockSizePerShard=4, _to=unlimited, then loop + /// twice peer0: [1, 4, 7, 10], [13, 16, 19, 22] peer1: [2, 5, 8, 11], [14, 17, 20, + /// 23] peer2: [3, 6, 9, 12], [15, 18, 21, 24] + BlockNumber from = _from + 1 + (shard % interval) + + (shard / interval) * (blockSizePerShard * interval); + BlockNumber to = + std::min((BlockNumber)(from + (blockSizePerShard - 1) * interval), _to); + BlockNumber size = (to - from) / interval + 1; + if (_p->number() < to || _p->archivedBlockNumber() >= from) + { + return true; // to next peer + } + // found a peer + findPeer = true; + auto blockRequest = m_config->msgFactory()->createBlockRequest(); + blockRequest->setBlockDataFlag(blockDataFlag); + if (size <= 1) [[unlikely]] + { + blockRequest->setNumber(from); + blockRequest->setSize(to - from + 1); + } + else [[likely]] + { + blockRequest->setNumber(from); + blockRequest->setSize(size); + blockRequest->setBlockInterval(interval); + } + auto encodedData = blockRequest->encode(); + m_config->frontService()->asyncSendMessageByNodeID( + ModuleID::BlockSync, _p->nodeId(), ref(*encodedData), 0, nullptr); - m_maxRequestNumber = std::max(m_maxRequestNumber.load(), to); + m_maxRequestNumber = std::max(m_maxRequestNumber.load(), to); - BLKSYNC_LOG(INFO) << LOG_BADGE("Download") << LOG_BADGE("Request") - << LOG_DESC("Request blocks") << LOG_KV("from", from) - << LOG_KV("to", to) - << LOG_KV("interval", blockRequest->blockInterval()) - << LOG_KV("curNum", m_config->blockNumber()) - << LOG_KV("peerArchived", _p->archivedBlockNumber()) - << LOG_KV("peer", _p->nodeId()->shortHex()) - << LOG_KV("maxRequestNumber", m_maxRequestNumber) - << LOG_KV("node", m_config->nodeID()->shortHex()); + BLKSYNC_LOG(INFO) << LOG_BADGE("Download") << LOG_BADGE("Request") + << LOG_DESC("Request blocks") << LOG_KV("from", from) + << LOG_KV("to", to) + << LOG_KV("interval", blockRequest->blockInterval()) + << LOG_KV("curNum", m_config->blockNumber()) + << LOG_KV("peerArchived", _p->archivedBlockNumber()) + << LOG_KV("peer", _p->nodeId()->shortHex()) + << LOG_KV("maxRequestNumber", m_maxRequestNumber) + << LOG_KV("node", m_config->nodeID()->shortHex()); - ++shard; // shard move - return shard < shardNumber; - }); + ++shard; // shard move + return shard < shardNumber; + }); if (!findPeer) { BlockNumber from = _from + shard * blockSizePerShard; @@ -771,13 +793,13 @@ void BlockSync::maintainBlockRequest() auto archivedBlockNumber = m_config->archiveBlockNumber(); auto fetchSet = reqQueue->mergeAndPop(); - for (const auto& number : fetchSet) + for (const auto& req : fetchSet) { - if (std::cmp_less(number, archivedBlockNumber)) + if (std::cmp_less(req.number, archivedBlockNumber)) { continue; } - fetchAndSendBlock(_p->nodeId(), number); + fetchAndSendBlock(_p->nodeId(), req.number, req.dataFlag); } BLKSYNC_LOG(DEBUG) << LOG_BADGE("Download Request: response blocks") << LOG_KV("size", fetchSet.size()) @@ -827,13 +849,14 @@ void BlockSync::maintainBlockRequest() }); } -void BlockSync::fetchAndSendBlock(PublicPtr const& _peer, BlockNumber _number) +void BlockSync::fetchAndSendBlock( + PublicPtr const& _peer, BlockNumber _number, int32_t _blockDataFlag = HEADER | TRANSACTIONS) { // only fetch blockHeader and transactions - auto blockFlag = HEADER | TRANSACTIONS; auto self = weak_from_this(); - m_config->ledger()->asyncGetBlockDataByNumber(_number, blockFlag, - [self, _peer = std::move(_peer), _number](auto&& _error, Block::Ptr _block) { + m_config->ledger()->asyncGetBlockDataByNumber(_number, _blockDataFlag, + [self, _peer = std::move(_peer), _number, _blockDataFlag]( + auto&& _error, Block::Ptr _block) { if (_error != nullptr) { BLKSYNC_LOG(WARNING) @@ -864,6 +887,7 @@ void BlockSync::fetchAndSendBlock(PublicPtr const& _peer, BlockNumber _number) << LOG_KV("toPeer", _peer->shortHex()) << LOG_KV("hash", blockHeader->hash().abridged()) << LOG_KV("signatureSize", signature.size()) + << LOG_KV("blockFlag", _blockDataFlag) << LOG_KV("transactionsSize", _block->transactionsSize()); } catch (std::exception const& e) @@ -1055,32 +1079,62 @@ std::vector BlockSync::getPeerStatus() return statuses; } -void BlockSync::syncArchivedBlockBody() +void BlockSync::syncArchivedBlockBody(bcos::protocol::BlockNumber archivedBlockNumber) { - BlockNumber topBlockNumber = 0; + size_t millseconds = std::chrono::duration_cast( + std::chrono::system_clock::now() - m_lastArchivedRequestTime) + .count(); + if (millseconds < m_config->downloadTimeout()) + { + return; + } + // use 1/4 of the maxDownloadingBlockQueueSize to limit the memory usage + auto maxCount = (BlockNumber)(m_config->maxDownloadingBlockQueueSize() / 4); + BlockNumber topBlockNumber = + std::max(archivedBlockNumber - (BlockNumber)maxCount, (BlockNumber)0); { ReadGuard lock(x_archivedBlockQueue); - topBlockNumber = m_archivedBlockQueue.top()->blockHeader()->number(); + if (!m_archivedBlockQueue.empty()) + { + topBlockNumber = m_archivedBlockQueue.top()->blockHeader()->number(); + if (topBlockNumber >= archivedBlockNumber - 1) + { + return; + } + } + } + if (topBlockNumber == archivedBlockNumber - 1) + { + return; } auto from = topBlockNumber; - auto archivedBlockNumber = m_config->archiveBlockNumber(); - // use 1/4 of the maxDownloadingBlockQueueSize to limit the memory usage - auto maxCount = m_config->maxDownloadingBlockQueueSize() / 4; - if (archivedBlockNumber - from > (BlockNumber)maxCount) + if (archivedBlockNumber - from > maxCount) { from = archivedBlockNumber - (BlockNumber)maxCount; } - requestBlocks(from, archivedBlockNumber - 1); + BLKSYNC_LOG(INFO) << LOG_DESC("BlockSync: syncArchivedBlockBody") + << LOG_KV("archivedBlockNumber", archivedBlockNumber) + << LOG_KV("topBlockNumber", topBlockNumber) << LOG_KV("from", from) + << LOG_KV("to", archivedBlockNumber - 1); + m_lastArchivedRequest = from; + m_lastArchivedRequestTime = std::chrono::system_clock::now(); + requestBlocks(from, archivedBlockNumber - 1, bcos::ledger::FULL_BLOCK); } -void BlockSync::verifyAndCommitArchivedBlock() +void BlockSync::verifyAndCommitArchivedBlock(bcos::protocol::BlockNumber archivedBlockNumber) { BlockNumber topBlockNumber = 0; { ReadGuard lock(x_archivedBlockQueue); - topBlockNumber = m_archivedBlockQueue.top()->blockHeader()->number(); + if (!m_archivedBlockQueue.empty()) + { + topBlockNumber = m_archivedBlockQueue.top()->blockHeader()->number(); + } + else + { + return; + } } - auto archivedBlockNumber = m_config->archiveBlockNumber(); if (topBlockNumber != archivedBlockNumber - 1) { return; @@ -1124,7 +1178,11 @@ void BlockSync::verifyAndCommitArchivedBlock() BLKSYNC_LOG(ERROR) << LOG_DESC("BlockSync verify archived block failed") << LOG_KV("number", topBlockNumber) << LOG_KV("transactionRoot", *toHexString(transactionRoot)) + << LOG_KV( + "localTransactionRoot", *toHexString(localBlockHeader->txsRoot())) << LOG_KV("receiptRoot", *toHexString(receiptRoot)) + << LOG_KV("localReceiptRoot", + *toHexString(localBlockHeader->receiptsRoot())) << LOG_KV("reason", "transactionRoot or receiptRoot not match"); WriteGuard lock(x_archivedBlockQueue); m_archivedBlockQueue.pop(); @@ -1146,5 +1204,18 @@ void BlockSync::verifyAndCommitArchivedBlock() { WriteGuard lock(x_archivedBlockQueue); m_archivedBlockQueue.pop(); + for (auto topNumber = m_archivedBlockQueue.top()->blockHeader()->number(); + topNumber >= topBlockNumber;) + { + m_archivedBlockQueue.pop(); + if (!m_archivedBlockQueue.empty()) + { + topNumber = m_archivedBlockQueue.top()->blockHeader()->number(); + } + else + { + break; + } + } } } diff --git a/bcos-sync/bcos-sync/BlockSync.h b/bcos-sync/bcos-sync/BlockSync.h index 82b8ece831..515fa8ab9c 100644 --- a/bcos-sync/bcos-sync/BlockSync.h +++ b/bcos-sync/bcos-sync/BlockSync.h @@ -19,6 +19,7 @@ * @date 2021-05-24 */ #pragma once +#include "bcos-framework/protocol/ProtocolTypeDef.h" #include "bcos-sync/BlockSyncConfig.h" #include "bcos-sync/state/DownloadingQueue.h" #include "bcos-sync/state/SyncPeerStatus.h" @@ -133,12 +134,13 @@ class BlockSync : public BlockSyncInterface, // update SyncTreeTopology node info virtual void updateTreeTopologyNodeInfo(); - virtual void syncArchivedBlockBody(); - virtual void verifyAndCommitArchivedBlock(); + virtual void syncArchivedBlockBody(bcos::protocol::BlockNumber archivedBlockNumber); + virtual void verifyAndCommitArchivedBlock(bcos::protocol::BlockNumber archivedBlockNumber); - void requestBlocks(bcos::protocol::BlockNumber _from, bcos::protocol::BlockNumber _to); + void requestBlocks( + bcos::protocol::BlockNumber _from, bcos::protocol::BlockNumber _to, int32_t blockDataFlag); void fetchAndSendBlock( - bcos::crypto::PublicPtr const& _peer, bcos::protocol::BlockNumber _number); + bcos::crypto::PublicPtr const& _peer, bcos::protocol::BlockNumber _number, int32_t _blockDataFlag); void printSyncInfo(); protected: @@ -166,7 +168,21 @@ class BlockSync : public BlockSyncInterface, bool m_allowFreeNode = false; mutable SharedMutex x_archivedBlockQueue; - BlockQueue m_archivedBlockQueue; + +private: + // struct BlockLess + // { + // bool operator()(bcos::protocol::Block::Ptr const& _first, + // bcos::protocol::Block::Ptr const& _second) const + // { + // return _first->blockHeader()->number() < _second->blockHeader()->number(); + // } + // }; + // top block is the max number block + // std::priority_queue + std::priority_queue m_archivedBlockQueue; + std::atomic m_lastArchivedRequest = {0}; + std::chrono::system_clock::time_point m_lastArchivedRequestTime = std::chrono::system_clock::now(); SyncTreeTopology::Ptr m_syncTreeTopology{nullptr}; }; diff --git a/bcos-sync/bcos-sync/interfaces/BlockRequestInterface.h b/bcos-sync/bcos-sync/interfaces/BlockRequestInterface.h index 6129ff31ca..0e36a73f4e 100644 --- a/bcos-sync/bcos-sync/interfaces/BlockRequestInterface.h +++ b/bcos-sync/bcos-sync/interfaces/BlockRequestInterface.h @@ -31,6 +31,8 @@ class BlockRequestInterface : virtual public BlockSyncMsgInterface using Ptr = std::shared_ptr; BlockRequestInterface() = default; virtual ~BlockRequestInterface() {} + virtual int32_t blockDataFlag() const = 0; + virtual void setBlockDataFlag(int32_t _flag) = 0; virtual size_t size() const = 0; virtual void setSize(size_t _size) = 0; diff --git a/bcos-sync/bcos-sync/protocol/PB/BlockRequestImpl.h b/bcos-sync/bcos-sync/protocol/PB/BlockRequestImpl.h index 6ca9db8972..07e51106fc 100644 --- a/bcos-sync/bcos-sync/protocol/PB/BlockRequestImpl.h +++ b/bcos-sync/bcos-sync/protocol/PB/BlockRequestImpl.h @@ -41,6 +41,9 @@ class BlockRequestImpl : public BlockRequestInterface, public BlockSyncMsgImpl size_t size() const override { return m_syncMessage->size(); } void setSize(size_t _size) override { m_syncMessage->set_size(_size); } + int32_t blockDataFlag() const override { return m_syncMessage->block_data_flag(); } + void setBlockDataFlag(int32_t _flag) override { m_syncMessage->set_block_data_flag(_flag); } + protected: explicit BlockRequestImpl(std::shared_ptr _syncMessage) { diff --git a/bcos-sync/bcos-sync/protocol/proto/BlockSync.proto b/bcos-sync/bcos-sync/protocol/proto/BlockSync.proto index 31628d23b2..65a7b0468b 100644 --- a/bcos-sync/bcos-sync/protocol/proto/BlockSync.proto +++ b/bcos-sync/bcos-sync/protocol/proto/BlockSync.proto @@ -22,4 +22,5 @@ message BlockSyncMessage //for block sync optimize int64 block_interval = 10; + int32 block_data_flag = 11; } \ No newline at end of file diff --git a/bcos-sync/bcos-sync/state/DownloadRequestQueue.cpp b/bcos-sync/bcos-sync/state/DownloadRequestQueue.cpp index 170feaae04..00875c89c1 100644 --- a/bcos-sync/bcos-sync/state/DownloadRequestQueue.cpp +++ b/bcos-sync/bcos-sync/state/DownloadRequestQueue.cpp @@ -19,13 +19,15 @@ * @date 2021-05-24 */ #include "DownloadRequestQueue.h" +#include "bcos-framework/ledger/LedgerTypeDef.h" #include "bcos-sync/utilities/Common.h" using namespace bcos; using namespace bcos::sync; using namespace bcos::protocol; -void DownloadRequestQueue::push(BlockNumber _fromNumber, size_t _size, size_t _interval) +void DownloadRequestQueue::push( + BlockNumber _fromNumber, size_t _size, int32_t _dataFlag, size_t _interval) { UpgradableGuard lock(x_reqQueue); // Note: the requester must have retry logic @@ -39,7 +41,7 @@ void DownloadRequestQueue::push(BlockNumber _fromNumber, size_t _size, size_t _i return; } UpgradeGuard ulock(lock); - m_reqQueue.push(std::make_shared(_fromNumber, _size, _interval)); + m_reqQueue.push(std::make_shared(_fromNumber, _size, _dataFlag, _interval)); BLKSYNC_LOG(DEBUG) << LOG_BADGE("Download") << LOG_BADGE("Request") << LOG_DESC("Push request in reqQueue req") << LOG_KV("from", _fromNumber) << LOG_KV("size", _size) << LOG_KV("interval", _interval) @@ -61,6 +63,7 @@ DownloadRequest::UniquePtr DownloadRequestQueue::topAndPop() size_t size = m_reqQueue.top()->size(); size_t interval = m_reqQueue.top()->interval(); BlockNumber toNumber = m_reqQueue.top()->toNumber(); + int32_t dataFlag = m_reqQueue.top()->blockDataFlag(); UpgradeGuard ulock(lock); while (!m_reqQueue.empty() && std::cmp_greater_equal(toNumber + interval, m_reqQueue.top()->fromNumber()) && @@ -118,20 +121,21 @@ DownloadRequest::UniquePtr DownloadRequestQueue::topAndPop() { BLKSYNC_LOG(TRACE) << LOG_BADGE("Download") << LOG_BADGE("Request") << LOG_DESC("Pop reqQueue top req") << LOG_KV("from", fromNumber) - << LOG_KV("size", size) << LOG_KV("interval", interval); + << LOG_KV("size", size) << LOG_KV("interval", interval) + << LOG_KV("dataFlag", dataFlag); } - return std::make_unique(fromNumber, size, interval); + return std::make_unique(fromNumber, size, dataFlag, interval); } -std::set> DownloadRequestQueue::mergeAndPop() +std::set DownloadRequestQueue::mergeAndPop() { UpgradableGuard lock(x_reqQueue); if (m_reqQueue.empty()) { - return {{}, 0}; + return {}; } UpgradeGuard ulock(lock); - std::set> fetchSet{}; + std::set fetchSet{}; while (!m_reqQueue.empty()) { @@ -139,7 +143,9 @@ std::set> DownloadRequestQueue::mergeAndPop() auto interval = (topReq->interval() == 0 ? 1 : topReq->interval()); for (BlockNumber i = topReq->fromNumber(); i <= topReq->toNumber(); i += interval) { - fetchSet.insert(i); + fetchSet.insert({i, topReq->blockDataFlag() > 0 ? + topReq->blockDataFlag() : + (bcos::ledger::HEADER | bcos::ledger::TRANSACTIONS)}); } m_reqQueue.pop(); } diff --git a/bcos-sync/bcos-sync/state/DownloadRequestQueue.h b/bcos-sync/bcos-sync/state/DownloadRequestQueue.h index 383fdda307..11f189533e 100644 --- a/bcos-sync/bcos-sync/state/DownloadRequestQueue.h +++ b/bcos-sync/bcos-sync/state/DownloadRequestQueue.h @@ -19,6 +19,7 @@ * @date 2021-05-24 */ #pragma once +#include "bcos-framework/ledger/LedgerTypeDef.h" #include "bcos-sync/BlockSyncConfig.h" #include #include @@ -30,8 +31,10 @@ class DownloadRequest public: using Ptr = std::shared_ptr; using UniquePtr = std::unique_ptr; - DownloadRequest(bcos::protocol::BlockNumber _fromNumber, size_t _size, size_t _interval = 0) - : m_fromNumber(_fromNumber), m_size(_size), m_interval(_interval) + DownloadRequest(bcos::protocol::BlockNumber _fromNumber, size_t _size, + int32_t _dataFlag = (bcos::ledger::HEADER | bcos::ledger::TRANSACTIONS), + size_t _interval = 0) + : m_fromNumber(_fromNumber), m_size(_size), m_dataFlag(_dataFlag), m_interval(_interval) {} bcos::protocol::BlockNumber fromNumber() const noexcept { return m_fromNumber; } @@ -42,10 +45,12 @@ class DownloadRequest return (m_interval == 0) ? (protocol::BlockNumber)m_fromNumber + m_size - 1 : (protocol::BlockNumber)m_fromNumber + (m_size - 1) * m_interval; } + int32_t blockDataFlag() const noexcept { return m_dataFlag; } private: bcos::protocol::BlockNumber m_fromNumber; size_t m_size; + int32_t m_dataFlag = 0; size_t m_interval = 0; }; @@ -70,8 +75,15 @@ class DownloadRequestQueue {} virtual ~DownloadRequestQueue() = default; - [[maybe_unused]] virtual std::set> mergeAndPop(); - virtual void push(bcos::protocol::BlockNumber _fromNumber, size_t _size, size_t interval = 0); + struct BlockRequest + { + protocol::BlockNumber number; + int32_t dataFlag; + bool operator<(const BlockRequest& _other) const { return number < _other.number; } + }; + [[maybe_unused]] virtual std::set mergeAndPop(); + virtual void push(bcos::protocol::BlockNumber _fromNumber, size_t _size, + int _dataFlag = (bcos::ledger::HEADER | bcos::ledger::TRANSACTIONS), size_t interval = 0); virtual DownloadRequest::UniquePtr topAndPop(); // Must call use disablePush() before virtual bool empty(); diff --git a/tools/archive-tool/archiveTool.cpp b/tools/archive-tool/archiveTool.cpp index 3c54274d64..4e2df94b65 100644 --- a/tools/archive-tool/archiveTool.cpp +++ b/tools/archive-tool/archiveTool.cpp @@ -174,19 +174,20 @@ createBackendStorage(std::shared_ptr nodeConfig, const s StorageInitializer::createRocksDB( stateDBPath, option, nodeConfig->enableStatistics(), nodeConfig->keyPageSize()), dataEncryption); + blockStorage = storage; if (nodeConfig->enableSeparateBlockAndState()) { auto blockDB = StorageInitializer::createRocksDB( nodeConfig->blockDBPath(), option, nodeConfig->enableStatistics()); blockStorage = StorageInitializer::build(std::move(blockDB), dataEncryption); } - blockStorage = storage; } else { auto* rocksdb = createSecondaryRocksDB(stateDBPath, secondaryPath); storage = std::make_shared( std::unique_ptr(rocksdb), dataEncryption); + blockStorage = storage; if (nodeConfig->enableSeparateBlockAndState()) { auto* blockRocksDB = @@ -194,7 +195,6 @@ createBackendStorage(std::shared_ptr nodeConfig, const s blockStorage = std::make_shared( std::unique_ptr(blockRocksDB), dataEncryption); } - blockStorage = storage; } } else if (boost::iequals(nodeConfig->storageType(), "TiKV"))