Skip to content

Commit

Permalink
add storage.sync_archived_blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
bxq2011hust committed Jul 29, 2024
1 parent cee39ba commit 808934a
Show file tree
Hide file tree
Showing 15 changed files with 206 additions and 10 deletions.
2 changes: 2 additions & 0 deletions bcos-framework/bcos-framework/ledger/LedgerInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ class LedgerInterface
virtual void asyncGetCurrentStateByKey(std::string_view const& _key,
std::function<void(Error::Ptr&&, std::optional<bcos::storage::Entry>&&)> _callback) = 0;

virtual Error::Ptr setCurrentStateByKey(
std::string_view const& _key, bcos::storage::Entry entry) = 0;
/**
* @brief async get system config by table key
* @param _key the key of row, you can checkout all key in LedgerTypeDef.h
Expand Down
5 changes: 5 additions & 0 deletions bcos-framework/bcos-framework/testutils/faker/FakeLedger.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,11 @@ class FakeLedger : public LedgerInterface, public std::enable_shared_from_this<F
{
_callback(nullptr, {});
}
Error::Ptr setCurrentStateByKey(
std::string_view const& _key, bcos::storage::Entry entry) override
{
return nullptr;
}

void asyncGetSystemConfigByKey(std::string_view const& _key,
std::function<void(Error::Ptr, std::string, BlockNumber)> _onGetConfig) override
Expand Down
31 changes: 31 additions & 0 deletions bcos-ledger/src/libledger/Ledger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,29 @@ void Ledger::asyncGetBlockDataByNumber(bcos::protocol::BlockNumber _blockNumber,
return;
}

if ((_blockFlag & TRANSACTIONS) != 0 || (_blockFlag & RECEIPTS) != 0)
{
protocol::BlockNumber archivedBlockNumber = 0;
std::promise<std::pair<Error::Ptr, std::optional<bcos::storage::Entry>>> statePromise;
asyncGetCurrentStateByKey(ledger::SYS_KEY_ARCHIVED_NUMBER,
[&statePromise](Error::Ptr&& err, std::optional<bcos::storage::Entry>&& entry) {
statePromise.set_value(std::make_pair(std::move(err), std::move(entry)));
});
auto archiveRet = statePromise.get_future().get();
if (!archiveRet.first && archiveRet.second.has_value())
{
archivedBlockNumber = boost::lexical_cast<int64_t>(archiveRet.second->get());
}
if (_blockNumber < archivedBlockNumber)
{
LEDGER_LOG(INFO) << "GetBlockDataByNumber, block number is larger than archived number";
_onGetBlock(BCOS_ERROR_PTR(LedgerError::ErrorArgument,
"Wrong argument, this block's transactions and receipts are archived"),
nullptr);
return;
}
}

std::list<std::function<void()>> fetchers;
auto block = m_blockFactory->createBlock();
auto total = std::make_shared<size_t>(0);
Expand Down Expand Up @@ -2335,3 +2358,11 @@ void Ledger::asyncGetCurrentStateByKey(std::string_view const& _key,
});
});
}

Error::Ptr Ledger::setCurrentStateByKey(std::string_view const& _key, bcos::storage::Entry entry)
{
std::promise<Error::UniquePtr> setPromise;
m_stateStorage->asyncSetRow(ledger::SYS_CURRENT_STATE, _key, std::move(entry),
[&](Error::UniquePtr err) { setPromise.set_value(std::move(err)); });
return setPromise.get_future().get();
}
2 changes: 2 additions & 0 deletions bcos-ledger/src/libledger/Ledger.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ class Ledger : public LedgerInterface
void asyncGetCurrentStateByKey(std::string_view const& _key,
std::function<void(Error::Ptr&&, std::optional<bcos::storage::Entry>&&)> _callback)
override;
Error::Ptr setCurrentStateByKey(
std::string_view const& _key, bcos::storage::Entry entry) override;

task::Task<std::optional<storage::Entry>> getStorageAt(std::string_view _address,
std::string_view _key, protocol::BlockNumber _blockNumber) override;
Expand Down
129 changes: 129 additions & 0 deletions bcos-sync/bcos-sync/BlockSync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
* @date 2021-05-24
*/
#include "bcos-sync/BlockSync.h"
#include "bcos-framework/ledger/LedgerTypeDef.h"
#include "bcos-framework/protocol/CommonError.h"
#include "bcos-framework/protocol/ProtocolTypeDef.h"
#include <bcos-tool/LedgerConfigFetcher.h>
#include <json/json.h>
#include <boost/bind/bind.hpp>
#include <string>

