Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: change add block to be async #230

Merged
merged 2 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 143 additions & 54 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ use crate::{
},
sharechain::{
p2block::{P2Block, CURRENT_CHAIN_ID},
p2chain::ChainAddResult,
ShareChain,
},
};
Expand Down Expand Up @@ -215,6 +214,7 @@ struct PerformCatchUpSync {
pub peer: PeerId,
pub last_block_from_them: Option<(u64, FixedHash)>,
pub their_height: u64,
// pub their_pow: u128,
pub permit: Option<OwnedSemaphorePermit>,
}

Expand Down Expand Up @@ -287,6 +287,11 @@ pub(crate) struct ConnectionCounters {
enum InnerRequest {
DoSyncMissingBlocks(SyncMissingBlocks),
PerformCatchUpSync(PerformCatchUpSync),
AddSyncedBlock {
algo: PowAlgorithm,
block: Arc<P2Block>,
source_peer: PeerId,
},
}

/// Service is the implementation that holds every peer-to-peer related logic
Expand Down Expand Up @@ -928,6 +933,7 @@ where S: ShareChain
peer: peer_id,
last_block_from_them: None,
their_height: response.info.current_sha3x_height,
// their_pow: response.info.current_sha3x_pow,
permit: None,
};
let _unused = self
Expand All @@ -942,6 +948,7 @@ where S: ShareChain
peer: peer_id,
last_block_from_them: None,
their_height: response.info.current_random_x_height,
// their_pow: response.info.current_random_x_pow,
permit: None,
};
let _unused = self
Expand Down Expand Up @@ -1066,7 +1073,7 @@ where S: ShareChain
let SyncMissingBlocks {
peer,
algo,
missing_parents,
mut missing_parents,
is_from_new_block_notify,
depth,
} = sync_share_chain;
Expand All @@ -1087,6 +1094,22 @@ where S: ShareChain
return;
}

let share_chain = match algo {
PowAlgorithm::RandomX => self.share_chain_random_x.clone(),
PowAlgorithm::Sha3x => self.share_chain_sha3x.clone(),
};
let blocks_already_received = share_chain.get_blocks(&missing_parents).await;
missing_parents.retain(|(height, hash)| {
!blocks_already_received
.iter()
.any(|b| b.height == *height && b.hash == *hash)
});

if missing_parents.is_empty() {
info!(target: SYNC_REQUEST_LOG_TARGET, squad = &self.config.squad; "All missing blocks have already been received");
return;
}

