Skip to content

Commit

Permalink
Mempool syncer: minor improvements
Browse files Browse the repository at this point in the history
- Only try to construct mempool transaction requests if new hashes are discovered
- Check the mempool state first before checking the blockchain if a transaction is already known/included
- Bound the amount of received hashes that will be processed per peer
  • Loading branch information
Eligioo committed Sep 11, 2024
1 parent ec1c394 commit c43f4ae
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 24 deletions.
11 changes: 6 additions & 5 deletions mempool/mempool-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,20 @@ impl<N: Network> MempoolTask<N> {
) -> Self {
let consensus_event_rx = consensus.subscribe_events();
let network_event_rx = consensus.network.subscribe_events();
let blockchain_event_rx = blockchain.read().notifier_as_stream();

let proxy = consensus.proxy();
let sync_event_rx = proxy.subscribe_sync_events();

let peers_in_live_sync = HashSet::from_iter(consensus.sync.peers());
let mempool = Arc::new(Mempool::new(
Arc::clone(&blockchain),
mempool_config,
Arc::clone(&consensus.network),
));
let mempool_active = false;

let blockchain_event_rx = blockchain.read().notifier_as_stream();

let proxy = consensus.proxy();
let sync_event_rx = proxy.subscribe_sync_events();
let peers_in_live_sync = HashSet::from_iter(consensus.sync.peers());

Self {
consensus: proxy,
peers_in_live_sync,
Expand Down
48 changes: 29 additions & 19 deletions mempool/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl<N: Network> HashRequestStatus<N> {
}

const MAX_HASHES_PER_REQUEST: usize = 500;
const MAX_TOTAL_TRANSACTIONS: usize = 5000;
const MAX_TOTAL_HASHES: usize = 25_000;
const SHUTDOWN_TIMEOUT_DURATION: Duration = Duration::from_secs(10 * 60); // 10 minutes

/// Struct responsible for discovering hashes and retrieving transactions from the mempool of other nodes that have a mempool
Expand All @@ -85,7 +85,7 @@ pub(crate) struct MempoolSyncer<N: Network> {
shutdown_timer: Pin<Box<Sleep>>,

/// Consensus sync event receiver
consensus_sync_event_rx: BroadcastStream<SyncEvent<<N as Network>::PeerId>>,
consensus_sync_event_rx: BroadcastStream<SyncEvent<<N>::PeerId>>,

/// Blockchain reference
blockchain: Arc<RwLock<Blockchain>>,
Expand Down Expand Up @@ -161,30 +161,34 @@ impl<N: Network> MempoolSyncer<N> {
}

/// Push newly discovered hashes into the `unknown_hashes` and keep track which peers have those hashes
fn push_unknown_hashes(&mut self, hashes: Vec<Blake2bHash>, peer_id: N::PeerId) {
fn push_unknown_hashes(&mut self, hashes: Vec<Blake2bHash>, peer_id: N::PeerId) -> bool {
let blockchain = self.blockchain.read();
let state = self.mempool_state.read();

debug!(peer_id = %peer_id, num = %hashes.len(), "Received unknown mempool hashes");
let mut new_hashes_discovered = false;

hashes.into_iter().for_each(|hash| {
hashes.into_iter().take(MAX_TOTAL_HASHES).for_each(|hash| {
// Perform some basic checks to reduce the amount of transactions we are going to request later
// TODO: what if I respond with MAX_TOTAL_TRANSACTIONS fake hashes
if self.unknown_hashes.len() < MAX_TOTAL_TRANSACTIONS
&& !blockchain
if state.contains(&hash)
|| blockchain
.contains_tx_in_validity_window(&RawTransactionHash::from((hash).clone()), None)
&& !state.contains(&hash)
{
match self.unknown_hashes.entry(hash) {
Occupied(mut entry) => {
entry.get_mut().add_peer(peer_id);
}
Vacant(entry) => {
entry.insert(HashRequestStatus::new(vec![peer_id]));
}
};
return;
}
})

match self.unknown_hashes.entry(hash) {
Occupied(mut entry) => {
entry.get_mut().add_peer(peer_id);
}
Vacant(entry) => {
entry.insert(HashRequestStatus::new(vec![peer_id]));
new_hashes_discovered = true;
}
};
});

new_hashes_discovered
}

/// Add peer to discover its mempool
Expand All @@ -193,6 +197,7 @@ impl<N: Network> MempoolSyncer<N> {
return;
}

debug!(%peer_id, "Peer added to mempool sync");
self.peers.push(peer_id);
let network = Arc::clone(&self.network);
let transaction_type = self.mempool_transaction_type.clone();
Expand Down Expand Up @@ -346,10 +351,13 @@ impl<N: Network> Stream for MempoolSyncer<N> {
}

// Then we check our RequestMempoolHashes responses
let mut new_hashes_discovered = false;
while let Poll::Ready(Some((peer_id, result))) = self.hashes_requests.poll_next_unpin(cx) {
match result {
Ok(hashes) => {
self.push_unknown_hashes(hashes.hashes, peer_id);
if self.push_unknown_hashes(hashes.hashes, peer_id) {
new_hashes_discovered = true;
}
}
Err(err) => {
error!(%err, %peer_id, "Failed to fetch mempool hashes");
Expand All @@ -358,7 +366,9 @@ impl<N: Network> Stream for MempoolSyncer<N> {
}

// Then we construct our RequestMempoolTransactions requests and send them over the network to our peers
self.send_mempool_transactions_requests();
if new_hashes_discovered {
self.send_mempool_transactions_requests();
}

// Then we check our RequestMempoolTransactions responses
while let Poll::Ready(Some((peer_id, result))) =
Expand Down

0 comments on commit c43f4ae

Please sign in to comment.