using namespace bcos;
using namespace bcos::sync;
Expand Down Expand Up @@ -226,6 +229,12 @@ void BlockSync::executeWorker()

// send block-download-request to peers if this node is behind others
tryToRequestBlocks();

if (m_config->syncArchivedBlockBody())
{
syncArchivedBlockBody();
verifyAndCommitArchivedBlock();
}
}
catch (std::exception const& e)
{
Expand Down Expand Up @@ -455,7 +464,33 @@ void BlockSync::onPeerStatus(NodeIDPtr _nodeID, BlockSyncMsgInterface::Ptr _sync
void BlockSync::onPeerBlocks(NodeIDPtr _nodeID, BlockSyncMsgInterface::Ptr _syncMsg)
{
auto number = _syncMsg->number();
auto archivedNumber = m_config->archiveBlockNumber();
auto blockMsg = m_config->msgFactory()->createBlocksMsg(std::move(_syncMsg));
if (number < archivedNumber)
{
size_t downloadedBlockCount = 0;
{
ReadGuard lock(x_archivedBlockQueue);
downloadedBlockCount = m_archivedBlockQueue.size();
}
if (downloadedBlockCount >= m_config->maxDownloadingBlockQueueSize())
{
BLKSYNC_LOG(WARNING) << LOG_BADGE("Download") << LOG_BADGE("BlockSync")
<< LOG_DESC("archivedBlockQueue is full")
<< LOG_KV("queueSize", downloadedBlockCount);
return;
}
auto block = m_config->blockFactory()->createBlock(blockMsg->blockData(0), true, true);
BLKSYNC_LOG(DEBUG) << LOG_BADGE("Download") << BLOCK_NUMBER(number)
<< LOG_BADGE("BlockSync")
<< LOG_DESC("Receive peer block packet(archived)")
<< LOG_KV("peer", _nodeID->shortHex());
{
WriteGuard lock(x_archivedBlockQueue);
m_archivedBlockQueue.push(block);
}
return;
}
BLKSYNC_LOG(DEBUG) << LOG_BADGE("Download") << BLOCK_NUMBER(number) << LOG_BADGE("BlockSync")
<< LOG_DESC("Receive peer block packet")
<< LOG_KV("peer", _nodeID->shortHex());
Expand Down Expand Up @@ -1019,3 +1054,97 @@ std::vector<PeerStatus::Ptr> BlockSync::getPeerStatus()
});
return statuses;
}

void BlockSync::syncArchivedBlockBody()
{
BlockNumber topBlockNumber = 0;
{
ReadGuard lock(x_archivedBlockQueue);
topBlockNumber = m_archivedBlockQueue.top()->blockHeader()->number();
}
auto from = topBlockNumber;
auto archivedBlockNumber = m_config->archiveBlockNumber();
// use 1/4 of the maxDownloadingBlockQueueSize to limit the memory usage
auto maxCount = m_config->maxDownloadingBlockQueueSize() / 4;
if (archivedBlockNumber - from > (BlockNumber)maxCount)
{
from = archivedBlockNumber - (BlockNumber)maxCount;
}
requestBlocks(from, archivedBlockNumber - 1);
}