// If it's not from new_block_notify, ask only the peer that sent the blocks
if !is_from_new_block_notify {
info!(target: SYNC_REQUEST_LOG_TARGET, squad = &self.config.squad; "Sending sync request direct to peer {} for blocks {:?} because we did not receive it from sync", peer, missing_parents.iter().map(|(height, hash)|format!("{}({:x}{:x}{:x}{:x})",height, hash[0], hash[1], hash[2], hash[3])).collect::<Vec<String>>());
Expand Down Expand Up @@ -1583,30 +1606,25 @@ where S: ShareChain

let timer = Instant::now();
let algo = response.algo();
let (share_chain, synced_bool) = match algo {
let share_chain = match algo {
PowAlgorithm::RandomX => {
self.randomx_last_sync_requested_block = None;
(
self.share_chain_random_x.clone(),
self.are_we_synced_with_randomx_p2pool.clone(),
)
self.share_chain_random_x.clone()
},
PowAlgorithm::Sha3x => {
self.sha3x_last_sync_requested_block = None;
(
self.share_chain_sha3x.clone(),
self.are_we_synced_with_sha3x_p2pool.clone(),
)
self.share_chain_sha3x.clone()
},
};
let their_tip_hash = *response.tip_hash();
let their_height = response.tip_height();
let their_pow = response.achieved_pow();
let mut blocks: Vec<_> = response.into_blocks().into_iter().map(Arc::new).collect();
info!(target: SYNC_REQUEST_LOG_TARGET, "Received catch up sync response for chain {} from {} with blocks {}. Their tip: {}:{}", algo, peer, blocks.iter().map(|a| a.height.to_string()).join(", "), their_height, &their_tip_hash.to_hex()[0..8]);
if blocks.is_empty() {
warn!(target: SYNC_REQUEST_LOG_TARGET, "Peer {} sent 0 blocks for catch up sync", peer);
return;
}
info!(target: SYNC_REQUEST_LOG_TARGET, "Received catch up sync response for chain {} from {} with blocks {}. Their tip: {}:{}", algo, peer, blocks.iter().map(|a| a.height.to_string()).join(", "), their_height, &their_tip_hash.to_hex()[0..8]);
let tx = self.inner_request_tx.clone();
let squad = self.config.squad.clone();
let network_peer_store = self.network_peer_store.clone();
Expand All @@ -1616,23 +1634,29 @@ where S: ShareChain
tokio::spawn(async move {
blocks.sort_by(|a, b| a.height.cmp(&b.height));
let last_block_from_them = blocks.last().map(|b| (b.height, b.hash));
let mut new_tip = ChainAddResult::default();
let mut blocks_added = Vec::new();
// let mut new_tip = ChainAddResult::default();
// let mut blocks_added = Vec::new();
for b in &blocks {
match share_chain.add_synced_blocks(&[b.clone()]).await {
Ok(result) => {
blocks_added.push(format!("{}({})", b.height, &b.hash.to_hex()[0..8]));
new_tip.combine(result);
},
Err(error) => {
error!(target: SYNC_REQUEST_LOG_TARGET, squad; "Failed to add Catchup synced blocks to share chain: {error:?}");
network_peer_store
.write()
.await
.move_to_grey_list(peer, format!("Block failed validation: {error}"));
return;
},
}
let message = InnerRequest::AddSyncedBlock {
algo,
block: b.clone(),
source_peer: peer,
};
let _unused = tx.send(message);
// match share_chain.add_synced_blocks(&[b.clone()]).await {
// Ok(result) => {
// blocks_added.push(format!("{}({})", b.height, &b.hash.to_hex()[0..8]));
// new_tip.combine(result);
// },
// Err(error) => {
// error!(target: SYNC_REQUEST_LOG_TARGET, squad; "Failed to add Catchup synced blocks to share
// chain: {error:?}"); network_peer_store
// .write()
// .await
// .move_to_grey_list(peer, format!("Block failed validation: {error}"));
// return;
// },
// }
}
{
if let Some(ref last_block) = last_block_from_them {
Expand All @@ -1641,21 +1665,21 @@ where S: ShareChain
}
}

info!(target: LOG_TARGET, "[{:?}] Blocks via catchup sync added {:?}", algo, blocks_added);
info!(target: LOG_TARGET, "[{:?}] Blocks via catchup sync result {}", algo, new_tip);
let missing_parents = new_tip.into_missing_parents_vec();
if !missing_parents.is_empty() {
let sync_share_chain = SyncMissingBlocks {
algo,
peer,
missing_parents,
is_from_new_block_notify: false,
depth: 0,
};
let _unused = tx.send(InnerRequest::DoSyncMissingBlocks(sync_share_chain));
}

info!(target: SYNC_REQUEST_LOG_TARGET, squad = &squad; "Synced blocks added to share chain");
info!(target: LOG_TARGET, "[{:?}] Blocks via catchup sync added {:?}", algo, blocks.iter().map(|a| format!("{}({:x}{:x}{:x}{:x})",a.height, a.hash[0], a.hash[1], a.hash[2], a.hash[3])).collect::<Vec<String>>());
// info!(target: LOG_TARGET, "[{:?}] Blocks via catchup sync result {}", algo, new_tip);
// let missing_parents = new_tip.into_missing_parents_vec();
// if !missing_parents.is_empty() {
// let sync_share_chain = SyncMissingBlocks {
// algo,
// peer,
// missing_parents,
// is_from_new_block_notify: false,
// depth: 0,
// };
// let _unused = tx.send(InnerRequest::DoSyncMissingBlocks(sync_share_chain));
// }

// info!(target: SYNC_REQUEST_LOG_TARGET, squad = &squad; "Synced blocks added to share chain");
let our_pow = share_chain.get_total_chain_pow().await;
let mut must_continue_sync = their_pow > our_pow.as_u128();
info!(target: SYNC_REQUEST_LOG_TARGET, "[{:?}] must continue: {}", algo, must_continue_sync);
Expand All @@ -1675,21 +1699,13 @@ where S: ShareChain
peer,
last_block_from_them,
their_height,
// their_pow,
permit,
};
let _unused = tx.send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync));
} else {
info!(target: SYNC_REQUEST_LOG_TARGET, "Catch up sync completed for chain {} from {} after {} catchups", algo, peer, num_catchups.map(|s| s.to_string()).unwrap_or_else(|| "None".to_string()));
// this only gets called after sync completes, lets set synced status = true
let (max_known_network_height, max_known_network_pow, peer_with_best) =
peer_store_write_lock.max_known_network_height(algo);

if our_pow.as_u128() >= max_known_network_pow || Some(&peer) == peer_with_best.as_ref() {
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] our pow is greater than max known network pow, we are now synced", algo);
synced_bool.store(true, std::sync::atomic::Ordering::SeqCst);
} else {
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] our pow is less than max known network pow, we are not synced, will continue to search for better POW. Best peer is {} at height {}", algo, peer_with_best.map(|p| p.to_base58()).unwrap_or_else(|| "None".to_string()), max_known_network_height);
}

