Skip to content

Commit

Permalink
Mempool syncer: subscribe to consensus sync events in order add new p…
Browse files Browse the repository at this point in the history
…eers dynamically
  • Loading branch information
Eligioo committed Sep 10, 2024
1 parent ec8f4ed commit ff31189
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ nimiq-account = { workspace = true }
nimiq-block = { workspace = true }
nimiq-blockchain = { workspace = true }
nimiq-blockchain-interface = { workspace = true }
nimiq-consensus = { workspace = true }
nimiq-database = { workspace = true }
nimiq-hash = { workspace = true }
nimiq-keys = { workspace = true }
Expand Down
27 changes: 17 additions & 10 deletions mempool/mempool-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::{stream::BoxStream, Future, Stream, StreamExt};
use log::{debug, trace, warn};
use nimiq_blockchain::Blockchain;
use nimiq_blockchain_interface::{AbstractBlockchain, BlockchainEvent};
use nimiq_consensus::{sync::syncer::SyncerEvent, Consensus, ConsensusEvent, ConsensusProxy};
use nimiq_consensus::{sync::syncer::SyncEvent, Consensus, ConsensusEvent, ConsensusProxy};
use nimiq_mempool::{config::MempoolConfig, mempool::Mempool};
use nimiq_network_interface::network::{Network, NetworkEvent, SubscribeEvents};
use nimiq_utils::spawn;
Expand All @@ -36,7 +36,7 @@ pub struct MempoolTask<N: Network> {
consensus_event_rx: BroadcastStream<ConsensusEvent>,
blockchain_event_rx: BoxStream<'static, BlockchainEvent>,
network_event_rx: SubscribeEvents<N::PeerId>,
syncer_event_rx: BroadcastStream<SyncerEvent<N::PeerId>>,
sync_event_rx: BroadcastStream<SyncEvent<<N as Network>::PeerId>>,

peers_in_live_sync: HashSet<N::PeerId>,

Expand All @@ -56,9 +56,11 @@ impl<N: Network> MempoolTask<N> {
) -> Self {
let consensus_event_rx = consensus.subscribe_events();
let network_event_rx = consensus.network.subscribe_events();
let syncer_event_rx = consensus.sync.subscribe_events();
let blockchain_event_rx = blockchain.read().notifier_as_stream();

let proxy = consensus.proxy();
let sync_event_rx = proxy.subscribe_sync_events();

let peers_in_live_sync = HashSet::from_iter(consensus.sync.peers());
let mempool = Arc::new(Mempool::new(
Arc::clone(&blockchain),
Expand All @@ -68,13 +70,13 @@ impl<N: Network> MempoolTask<N> {
let mempool_active = false;

Self {
consensus: consensus.proxy(),
consensus: proxy,
peers_in_live_sync,

consensus_event_rx,
blockchain_event_rx,
network_event_rx,
syncer_event_rx,
sync_event_rx,

mempool: Arc::clone(&mempool),
mempool_active,
Expand Down Expand Up @@ -109,17 +111,21 @@ impl<N: Network> MempoolTask<N> {
let peers = self.peers_in_live_sync.clone().into_iter().collect();
#[cfg(not(feature = "metrics"))]
spawn({
let consensus = self.consensus.clone();
async move {
// The mempool is not updated while consensus is lost.
// Thus, we need to check all transactions if they are still valid.
mempool.cleanup();
mempool.start_executors(network, None, None, peers).await;
mempool
.start_executors(network, None, None, peers, consensus)
.await;
}
});
#[cfg(feature = "metrics")]
spawn({
let mempool_monitor = self.mempool_monitor.clone();
let ctrl_mempool_monitor = self.control_mempool_monitor.clone();
let consensus = self.consensus.clone();
async move {
// The mempool is not updated while consensus is lost.
// Thus, we need to check all transactions if they are still valid.
Expand All @@ -131,6 +137,7 @@ impl<N: Network> MempoolTask<N> {
Some(mempool_monitor),
Some(ctrl_mempool_monitor),
peers,
consensus,
)
.await;
}
Expand Down Expand Up @@ -188,7 +195,7 @@ impl<N: Network> MempoolTask<N> {
}
}

fn on_syncer_event(&mut self, event: SyncEvent<N::PeerId>) {
fn on_consensus_sync_event(&mut self, event: SyncEvent<N::PeerId>) {
match event {
SyncEvent::AddLiveSync(peer_id) => {
self.peers_in_live_sync.insert(peer_id);
Expand All @@ -210,9 +217,9 @@ impl<N: Network> Stream for MempoolTask<N> {
type Item = MempoolEvent;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Process syncer updates.
while let Poll::Ready(Some(Ok(event))) = self.syncer_event_rx.poll_next_unpin(cx) {
self.on_syncer_event(event)
// Process consensus sync updates.
while let Poll::Ready(Some(Ok(event))) = self.sync_event_rx.poll_next_unpin(cx) {
self.on_consensus_sync_event(event)
}

// Process network updates.
Expand Down
6 changes: 4 additions & 2 deletions mempool/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use nimiq_account::ReservedBalance;
use nimiq_block::Block;
use nimiq_blockchain::{Blockchain, TransactionVerificationCache};
use nimiq_blockchain_interface::AbstractBlockchain;
use nimiq_consensus::ConsensusProxy;
use nimiq_hash::{Blake2bHash, Hash};
use nimiq_keys::Address;
use nimiq_network_interface::{
Expand Down Expand Up @@ -146,6 +147,7 @@ impl Mempool {
monitor: Option<TaskMonitor>,
control_monitor: Option<TaskMonitor>,
mut peers: Vec<N::PeerId>,
consensus: ConsensusProxy<N>,
) {
let executor_handle = self.executor_handle.lock().await;
let control_executor_handle = self.control_executor_handle.lock().await;
Expand All @@ -161,8 +163,8 @@ impl Mempool {
let regular_transactions_syncer = MempoolSyncer::new(
peers.clone(),
MempoolTransactionType::Regular,
Arc::clone(&network),
Arc::clone(&self.blockchain),
consensus.clone(),
Arc::clone(&self.state),
);

Expand All @@ -185,8 +187,8 @@ impl Mempool {
let control_transactions_syncer = MempoolSyncer::new(
peers,
MempoolTransactionType::Control,
Arc::clone(&network),
Arc::clone(&self.blockchain),
consensus.clone(),
Arc::clone(&self.state),
);

Expand Down
23 changes: 18 additions & 5 deletions mempool/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use messages::{
ResponseMempoolHashes, ResponseMempoolTransactions,
};
use nimiq_blockchain::Blockchain;
use nimiq_consensus::{sync::syncer::SyncEvent, ConsensusProxy};
use nimiq_hash::{Blake2bHash, Hash};
use nimiq_network_interface::{
network::Network,
Expand All @@ -28,6 +29,7 @@ use nimiq_transaction::{historic_transaction::RawTransactionHash, Transaction};
use nimiq_utils::{spawn, stream::FuturesUnordered};
use parking_lot::RwLock;
use tokio::time::Sleep;
use tokio_stream::wrappers::BroadcastStream;

use crate::{executor::PubsubIdOrPeerId, mempool_state::MempoolState};

Expand Down Expand Up @@ -77,11 +79,14 @@ const MAX_HASHES_PER_REQUEST: usize = 500;
const MAX_TOTAL_TRANSACTIONS: usize = 5000;
const SHUTDOWN_TIMEOUT_DURATION: Duration = Duration::from_secs(10 * 60); // 10 minutes

/// Struct responsible for discovering hashes and retrieving transactions from the mempool of other nodes that have mempool
/// Struct responsible for discovering hashes and retrieving transactions from the mempool of other nodes that have a mempool
pub(crate) struct MempoolSyncer<N: Network> {
/// Timeout to gracefully shutdown the mempool syncer entirely
shutdown_timer: Pin<Box<Sleep>>,

/// Consensus sync event receiver
consensus_sync_event_rx: BroadcastStream<SyncEvent<<N as Network>::PeerId>>,

/// Blockchain reference
blockchain: Arc<RwLock<Blockchain>>,

Expand Down Expand Up @@ -118,15 +123,15 @@ impl<N: Network> MempoolSyncer<N> {
pub fn new(
peers: Vec<N::PeerId>,
transaction_type: MempoolTransactionType,
network: Arc<N>,
blockchain: Arc<RwLock<Blockchain>>,
consensus: ConsensusProxy<N>,
mempool_state: Arc<RwLock<MempoolState>>,
) -> Self {
let hashes_requests = peers
.iter()
.map(|peer_id| {
let peer_id = *peer_id;
let network = Arc::clone(&network);
let network = Arc::clone(&consensus.network);
let transaction_type = transaction_type.to_owned();
async move {
(
Expand All @@ -142,9 +147,10 @@ impl<N: Network> MempoolSyncer<N> {

Self {
shutdown_timer: Box::pin(sleep_until(Instant::now() + SHUTDOWN_TIMEOUT_DURATION)),
consensus_sync_event_rx: consensus.subscribe_sync_events(),
blockchain,
hashes_requests,
network: Arc::clone(&network),
network: consensus.network,
peers,
mempool_state: Arc::clone(&mempool_state),
unknown_hashes: HashMap::new(),
Expand Down Expand Up @@ -332,6 +338,13 @@ impl<N: Network> Stream for MempoolSyncer<N> {
return Poll::Ready(None);
}

// Then we check if peers got added to live sync in the mean time
while let Poll::Ready(Some(Ok(event))) = self.consensus_sync_event_rx.poll_next_unpin(cx) {
match event {
SyncEvent::AddLiveSync(peer_id) => self.add_peer(peer_id),
}
}

// Then we check our RequestMempoolHashes responses
while let Poll::Ready(Some((peer_id, result))) = self.hashes_requests.poll_next_unpin(cx) {
match result {
Expand Down Expand Up @@ -375,7 +388,7 @@ impl<N: Network> Stream for MempoolSyncer<N> {
continue;
}

info!(num = %transactions.len(), "Pushed synced mempool transactions into the mempool");
info!(num = %transactions.len(), "Synced mempool transactions");
self.transactions.extend(transactions);
}
Err(err) => {
Expand Down
21 changes: 17 additions & 4 deletions mempool/src/sync/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod tests {
use nimiq_network_mock::MockHub;
use nimiq_test_log::test;
use nimiq_test_utils::{
consensus::consensus,
test_rng::test_rng,
test_transaction::{generate_transactions, TestAccount, TestTransaction},
};
Expand Down Expand Up @@ -70,6 +71,10 @@ mod tests {
net1.dial_mock(&net2);
let peer_ids = net1.get_peers();

// Setup consensus proxy
let consensus = consensus(Arc::clone(&blockchain), Arc::clone(&net1)).await;
let proxy = consensus.proxy();

// Setup streams to respond to requests
let hash_stream = net2.receive_requests::<RequestMempoolHashes>();
let txns_stream = net2.receive_requests::<RequestMempoolTransactions>();
Expand Down Expand Up @@ -111,8 +116,8 @@ mod tests {
let mut syncer = MempoolSyncer::new(
peer_ids,
MempoolTransactionType::Regular,
net1,
Arc::clone(&blockchain),
proxy,
Arc::clone(&state),
);

Expand Down Expand Up @@ -186,6 +191,10 @@ mod tests {
net1.dial_mock(&net2);
let peer_ids = net1.get_peers();

// Setup consensus proxy
let consensus = consensus(Arc::clone(&blockchain), Arc::clone(&net1)).await;
let proxy = consensus.proxy();

// Setup stream to respond to requests
let hash_stream = net2.receive_requests::<RequestMempoolHashes>();

Expand All @@ -212,8 +221,8 @@ mod tests {
let mut syncer = MempoolSyncer::new(
peer_ids,
MempoolTransactionType::Regular,
net1,
Arc::clone(&blockchain),
proxy,
Arc::clone(&state),
);

Expand Down Expand Up @@ -260,12 +269,16 @@ mod tests {
let mut hub = MockHub::default();
let net1 = Arc::new(hub.new_network());

// Setup consensus proxy
let consensus = consensus(Arc::clone(&blockchain), Arc::clone(&net1)).await;
let proxy = consensus.proxy();

// Create a new mempool syncer with 0 peers to sync with
let mut syncer = MempoolSyncer::new(
vec![],
MempoolTransactionType::Regular,
Arc::clone(&net1),
Arc::clone(&blockchain),
proxy,
Arc::clone(&state),
);

Expand Down Expand Up @@ -306,7 +319,7 @@ mod tests {
let _ = poll!(syncer.next());
sleep(Duration::from_millis(200)).await;

// we have received unknown hashes from the added peer
// We have received unknown hashes from the added peer
let _ = poll!(syncer.next());
assert_eq!(syncer.unknown_hashes.len(), num_transactions);
}
Expand Down

0 comments on commit ff31189

Please sign in to comment.