void BlockSync::verifyAndCommitArchivedBlock()
{
BlockNumber topBlockNumber = 0;
{
ReadGuard lock(x_archivedBlockQueue);
topBlockNumber = m_archivedBlockQueue.top()->blockHeader()->number();
}
auto archivedBlockNumber = m_config->archiveBlockNumber();
if (topBlockNumber != archivedBlockNumber - 1)
{
return;
}
// verify the tx root and receipt root
bcos::protocol::Block::Ptr block = nullptr;
{
WriteGuard lock(x_archivedBlockQueue);
block = m_archivedBlockQueue.top();
// m_archivedBlockQueue.pop();
}

std::promise<protocol::BlockHeader::Ptr> blockHeaderFuture;
m_config->ledger()->asyncGetBlockDataByNumber(topBlockNumber, bcos::ledger::HEADER,
[&blockHeaderFuture](const Error::Ptr& error, const Block::Ptr& block) {
if (error)
{
blockHeaderFuture.set_value(nullptr);
}
else
{
blockHeaderFuture.set_value(block->blockHeader());
}
});
auto localBlockHeader = blockHeaderFuture.get_future().get();
if (!localBlockHeader)
{
BLKSYNC_LOG(ERROR) << LOG_DESC("BlockSync get local block header failed")
<< LOG_KV("number", topBlockNumber)
<< LOG_KV("reason", "get local block header failed");
return;
}

auto transactionRoot =
block->calculateTransactionRoot(*m_config->blockFactory()->cryptoSuite()->hashImpl());
auto receiptRoot =
block->calculateReceiptRoot(*m_config->blockFactory()->cryptoSuite()->hashImpl());
if (transactionRoot != localBlockHeader->txsRoot() ||
receiptRoot != localBlockHeader->receiptsRoot())
{
BLKSYNC_LOG(ERROR) << LOG_DESC("BlockSync verify archived block failed")
<< LOG_KV("number", topBlockNumber)
<< LOG_KV("transactionRoot", *toHexString(transactionRoot))
<< LOG_KV("receiptRoot", *toHexString(receiptRoot))
<< LOG_KV("reason", "transactionRoot or receiptRoot not match");
WriteGuard lock(x_archivedBlockQueue);
m_archivedBlockQueue.pop();
return;
}
auto err = m_config->ledger()->storeTransactionsAndReceipts(nullptr, block);
if (err)
{
BLKSYNC_LOG(ERROR) << LOG_DESC("BlockSync commit archived block failed")
<< LOG_KV("number", topBlockNumber)
<< LOG_KV("reason", "storeTransactionsAndReceipts failed");
return;
}
BLKSYNC_LOG(INFO) << LOG_DESC("BlockSync commit archived block success")
<< LOG_KV("number", topBlockNumber);
// update the archived number
m_config->ledger()->setCurrentStateByKey(bcos::ledger::SYS_KEY_ARCHIVED_NUMBER,
bcos::storage::Entry(std::to_string(topBlockNumber)));
{
WriteGuard lock(x_archivedBlockQueue);
m_archivedBlockQueue.pop();
}
}
8 changes: 7 additions & 1 deletion bcos-sync/bcos-sync/BlockSync.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,15 @@ class BlockSync : public BlockSyncInterface,
// update SyncTreeTopology node info
virtual void updateTreeTopologyNodeInfo();

protected:
virtual void syncArchivedBlockBody();
virtual void verifyAndCommitArchivedBlock();

void requestBlocks(bcos::protocol::BlockNumber _from, bcos::protocol::BlockNumber _to);
void fetchAndSendBlock(
bcos::crypto::PublicPtr const& _peer, bcos::protocol::BlockNumber _number);
void printSyncInfo();

private:
BlockSyncConfig::Ptr m_config;
SyncPeerStatus::Ptr m_syncStatus;
DownloadingQueue::Ptr m_downloadingQueue;
Expand All @@ -162,6 +165,9 @@ class BlockSync : public BlockSyncInterface,
std::atomic_bool m_masterNode = {false};
bool m_allowFreeNode = false;

mutable SharedMutex x_archivedBlockQueue;
BlockQueue m_archivedBlockQueue;