peer_store_write_lock.reset_catch_up_attempts(&peer);
}
if timer.elapsed() > MAX_ACCEPTABLE_P2P_MESSAGE_TIMEOUT {
Expand Down Expand Up @@ -1756,6 +1772,7 @@ where S: ShareChain
peer,
last_block_from_them,
their_height,
// their_pow: _,
mut permit,
} = perform_catch_up_sync;

Expand All @@ -1775,6 +1792,7 @@ where S: ShareChain
self.sha3x_last_sync_requested_block.take(),
),
};
// let our_pow = share_chain.get_total_chain_pow().await;

if in_progress_syncs.contains_key(&peer) {
warn!(target: SYNC_REQUEST_LOG_TARGET, "Already in progress with sync from {}", peer);
Expand All @@ -1789,10 +1807,25 @@ where S: ShareChain
return Ok(());
},
};
} else {
// Check to see if there is a better peer and release the permit if that is the case
// {
// let peer_store_lock = self.network_peer_store.read().await;
// // this only gets called after sync completes, lets set synced status = true
// let (_max_known_network_height, max_known_network_pow, peer_with_best) =
// peer_store_lock.max_known_network_height(algo);

// if let Some(peer_with_best) = peer_with_best {
// if their_pow < max_known_network_pow {
// warn!(target: SYNC_REQUEST_LOG_TARGET, "There is a peer with better POW than {}, so not
// trying to sync with them. Best peer is currently:{}", peer, peer_with_best);
// return Ok(());
// }
// }
// }
}

let permit = permit.unwrap();

