Skip to content

Commit

Permalink
Wallet synchronization enhancements c.d.
Browse files Browse the repository at this point in the history
  • Loading branch information
aivve committed May 2, 2024
1 parent d6e3ffa commit e82e322
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 18 deletions.
49 changes: 40 additions & 9 deletions src/Transfers/BlockchainSynchronizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ bool BlockchainSynchronizer::setFutureStateIf(State s, std::function<bool(void)>

void BlockchainSynchronizer::actualizeFutureState() {
std::unique_lock<std::mutex> lk(m_stateMutex);
if (m_currentState == State::stopped && m_futureState == State::deleteOldTxs) { // start(), immideately attach observer
if (m_currentState == State::stopped && (m_futureState == State::deleteOldTxs || m_futureState == State::blockchainSync)) { // start(), immideately attach observer
m_node.addObserver(this);
}

Expand Down Expand Up @@ -352,7 +352,15 @@ void BlockchainSynchronizer::start() {
throw std::runtime_error(message);
}

if (!setFutureStateIf(State::deleteOldTxs, [this] { return m_currentState == State::stopped && m_futureState == State::stopped; })) {
State nextState;
if (!wasStarted) {
nextState = State::deleteOldTxs;
wasStarted = true;
} else {
nextState = State::blockchainSync;
}

if (!setFutureStateIf(nextState, [this] { return m_currentState == State::stopped && m_futureState == State::stopped; })) {
auto message = "Failed to start: already started";
m_logger(ERROR, BRIGHT_RED) << message;
throw std::runtime_error(message);
Expand Down Expand Up @@ -499,7 +507,6 @@ void BlockchainSynchronizer::processBlocks(GetBlocksResponse& response) {

CompleteBlock completeBlock;
completeBlock.blockHash = block.blockHash;
interval.blocks.push_back(completeBlock.blockHash);
if (block.hasBlock) {
completeBlock.block = std::move(block.block);
completeBlock.transactions.push_back(createTransactionPrefix(completeBlock.block->baseTransaction));
Expand All @@ -516,6 +523,7 @@ void BlockchainSynchronizer::processBlocks(GetBlocksResponse& response) {
}
}

interval.blocks.push_back(completeBlock.blockHash);
blocks.push_back(std::move(completeBlock));
}

Expand All @@ -534,7 +542,7 @@ void BlockchainSynchronizer::processBlocks(GetBlocksResponse& response) {
break;

case UpdateConsumersResult::nothingChanged:
if (m_node.getLastKnownBlockHeight() != m_node.getLastLocalBlockHeight()) {
if (m_node.getKnownBlockCount() != m_node.getLocalBlockCount()) {
m_logger(DEBUGGING) << "Blockchain updated, resume blockchain synchronization";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} else {
Expand Down Expand Up @@ -564,8 +572,12 @@ void BlockchainSynchronizer::processBlocks(GetBlocksResponse& response) {

/// \pre m_consumersMutex is locked
BlockchainSynchronizer::UpdateConsumersResult BlockchainSynchronizer::updateConsumers(const BlockchainInterval& interval, const std::vector<CompleteBlock>& blocks) {
assert(interval.blocks.size() == blocks.size());

bool smthChanged = false;
bool hasErrors = false;

uint32_t lastBlockIndex = std::numeric_limits<uint32_t>::max();
for (auto& kv : m_consumers) {
auto result = kv.second->checkInterval(interval);

Expand All @@ -577,21 +589,40 @@ BlockchainSynchronizer::UpdateConsumersResult BlockchainSynchronizer::updateCons

if (result.hasNewBlocks) {
uint32_t startOffset = result.newBlockHeight - interval.startHeight;
// update consumer
uint32_t blockCount = static_cast<uint32_t>(blocks.size()) - startOffset;
// update consumer
m_logger(DEBUGGING) << "Adding blocks to consumer, consumer " << kv.first << ", start index " << result.newBlockHeight << ", count " << blockCount;
if (kv.first->onNewBlocks(blocks.data() + startOffset, result.newBlockHeight, blockCount)) {
uint32_t addedCount = kv.first->onNewBlocks(blocks.data() + startOffset, result.newBlockHeight, blockCount);
if (addedCount > 0) {
if (addedCount < blockCount) {
m_logger(ERROR, BRIGHT_RED) << "Failed to add " << (blockCount - addedCount) << " blocks of " << blockCount << " to consumer, consumer " << kv.first;
hasErrors = true;
}

// update state if consumer succeeded
kv.second->addBlocks(interval.blocks.data() + startOffset, result.newBlockHeight, static_cast<uint32_t>(interval.blocks.size()) - startOffset);
kv.second->addBlocks(interval.blocks.data() + startOffset, result.newBlockHeight, addedCount);
smthChanged = true;
} else {
m_logger(ERROR, BRIGHT_RED) << "Failed to add blocks to consumer, consumer " << kv.first;
return UpdateConsumersResult::errorOccurred;
hasErrors = true;
}

if (addedCount > 0) {
lastBlockIndex = std::min(lastBlockIndex, startOffset + addedCount - 1);
}
}
}

if (smthChanged) {
if (lastBlockIndex != std::numeric_limits<uint32_t>::max()) {
assert(lastBlockIndex < blocks.size());
lastBlockId = blocks[lastBlockIndex].blockHash;
m_logger(DEBUGGING) << "Last block hash " << lastBlockId << ", index " << (interval.startHeight + lastBlockIndex);
}

if (hasErrors) {
m_logger(DEBUGGING) << "Not all blocks were added to consumers, there were errors";
return UpdateConsumersResult::errorOccurred;
} else if (smthChanged) {
m_logger(DEBUGGING) << "Blocks added to consumers";
return UpdateConsumersResult::addedNewBlocks;
} else {
Expand Down
7 changes: 5 additions & 2 deletions src/Transfers/BlockchainSynchronizer.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers
// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers
// Copyright (c) 2016-2019, The Karbo developers
//
// This file is part of Karbo.
//
Expand Down Expand Up @@ -38,7 +39,7 @@ class BlockchainSynchronizer :
public:

BlockchainSynchronizer(INode& node, Logging::ILogger& logger, const Crypto::Hash& genesisBlockHash);
~BlockchainSynchronizer();
virtual ~BlockchainSynchronizer() override;

// IBlockchainSynchronizer
virtual void addConsumer(IBlockchainConsumer* consumer) override;
Expand Down Expand Up @@ -146,6 +147,8 @@ class BlockchainSynchronizer :
mutable std::mutex m_consumersMutex;
mutable std::mutex m_stateMutex;
std::condition_variable m_hasWork;

bool wasStarted = false;
};

}
6 changes: 4 additions & 2 deletions src/Transfers/IBlockchainSynchronizer.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers
// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers
// Copyright (c) 2016-2019, The Karbo developers
//
// This file is part of Karbo.
//
Expand Down Expand Up @@ -37,6 +38,7 @@ class IBlockchainSynchronizerObserver {
public:
virtual void synchronizationProgressUpdated(uint32_t processedBlockCount, uint32_t totalBlockCount) {}
virtual void synchronizationCompleted(std::error_code result) {}
virtual ~IBlockchainSynchronizerObserver() {}
};

class IBlockchainConsumerObserver;
Expand All @@ -47,7 +49,7 @@ class IBlockchainConsumer : public IObservable<IBlockchainConsumerObserver> {
virtual SynchronizationStart getSyncStart() = 0;
virtual const std::unordered_set<Crypto::Hash>& getKnownPoolTxIds() const = 0;
virtual void onBlockchainDetach(uint32_t height) = 0;
virtual bool onNewBlocks(const CompleteBlock* blocks, uint32_t startHeight, uint32_t count) = 0;
virtual uint32_t onNewBlocks(const CompleteBlock* blocks, uint32_t startHeight, uint32_t count) = 0;
virtual std::error_code onPoolUpdated(const std::vector<std::unique_ptr<ITransactionReader>>& addedTransactions, const std::vector<Crypto::Hash>& deletedTransactions) = 0;

virtual std::error_code addUnconfirmedTransaction(const ITransactionReader& transaction) = 0;
Expand Down
9 changes: 4 additions & 5 deletions src/Transfers/TransfersConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace CryptoNote {

class INode;

class TransfersConsumer: public IObservableImpl<IBlockchainConsumerObserver, IBlockchainConsumer> {
class TransfersConsumer : public IObservableImpl<IBlockchainConsumerObserver, IBlockchainConsumer> {
public:

TransfersConsumer(const CryptoNote::Currency& currency, INode& node, Logging::ILogger& logger, const Crypto::SecretKey& viewSecret);
Expand All @@ -48,11 +48,11 @@ class TransfersConsumer: public IObservableImpl<IBlockchainConsumerObserver, IBl

void initTransactionPool(const std::unordered_set<Crypto::Hash>& uncommitedTransactions);
void addPublicKeysSeen(const Crypto::Hash& transactionHash, const Crypto::PublicKey& outputKey);

// IBlockchainConsumer
virtual SynchronizationStart getSyncStart() override;
virtual void onBlockchainDetach(uint32_t height) override;
virtual bool onNewBlocks(const CompleteBlock* blocks, uint32_t startHeight, uint32_t count) override;
virtual uint32_t onNewBlocks(const CompleteBlock* blocks, uint32_t startHeight, uint32_t count) override;
virtual std::error_code onPoolUpdated(const std::vector<std::unique_ptr<ITransactionReader>>& addedTransactions, const std::vector<Crypto::Hash>& deletedTransactions) override;
virtual const std::unordered_set<Crypto::Hash>& getKnownPoolTxIds() const override;

Expand All @@ -78,8 +78,7 @@ class TransfersConsumer: public IObservableImpl<IBlockchainConsumerObserver, IBl
void processTransaction(const TransactionBlockInfo& blockInfo, const ITransactionReader& tx, const PreprocessInfo& info);
void processOutputs(const TransactionBlockInfo& blockInfo, TransfersSubscription& sub, const ITransactionReader& tx,
const std::vector<TransactionOutputInformationIn>& outputs, const std::vector<uint32_t>& globalIdxs, bool& contains, bool& updated);
std::error_code createTransfers(const AccountKeys& account, const TransactionBlockInfo& blockInfo, const ITransactionReader& tx,
const std::vector<uint32_t>& outputs, const std::vector<uint32_t>& globalIdxs, std::vector<TransactionOutputInformationIn>& transfers);

std::error_code getGlobalIndices(const Crypto::Hash& transactionHash, std::vector<uint32_t>& outsGlobalIndices);

void updateSyncStart();
Expand Down

0 comments on commit e82e322

Please sign in to comment.