SyncTreeTopology::Ptr m_syncTreeTopology{nullptr};
};
} // namespace bcos::sync
9 changes: 7 additions & 2 deletions bcos-sync/bcos-sync/BlockSyncConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class BlockSyncConfig : public SyncConfig
bcos::scheduler::SchedulerInterface::Ptr _scheduler,
bcos::consensus::ConsensusInterface::Ptr _consensus, BlockSyncMsgFactory::Ptr _msgFactory,
bcos::tool::NodeTimeMaintenance::Ptr _nodeTimeMaintenance,
bool _enableSendBlockStatusByTree = false, std::uint32_t _syncTreeWidth = 3)
bool _enableSendBlockStatusByTree = false, std::uint32_t _syncTreeWidth = 3,
bool _syncArchivedBlockBody = false)
: SyncConfig(std::move(_nodeId)),
m_ledger(std::move(_ledger)),
m_txpool(std::move(_txpool)),
Expand All @@ -57,7 +58,8 @@ class BlockSyncConfig : public SyncConfig
m_msgFactory(std::move(_msgFactory)),
m_nodeTimeMaintenance(std::move(_nodeTimeMaintenance)),
m_enableSendBlockStatusByTree(_enableSendBlockStatusByTree),
m_syncTreeWidth(_syncTreeWidth)
m_syncTreeWidth(_syncTreeWidth),
m_syncArchivedBlockBody(_syncArchivedBlockBody)
{}
~BlockSyncConfig() override = default;

Expand Down Expand Up @@ -140,6 +142,7 @@ class BlockSyncConfig : public SyncConfig
}

bool masterNode() const { return m_masterNode; }
bool syncArchivedBlockBody() const { return m_syncArchivedBlockBody; }

bcos::protocol::BlockNumber archiveBlockNumber() const;

Expand Down Expand Up @@ -205,5 +208,7 @@ class BlockSyncConfig : public SyncConfig

bool m_enableSendBlockStatusByTree = false;
std::uint32_t m_syncTreeWidth;

bool m_syncArchivedBlockBody = false;
};
} // namespace bcos::sync
8 changes: 5 additions & 3 deletions bcos-sync/bcos-sync/BlockSyncFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ BlockSyncFactory::BlockSyncFactory(bcos::crypto::PublicPtr _nodeId,
bcos::scheduler::SchedulerInterface::Ptr _scheduler,
bcos::consensus::ConsensusInterface::Ptr _consensus,
bcos::tool::NodeTimeMaintenance::Ptr _nodeTimeMaintenance, bool _enableSendBlockStatusByTree,
std::uint32_t _syncTreeWidth)
std::uint32_t _syncTreeWidth, bool _syncArchivedBlockBody)
: m_nodeId(std::move(_nodeId)),
m_blockFactory(std::move(_blockFactory)),
m_txResultFactory(std::move(_txResultFactory)),
Expand All @@ -43,14 +43,16 @@ BlockSyncFactory::BlockSyncFactory(bcos::crypto::PublicPtr _nodeId,
m_consensus(std::move(_consensus)),
m_nodeTimeMaintenance(std::move(_nodeTimeMaintenance)),
m_enableSendBlockStatusByTree(_enableSendBlockStatusByTree),
m_syncTreeWidth(_syncTreeWidth)
m_syncTreeWidth(_syncTreeWidth),
m_syncArchivedBlockBody(_syncArchivedBlockBody)
{}

BlockSync::Ptr BlockSyncFactory::createBlockSync()
{
auto msgFactory = std::make_shared<BlockSyncMsgFactoryImpl>();
auto syncConfig = std::make_shared<BlockSyncConfig>(m_nodeId, m_ledger, m_txpool,
m_blockFactory, m_txResultFactory, m_frontService, m_scheduler, m_consensus, msgFactory,
m_nodeTimeMaintenance, m_enableSendBlockStatusByTree, m_syncTreeWidth);
m_nodeTimeMaintenance, m_enableSendBlockStatusByTree, m_syncTreeWidth,
m_syncArchivedBlockBody);
return std::make_shared<BlockSync>(syncConfig);
}
6 changes: 4 additions & 2 deletions bcos-sync/bcos-sync/BlockSyncFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ class BlockSyncFactory
bcos::scheduler::SchedulerInterface::Ptr _scheduler,
bcos::consensus::ConsensusInterface::Ptr _consensus,
bcos::tool::NodeTimeMaintenance::Ptr _nodeTimeMaintenance,
bool enableSendBlockStatusByTree = false, std::uint32_t syncTreeWidth = 3);
bool enableSendBlockStatusByTree = false, std::uint32_t syncTreeWidth = 3,
bool _syncArchivedBlockBody = false);
virtual ~BlockSyncFactory() = default;

