From 0f8449050786f77aa82d88bca8ac1fa57a79f70a Mon Sep 17 00:00:00 2001
From: Stefan
Date: Tue, 10 Sep 2024 16:27:33 +0200
Subject: [PATCH] Mempool syncer: minor improvements

- Only try to construct mempool transaction requests if new hashes are
  discovered
- Check the mempool state before checking the blockchain when determining
  whether a transaction is already known/included
- Bound the number of received hashes that will be processed per peer
---
 mempool/mempool-task/src/lib.rs | 11 ++++----
 mempool/src/sync/mod.rs         | 47 ++++++++++++++++++++-------------
 2 files changed, 34 insertions(+), 24 deletions(-)

diff --git a/mempool/mempool-task/src/lib.rs b/mempool/mempool-task/src/lib.rs
index a00bfb7c5f..7511f0a6f2 100644
--- a/mempool/mempool-task/src/lib.rs
+++ b/mempool/mempool-task/src/lib.rs
@@ -56,12 +56,7 @@ impl MempoolTask {
     ) -> Self {
         let consensus_event_rx = consensus.subscribe_events();
         let network_event_rx = consensus.network.subscribe_events();
-        let blockchain_event_rx = blockchain.read().notifier_as_stream();
-
-        let proxy = consensus.proxy();
-        let sync_event_rx = proxy.subscribe_sync_events();
-        let peers_in_live_sync = HashSet::from_iter(consensus.sync.peers());
 
         let mempool = Arc::new(Mempool::new(
             Arc::clone(&blockchain),
             mempool_config,
@@ -69,6 +64,12 @@ impl MempoolTask {
         ));
         let mempool_active = false;
 
+        let blockchain_event_rx = blockchain.read().notifier_as_stream();
+
+        let proxy = consensus.proxy();
+        let sync_event_rx = proxy.subscribe_sync_events();
+        let peers_in_live_sync = HashSet::from_iter(consensus.sync.peers());
+
         Self {
             consensus: proxy,
             peers_in_live_sync,
diff --git a/mempool/src/sync/mod.rs b/mempool/src/sync/mod.rs
index 95ed53a125..b7f873fa3f 100644
--- a/mempool/src/sync/mod.rs
+++ b/mempool/src/sync/mod.rs
@@ -76,7 +76,7 @@ impl HashRequestStatus {
 }
 
 const MAX_HASHES_PER_REQUEST: usize = 500;
-const MAX_TOTAL_TRANSACTIONS: usize = 5000;
+const MAX_TOTAL_HASHES: usize = 25_000;
 const SHUTDOWN_TIMEOUT_DURATION: Duration = Duration::from_secs(10 * 60); // 10 minutes
 
 /// Struct responsible for discovering hashes and retrieving transactions from the mempool of other nodes that have a mempool
@@ -85,7 +85,7 @@ pub(crate) struct MempoolSyncer {
     shutdown_timer: Pin>,
 
     /// Consensus sync event receiver
-    consensus_sync_event_rx: BroadcastStream::PeerId>>,
+    consensus_sync_event_rx: BroadcastStream::PeerId>>,
 
     /// Blockchain reference
     blockchain: Arc>,
@@ -161,30 +161,34 @@ impl MempoolSyncer {
     }
 
     /// Push newly discovered hashes into the `unknown_hashes` and keep track which peers have those hashes
-    fn push_unknown_hashes(&mut self, hashes: Vec, peer_id: N::PeerId) {
+    fn push_unknown_hashes(&mut self, hashes: Vec, peer_id: N::PeerId) -> bool {
         let blockchain = self.blockchain.read();
         let state = self.mempool_state.read();
 
         debug!(peer_id = %peer_id, num = %hashes.len(), "Received unknown mempool hashes");
+        let mut new_hashes_discovered = false;
 
-        hashes.into_iter().for_each(|hash| {
+        hashes.into_iter().take(MAX_TOTAL_HASHES).for_each(|hash| {
             // Perform some basic checks to reduce the amount of transactions we are going to request later
-            // TODO: what if I respond with MAX_TOTAL_TRANSACTIONS fake hashes
-            if self.unknown_hashes.len() < MAX_TOTAL_TRANSACTIONS
-                && !blockchain
+            if state.contains(&hash)
+                || blockchain
                     .contains_tx_in_validity_window(&RawTransactionHash::from((hash).clone()), None)
-                && !state.contains(&hash)
             {
-                match self.unknown_hashes.entry(hash) {
-                    Occupied(mut entry) => {
-                        entry.get_mut().add_peer(peer_id);
-                    }
-                    Vacant(entry) => {
-                        entry.insert(HashRequestStatus::new(vec![peer_id]));
-                    }
-                };
+                return;
             }
-        })
+
+            match self.unknown_hashes.entry(hash) {
+                Occupied(mut entry) => {
+                    entry.get_mut().add_peer(peer_id);
+                }
+                Vacant(entry) => {
+                    entry.insert(HashRequestStatus::new(vec![peer_id]));
+                    new_hashes_discovered = true;
+                }
+            };
+        });
+
+        new_hashes_discovered
     }
 
     /// Add peer to discover its mempool
@@ -346,10 +350,13 @@ impl Stream for MempoolSyncer {
         }
 
         // Then we check our RequestMempoolHashes responses
+        let mut new_hashes_discovered = false;
         while let Poll::Ready(Some((peer_id, result))) = self.hashes_requests.poll_next_unpin(cx) {
             match result {
                 Ok(hashes) => {
-                    self.push_unknown_hashes(hashes.hashes, peer_id);
+                    if self.push_unknown_hashes(hashes.hashes, peer_id) {
+                        new_hashes_discovered = true;
+                    }
                 }
                 Err(err) => {
                     error!(%err, %peer_id, "Failed to fetch mempool hashes");
@@ -358,7 +365,9 @@
         }
 
         // Then we construct our RequestMempoolTransactions requests and send them over the network to our peers
-        self.send_mempool_transactions_requests();
+        if new_hashes_discovered {
+            self.send_mempool_transactions_requests();
+        }
 
         // Then we check our RequestMempoolTransactions responses
         while let Poll::Ready(Some((peer_id, result))) =