Skip to content

Commit

Permalink
fix: pending transactions subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
kstdl committed Jan 21, 2025
1 parent c3de41b commit d117267
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class TransactionManager : public std::enable_shared_from_this<TransactionManage
addr_t getFullNodeAddress() const;

public:
util::Event<TransactionManager, h256> const transaction_accepted_{};
util::Event<TransactionManager, std::shared_ptr<Transaction>> const transaction_added_{};

private:
const FullNodeConfig &kConf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ TransactionStatus TransactionManager::insertValidatedTransaction(std::shared_ptr

const auto last_block_number = final_chain_->lastBlockNumber();
LOG(log_dg_) << "Transaction " << trx_hash << " inserted in trx pool";
if (proposable) {
transaction_added_.emit(tx);
}
return transactions_pool_.insert(std::move(tx), proposable, last_block_number);
}

Expand Down Expand Up @@ -189,8 +192,6 @@ std::shared_ptr<Transaction> TransactionManager::getTransaction(trx_hash_t const
}

void TransactionManager::saveTransactionsFromDagBlock(SharedTransactions const &trxs) {
std::vector<trx_hash_t> accepted_transactions;
accepted_transactions.reserve(trxs.size());
auto write_batch = db_->createWriteBatch();
vec_trx_t trx_hashes;
std::transform(trxs.begin(), trxs.end(), std::back_inserter(trx_hashes),
Expand All @@ -202,26 +203,21 @@ void TransactionManager::saveTransactionsFromDagBlock(SharedTransactions const &
std::unique_lock transactions_lock(transactions_mutex_);

for (auto t : trxs) {
const auto tx_hash = t->getHash();
const auto trx_hash = t->getHash();

if (!recently_finalized_transactions_.contains(tx_hash) && !nonfinalized_transactions_in_dag_.contains(tx_hash) &&
!db_->transactionFinalized(tx_hash)) {
if (!recently_finalized_transactions_.contains(trx_hash) &&
!nonfinalized_transactions_in_dag_.contains(trx_hash) && !db_->transactionFinalized(trx_hash)) {
db_->addTransactionToBatch(*t, write_batch);
nonfinalized_transactions_in_dag_.emplace(tx_hash, t);
if (transactions_pool_.erase(tx_hash)) {
LOG(log_dg_) << "Transaction " << tx_hash << " removed from trx pool ";
// Transactions are counted when included in DAG
accepted_transactions.emplace_back(tx_hash);
nonfinalized_transactions_in_dag_.emplace(trx_hash, t);
if (transactions_pool_.erase(trx_hash)) {
LOG(log_dg_) << "Transaction " << trx_hash << " removed from trx pool ";
}
trx_count_++;
}
}
db_->addStatusFieldToBatch(StatusDbField::TrxCount, trx_count_, write_batch);
db_->commitWriteBatch(write_batch);
}
for (const auto &trx_hash : accepted_transactions) {
transaction_accepted_.emit(trx_hash);
}
}

void TransactionManager::recoverNonfinalizedTransactions() {
Expand Down
15 changes: 9 additions & 6 deletions libraries/core_libs/node/src/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,16 @@ void FullNode::start() {
*rpc_thread_pool_);
}

trx_mgr_->transaction_accepted_.subscribe(
[eth_json_rpc = as_weak(eth_json_rpc), ws = as_weak(jsonrpc_ws_)](auto const &trx_hash) {
trx_mgr_->transaction_added_.subscribe(
[eth_json_rpc = as_weak(eth_json_rpc), ws = as_weak(jsonrpc_ws_),
node_addr = getAddress()](const std::shared_ptr<Transaction> &trx) {
if (auto _eth_json_rpc = eth_json_rpc.lock()) {
_eth_json_rpc->note_pending_transaction(trx_hash);
_eth_json_rpc->note_pending_transaction(trx->getHash());
}
if (auto _ws = ws.lock()) {
_ws->newPendingTransaction(trx_hash);
if (trx->getSender() == node_addr) {
_ws->newPendingTransaction(trx->getHash());
}
}
},
*rpc_thread_pool_);
Expand Down Expand Up @@ -409,8 +412,8 @@ void FullNode::rebuildDb() {
}
} else {
next_period_data = std::make_shared<PeriodData>(std::move(data));
// More efficient to get sender(which is expensive) on this thread which is not as busy as the thread that pushes
// blocks to chain
// More efficient to get sender(which is expensive) on this thread which is not as busy as the thread that
// pushes blocks to chain
for (auto &t : next_period_data->transactions) t->getSender();
cert_votes = next_period_data->previous_block_cert_votes;
}
Expand Down

0 comments on commit d117267

Please sign in to comment.