virtual BlockSync::Ptr createBlockSync();

protected:
private:
bcos::crypto::PublicPtr m_nodeId;
bcos::protocol::BlockFactory::Ptr m_blockFactory;
bcos::protocol::TransactionSubmitResultFactory::Ptr m_txResultFactory;
Expand All @@ -54,5 +55,6 @@ class BlockSyncFactory
bcos::tool::NodeTimeMaintenance::Ptr m_nodeTimeMaintenance;
bool m_enableSendBlockStatusByTree;
std::int64_t m_syncTreeWidth;
bool m_syncArchivedBlockBody = false;
};
} // namespace bcos::sync
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ class LedgerServiceClient : public bcos::ledger::LedgerInterface
BCOS_LOG(ERROR) << LOG_DESC("unimplemented method asyncGetCurrentStateByKey");
}

bcos::Error::Ptr setCurrentStateByKey(
std::string_view const& _key, bcos::storage::Entry entry) override
{
BCOS_LOG(ERROR) << LOG_DESC("unimplemented method setCurrentStateByKey");
return nullptr;
}

void asyncGetSystemConfigByKey(std::string_view const& _key,
std::function<void(bcos::Error::Ptr, std::string, bcos::protocol::BlockNumber)>
_onGetConfig) override;
Expand Down
1 change: 1 addition & 0 deletions bcos-tool/bcos-tool/NodeConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ void NodeConfig::loadStorageConfig(boost::property_tree::ptree const& _pt)
m_pdCertPath = _pt.get<std::string>("storage.pd_ssl_cert_path", "");
m_pdKeyPath = _pt.get<std::string>("storage.pd_ssl_key_path", "");
m_enableArchive = _pt.get<bool>("storage.enable_archive", false);
m_syncArchivedBlocks = _pt.get<bool>("storage.sync_archived_blocks", false);
m_enableSeparateBlockAndState = _pt.get<bool>("storage.enable_separate_block_state", false);
if (boost::iequals(m_storageType, bcos::storage::TiKV))
{
Expand Down
2 changes: 2 additions & 0 deletions bcos-tool/bcos-tool/NodeConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class NodeConfig
std::string const& storageDBName() const { return m_storageDBName; }
std::string const& stateDBName() const { return m_stateDBName; }
bool enableArchive() const { return m_enableArchive; }
bool syncArchivedBlocks() const { return m_syncArchivedBlocks; }
bool enableSeparateBlockAndState() const { return m_enableSeparateBlockAndState; }
std::string const& archiveListenIP() const { return m_archiveListenIP; }
uint16_t archiveListenPort() const { return m_archiveListenPort; }
Expand Down Expand Up @@ -401,6 +402,7 @@ class NodeConfig
bool m_enableRocksDBBlob = false;

bool m_enableArchive = false;
bool m_syncArchivedBlocks = false;
bool m_enableSeparateBlockAndState = false;
std::string m_stateDBPath;
std::string m_blockDBPath;
Expand Down
2 changes: 2 additions & 0 deletions bcos-utilities/bcos-utilities/Worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ void Worker::terminate()
if (m_workerThread)
{
if (m_workerState.exchange(WorkerState::Killing) == WorkerState::Killing)
{
return; // Somebody else is doing this
}
l.unlock();
m_workerStateNotifier.notify_all();
m_workerThread->join();
Expand Down
Loading

0 comments on commit 808934a

Please sign in to comment.