Skip to content

Commit

Permalink
fix archiveTool and archive block sync
Browse files Browse the repository at this point in the history
  • Loading branch information
bxq2011hust committed Aug 12, 2024
1 parent 852ffa3 commit e7601c6
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 99 deletions.
4 changes: 4 additions & 0 deletions bcos-framework/bcos-framework/protocol/Block.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ class Block
}) |
RANGES::to<NonceList>());
}
bool operator<(const Block& block) const
{
return blockHeaderConst()->number() < block.blockHeaderConst()->number();
}
};
using Blocks = std::vector<Block::Ptr>;
using BlocksPtr = std::shared_ptr<Blocks>;
Expand Down
2 changes: 1 addition & 1 deletion bcos-ledger/src/libledger/Ledger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1533,7 +1533,7 @@ void Ledger::asyncBatchGetReceipts(std::shared_ptr<std::vector<std::string>> has
{
if (!entry.has_value())
{
LEDGER_LOG(DEBUG) << "Get receipt with empty entry: " << (*hashes)[i];
LEDGER_LOG(DEBUG) << "Get receipt with empty entry: " << toHex((*hashes)[i]);
callback(BCOS_ERROR_PTR(
LedgerError::GetStorageError, "Batch get transaction failed"),
std::vector<protocol::TransactionReceipt::Ptr>());
Expand Down
229 changes: 150 additions & 79 deletions bcos-sync/bcos-sync/BlockSync.cpp

Large diffs are not rendered by default.

26 changes: 21 additions & 5 deletions bcos-sync/bcos-sync/BlockSync.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* @date 2021-05-24
*/
#pragma once
#include "bcos-framework/protocol/ProtocolTypeDef.h"
#include "bcos-sync/BlockSyncConfig.h"
#include "bcos-sync/state/DownloadingQueue.h"
#include "bcos-sync/state/SyncPeerStatus.h"
Expand Down Expand Up @@ -133,12 +134,13 @@ class BlockSync : public BlockSyncInterface,
// update SyncTreeTopology node info
virtual void updateTreeTopologyNodeInfo();

virtual void syncArchivedBlockBody();
virtual void verifyAndCommitArchivedBlock();
virtual void syncArchivedBlockBody(bcos::protocol::BlockNumber archivedBlockNumber);
virtual void verifyAndCommitArchivedBlock(bcos::protocol::BlockNumber archivedBlockNumber);

void requestBlocks(bcos::protocol::BlockNumber _from, bcos::protocol::BlockNumber _to);
void requestBlocks(
bcos::protocol::BlockNumber _from, bcos::protocol::BlockNumber _to, int32_t blockDataFlag);
void fetchAndSendBlock(
bcos::crypto::PublicPtr const& _peer, bcos::protocol::BlockNumber _number);
bcos::crypto::PublicPtr const& _peer, bcos::protocol::BlockNumber _number, int32_t _blockDataFlag);
void printSyncInfo();

protected:
Expand Down Expand Up @@ -166,7 +168,21 @@ class BlockSync : public BlockSyncInterface,
bool m_allowFreeNode = false;

mutable SharedMutex x_archivedBlockQueue;
BlockQueue m_archivedBlockQueue;

private:
// struct BlockLess
// {
// bool operator()(bcos::protocol::Block::Ptr const& _first,
// bcos::protocol::Block::Ptr const& _second) const
// {
// return _first->blockHeader()->number() < _second->blockHeader()->number();
// }
// };
// top block is the max number block
// std::priority_queue<bcos::protocol::Block::Ptr, bcos::protocol::Blocks, BlockLess>
std::priority_queue<bcos::protocol::Block::Ptr, bcos::protocol::Blocks> m_archivedBlockQueue;
std::atomic<bcos::protocol::BlockNumber> m_lastArchivedRequest = {0};
std::chrono::system_clock::time_point m_lastArchivedRequestTime = std::chrono::system_clock::now();

SyncTreeTopology::Ptr m_syncTreeTopology{nullptr};
};
Expand Down
2 changes: 2 additions & 0 deletions bcos-sync/bcos-sync/interfaces/BlockRequestInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class BlockRequestInterface : virtual public BlockSyncMsgInterface
using Ptr = std::shared_ptr<BlockRequestInterface>;
BlockRequestInterface() = default;
virtual ~BlockRequestInterface() {}
virtual int32_t blockDataFlag() const = 0;
virtual void setBlockDataFlag(int32_t _flag) = 0;

virtual size_t size() const = 0;
virtual void setSize(size_t _size) = 0;
Expand Down
3 changes: 3 additions & 0 deletions bcos-sync/bcos-sync/protocol/PB/BlockRequestImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class BlockRequestImpl : public BlockRequestInterface, public BlockSyncMsgImpl
size_t size() const override { return m_syncMessage->size(); }
void setSize(size_t _size) override { m_syncMessage->set_size(_size); }

int32_t blockDataFlag() const override { return m_syncMessage->block_data_flag(); }
void setBlockDataFlag(int32_t _flag) override { m_syncMessage->set_block_data_flag(_flag); }

protected:
explicit BlockRequestImpl(std::shared_ptr<BlockSyncMessage> _syncMessage)
{
Expand Down
1 change: 1 addition & 0 deletions bcos-sync/bcos-sync/protocol/proto/BlockSync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ message BlockSyncMessage

//for block sync optimize
int64 block_interval = 10;
int32 block_data_flag = 11;
}
22 changes: 14 additions & 8 deletions bcos-sync/bcos-sync/state/DownloadRequestQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
* @date 2021-05-24
*/
#include "DownloadRequestQueue.h"
#include "bcos-framework/ledger/LedgerTypeDef.h"
#include "bcos-sync/utilities/Common.h"

using namespace bcos;
using namespace bcos::sync;
using namespace bcos::protocol;

void DownloadRequestQueue::push(BlockNumber _fromNumber, size_t _size, size_t _interval)
void DownloadRequestQueue::push(
BlockNumber _fromNumber, size_t _size, int32_t _dataFlag, size_t _interval)
{
UpgradableGuard lock(x_reqQueue);
// Note: the requester must have retry logic
Expand All @@ -39,7 +41,7 @@ void DownloadRequestQueue::push(BlockNumber _fromNumber, size_t _size, size_t _i
return;
}
UpgradeGuard ulock(lock);
m_reqQueue.push(std::make_shared<DownloadRequest>(_fromNumber, _size, _interval));
m_reqQueue.push(std::make_shared<DownloadRequest>(_fromNumber, _size, _dataFlag, _interval));
BLKSYNC_LOG(DEBUG) << LOG_BADGE("Download") << LOG_BADGE("Request")
<< LOG_DESC("Push request in reqQueue req") << LOG_KV("from", _fromNumber)
<< LOG_KV("size", _size) << LOG_KV("interval", _interval)
Expand All @@ -61,6 +63,7 @@ DownloadRequest::UniquePtr DownloadRequestQueue::topAndPop()
size_t size = m_reqQueue.top()->size();
size_t interval = m_reqQueue.top()->interval();
BlockNumber toNumber = m_reqQueue.top()->toNumber();
int32_t dataFlag = m_reqQueue.top()->blockDataFlag();
UpgradeGuard ulock(lock);
while (!m_reqQueue.empty() &&
std::cmp_greater_equal(toNumber + interval, m_reqQueue.top()->fromNumber()) &&
Expand Down Expand Up @@ -118,28 +121,31 @@ DownloadRequest::UniquePtr DownloadRequestQueue::topAndPop()
{
BLKSYNC_LOG(TRACE) << LOG_BADGE("Download") << LOG_BADGE("Request")
<< LOG_DESC("Pop reqQueue top req") << LOG_KV("from", fromNumber)
<< LOG_KV("size", size) << LOG_KV("interval", interval);
<< LOG_KV("size", size) << LOG_KV("interval", interval)
<< LOG_KV("dataFlag", dataFlag);
}
return std::make_unique<DownloadRequest>(fromNumber, size, interval);
return std::make_unique<DownloadRequest>(fromNumber, size, dataFlag, interval);
}

std::set<protocol::BlockNumber, std::less<>> DownloadRequestQueue::mergeAndPop()
std::set<DownloadRequestQueue::BlockRequest> DownloadRequestQueue::mergeAndPop()
{
UpgradableGuard lock(x_reqQueue);
if (m_reqQueue.empty())
{
return {{}, 0};
return {};
}
UpgradeGuard ulock(lock);
std::set<BlockNumber, std::less<>> fetchSet{};
std::set<DownloadRequestQueue::BlockRequest> fetchSet{};

while (!m_reqQueue.empty())
{
auto topReq = m_reqQueue.top();
auto interval = (topReq->interval() == 0 ? 1 : topReq->interval());
for (BlockNumber i = topReq->fromNumber(); i <= topReq->toNumber(); i += interval)
{
fetchSet.insert(i);
fetchSet.insert({i, topReq->blockDataFlag() > 0 ?
topReq->blockDataFlag() :
(bcos::ledger::HEADER | bcos::ledger::TRANSACTIONS)});
}
m_reqQueue.pop();
}
Expand Down
20 changes: 16 additions & 4 deletions bcos-sync/bcos-sync/state/DownloadRequestQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* @date 2021-05-24
*/
#pragma once
#include "bcos-framework/ledger/LedgerTypeDef.h"
#include "bcos-sync/BlockSyncConfig.h"
#include <queue>
#include <utility>
Expand All @@ -30,8 +31,10 @@ class DownloadRequest
public:
using Ptr = std::shared_ptr<DownloadRequest>;
using UniquePtr = std::unique_ptr<DownloadRequest>;
DownloadRequest(bcos::protocol::BlockNumber _fromNumber, size_t _size, size_t _interval = 0)
: m_fromNumber(_fromNumber), m_size(_size), m_interval(_interval)
DownloadRequest(bcos::protocol::BlockNumber _fromNumber, size_t _size,
int32_t _dataFlag = (bcos::ledger::HEADER | bcos::ledger::TRANSACTIONS),
size_t _interval = 0)
: m_fromNumber(_fromNumber), m_size(_size), m_dataFlag(_dataFlag), m_interval(_interval)
{}

bcos::protocol::BlockNumber fromNumber() const noexcept { return m_fromNumber; }
Expand All @@ -42,10 +45,12 @@ class DownloadRequest
return (m_interval == 0) ? (protocol::BlockNumber)m_fromNumber + m_size - 1 :
(protocol::BlockNumber)m_fromNumber + (m_size - 1) * m_interval;
}
int32_t blockDataFlag() const noexcept { return m_dataFlag; }

private:
bcos::protocol::BlockNumber m_fromNumber;
size_t m_size;
int32_t m_dataFlag = 0;
size_t m_interval = 0;
};

Expand All @@ -70,8 +75,15 @@ class DownloadRequestQueue
{}
virtual ~DownloadRequestQueue() = default;

[[maybe_unused]] virtual std::set<protocol::BlockNumber, std::less<>> mergeAndPop();
virtual void push(bcos::protocol::BlockNumber _fromNumber, size_t _size, size_t interval = 0);
struct BlockRequest
{
protocol::BlockNumber number;
int32_t dataFlag;
bool operator<(const BlockRequest& _other) const { return number < _other.number; }
};
[[maybe_unused]] virtual std::set<BlockRequest> mergeAndPop();
virtual void push(bcos::protocol::BlockNumber _fromNumber, size_t _size,
int _dataFlag = (bcos::ledger::HEADER | bcos::ledger::TRANSACTIONS), size_t interval = 0);
virtual DownloadRequest::UniquePtr topAndPop(); // Must call use disablePush() before
virtual bool empty();

Expand Down
4 changes: 2 additions & 2 deletions tools/archive-tool/archiveTool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,27 +174,27 @@ createBackendStorage(std::shared_ptr<bcos::tool::NodeConfig> nodeConfig, const s
StorageInitializer::createRocksDB(
stateDBPath, option, nodeConfig->enableStatistics(), nodeConfig->keyPageSize()),
dataEncryption);
blockStorage = storage;
if (nodeConfig->enableSeparateBlockAndState())
{
auto blockDB = StorageInitializer::createRocksDB(
nodeConfig->blockDBPath(), option, nodeConfig->enableStatistics());
blockStorage = StorageInitializer::build(std::move(blockDB), dataEncryption);
}
blockStorage = storage;
}
else
{
auto* rocksdb = createSecondaryRocksDB(stateDBPath, secondaryPath);
storage = std::make_shared<RocksDBStorage>(
std::unique_ptr<rocksdb::DB>(rocksdb), dataEncryption);
blockStorage = storage;
if (nodeConfig->enableSeparateBlockAndState())
{
auto* blockRocksDB =
createSecondaryRocksDB(nodeConfig->blockDBPath(), secondaryPath);
blockStorage = std::make_shared<RocksDBStorage>(
std::unique_ptr<rocksdb::DB>(blockRocksDB), dataEncryption);
}
blockStorage = storage;
}
}
else if (boost::iequals(nodeConfig->storageType(), "TiKV"))
Expand Down

0 comments on commit e7601c6

Please sign in to comment.