let (mut i_have_blocks, last_block_from_them) = match (last_block_from_them, last_progress) {
(None, Some(last_progress)) => {
// this is most likely a new catchup sync request, while the previous attempt failed so we ask with both
Expand Down Expand Up @@ -2053,6 +2086,61 @@ where S: ShareChain
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to perform catch up sync: {e:?}");
}
},
InnerRequest::AddSyncedBlock {
algo,
block,
source_peer,
} => {
let (share_chain, synced_bool) = match algo {
PowAlgorithm::RandomX => (
self.share_chain_random_x.clone(),
self.are_we_synced_with_randomx_p2pool.clone(),
),
PowAlgorithm::Sha3x => (
self.share_chain_sha3x.clone(),
self.are_we_synced_with_sha3x_p2pool.clone(),
),
};
info!(target: SYNC_REQUEST_LOG_TARGET, "Adding block {}({:x}{:x}{:x}{:x}) to share chain from peer {}", block.height, block.hash[0], block.hash[1], block.hash[2], block.hash[3], source_peer);
match share_chain.add_synced_blocks(&[block]).await {
Ok(result) => {
info!(target: LOG_TARGET, "[{:?}] Blocks via catchup sync result {}", algo, result);
let missing_parents = result.into_missing_parents_vec();
if !missing_parents.is_empty() {
let sync_share_chain = SyncMissingBlocks {
algo,
peer: source_peer,
missing_parents,
is_from_new_block_notify: false,
depth: 0,
};
let tx = self.inner_request_tx.clone();
let _unused = tx.send(InnerRequest::DoSyncMissingBlocks(sync_share_chain));
}
let our_pow = share_chain.get_total_chain_pow().await;
let peer_store_lock = self.network_peer_store.read().await;
// this only gets called after sync completes, lets set synced status = true
let (max_known_network_height, max_known_network_pow, peer_with_best) =
peer_store_lock.max_known_network_height(algo);

if our_pow.as_u128() >= max_known_network_pow {
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] our pow is greater than max known network pow, we are now synced", algo);
synced_bool.store(true, std::sync::atomic::Ordering::SeqCst);
} else {
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] our pow is less than max known network pow, we are not synced, will continue to search for better POW. Best peer is {} at height {}", algo, peer_with_best.map(|p| p.to_base58()).unwrap_or_else(|| "None".to_string()), max_known_network_height);
}
},
Err(error) => {
error!(target: SYNC_REQUEST_LOG_TARGET, "Failed to add Catchup synced blocks to share
chain: {error:?}");
// network_peer_store
// .write()
// .await
// .move_to_grey_list(peer, format!("Block failed validation: {error}"));
},
}
// info!(target: LOG_TARGET, "[{:?}] Blocks via catchup sync added {:?}", algo, blocks_added);
},
}
}

Expand Down Expand Up @@ -2093,6 +2181,7 @@ where S: ShareChain
tokio::pin!(seek_connections_interval);

loop {
// info!(target: LOG_TARGET, "P2P service main loop iter");
select! {
// biased;
_ = &mut shutdown_signal => {
Expand Down
3 changes: 3 additions & 0 deletions src/server/p2p/peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ impl PeerStore {
if record.last_ping.map(|t| t.as_u64() < now - 60).unwrap_or(true) {
continue;
}
if record.peer_info.timestamp < now - 60 {
continue;
}
match algo {
PowAlgorithm::RandomX => {
let achieved_pow = record.peer_info.current_random_x_pow;
Expand Down
12 changes: 5 additions & 7 deletions src/sharechain/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,22 +321,20 @@ impl InMemoryShareChain {
fn all_blocks_with_lock(
&self,
p2_chain: &RwLockReadGuard<'_, P2Chain<LmdbBlockStorage>>,
start_height: Option<u64>,
mut start_height: Option<u64>,
page_size: usize,
main_chain_only: bool,
) -> Result<Vec<Arc<P2Block>>, ShareChainError> {
let mut res = Vec::with_capacity(page_size);
let mut num_actual_blocks = 0;
let lowest_height = p2_chain.lowest_chain_level_height().unwrap_or(0);
if start_height.unwrap_or(0) < lowest_height {
start_height = Some(lowest_height);
}
let mut level = if let Some(level) = p2_chain.level_at_height(start_height.unwrap_or(0)) {
level
} else {
// we dont have that block, see if we have a higher lowest block than they are asking for and start there
if start_height.unwrap_or(0) < lowest_height {
p2_chain.level_at_height(lowest_height).unwrap()
} else {
return Ok(res);
}
return Ok(res);
};

loop {
Expand Down
Loading