Skip to content

Commit

Permalink
fix duplicate requests
Browse files Browse the repository at this point in the history
clarify logs
  • Loading branch information
SWvheerden committed Nov 22, 2024
1 parent 9173b62 commit b441839
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 33 deletions.
37 changes: 17 additions & 20 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,6 @@ where S: ShareChain
return Ok(MessageAcceptance::Ignore);
}

// if payload.
let max_payload_height = payload
.new_blocks
.iter()
Expand Down Expand Up @@ -677,7 +676,7 @@ where S: ShareChain
algo,
peer: propagation_source,
missing_parents,
is_from_new_block_notify: false,
is_from_new_block_notify: true,
};

let _ = self.inner_request_tx.send(InnerRequest::DoSyncChain(sync_share_chain));
Expand Down Expand Up @@ -950,15 +949,15 @@ where S: ShareChain
let timer = Instant::now();
// if !self.sync_in_progress.load(Ordering::SeqCst) {
// return;
// }
// },
let algo = response.algo();
let share_chain = match algo {
PowAlgorithm::RandomX => self.share_chain_random_x.clone(),
PowAlgorithm::Sha3x => self.share_chain_sha3x.clone(),
};
let blocks: Vec<_> = response.into_blocks().into_iter().map(Arc::new).collect();
let squad = self.config.squad.clone();
info!(target: LOG_TARGET, squad; "Received sync response for chain {} from {} with blocks {}", algo, peer, blocks.iter().map(|a| a.height.to_string()).join(", "));
info!(target: LOG_TARGET, squad; "Received sync response for chain {} from {} with blocks {:?}", algo, peer, blocks.iter().map(|a| format!("{}({:x}{:x}{:x}{:x})",a.height.to_string(), a.hash[0], a.hash[1], a.hash[2], a.hash[3])).collect::<Vec<String>>());
let tx = self.inner_request_tx.clone();
let peer_store = self.network_peer_store.clone();
tokio::spawn(async move {
Expand All @@ -967,18 +966,17 @@ where S: ShareChain
info!(target: LOG_TARGET, squad; "[{:?}] Synced blocks added to share chain, new tip added [{}]",algo, new_tip);
info!(target: LOG_TARGET, squad; "[{:?}] blocks for the following heights where added : {:?}", algo, blocks.iter().map(|block| block.height.to_string()).collect::<Vec<String>>());
},
// Err(crate::sharechain::error::ShareChainError::BlockParentDoesNotExist { missing_parents }) => {
// // TODO: We should not keep asking for missing blocks otherwise we can get into an infinite loop
// of // asking. let sync_share_chain = SyncShareChain {
// // algo,
// // peer,
// // missing_parents,
// // is_from_new_block_notify: false,
// // };

// // let _ = tx.send(InnerRequest::DoSyncChain(sync_share_chain));
// return;
// },
Err(crate::sharechain::error::ShareChainError::BlockParentDoesNotExist { missing_parents }) => {
info!(target: LOG_TARGET, squad; "[{:?}] missing parents {:?}", algo, missing_parents.iter().map(|(height, hash)| format!("{}({:x}{:x}{:x}{:x})",height.to_string(), hash[0], hash[1], hash[2], hash[3])).collect::<Vec<String>>());
let sync_share_chain = SyncShareChain {
algo,
peer,
missing_parents,
is_from_new_block_notify: false,
};

let _ = tx.send(InnerRequest::DoSyncChain(sync_share_chain));
},
Err(error) => {
error!(target: LOG_TARGET, squad; "Failed to add synced blocks to share chain: {error:?}");
peer_store
Expand Down Expand Up @@ -1016,11 +1014,10 @@ where S: ShareChain
drop(peer_store_read_lock);

if is_from_new_block_notify {
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] Sending sync to {} for blocks {} from notify", algo, peer, missing_parents.iter().map(|a| a.0.to_string()).join(", "));
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] Sending sync to connected peers for blocks {:?} from notify", algo, missing_parents.iter().map(|(height, hash)|format!("{}({:x}{:x}{:x}{:x})",height.to_string(), hash[0], hash[1], hash[2], hash[3])).collect::<Vec<String>>());
} else {
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] Sending sync to {} for blocks {} from catchup", algo, peer, missing_parents.iter().map(|a| a.0.to_string()).join(", "));
info!(target: SYNC_REQUEST_LOG_TARGET, "[{}] Sending sync to connected peers for blocks {:?} from sync", algo, missing_parents.iter().map(|(height, hash)|format!("{}({:x}{:x}{:x}{:x})",height.to_string(), hash[0], hash[1], hash[2], hash[3])).collect::<Vec<String>>());
}

if missing_parents.is_empty() {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Sync called but with no missing parents.");
// panic!("Sync called but with no missing parents.");
Expand Down Expand Up @@ -1456,7 +1453,7 @@ where S: ShareChain
for b in &blocks {
match share_chain.add_synced_blocks(&[b.clone()]).await {
Ok(result) => {
info!(target: LOG_TARGET, "Block added {}:{}:{}", algo, b.height, &b.hash.to_hex()[0..8]);
info!(target: LOG_TARGET, "[new tip: {}] Block added {}:{}:{}", result, algo, b.height, &b.hash.to_hex()[0..8]);
},
Err(error) => match error {
crate::sharechain::error::ShareChainError::BlockParentDoesNotExist { missing_parents } => {
Expand Down
26 changes: 13 additions & 13 deletions src/sharechain/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,15 @@ impl InMemoryShareChain {
) -> Result<bool, ShareChainError> {
let new_block_p2pool_height = block.height;

if p2_chain.get_tip().is_none() || block.height == 0 || syncing {
// Check if already added.
if let Some(level) = p2_chain.level_at_height(new_block_p2pool_height) {
if level.blocks.contains_key(&block.hash) {
info!(target: LOG_TARGET, "[{:?}] ✅ Block already added: {:?}", self.pow_algo, block.height);
return Ok(false);
}
}

if p2_chain.get_tip().is_none() {
let _validate_result = self.validate_claimed_difficulty(&block, params).await?;
return p2_chain.add_block_to_chain(block.clone());
}
Expand All @@ -193,14 +201,6 @@ impl InMemoryShareChain {
));
}

// Check if already added.
if let Some(level) = p2_chain.level_at_height(new_block_p2pool_height) {
if level.blocks.contains_key(&block.hash) {
info!(target: LOG_TARGET, "[{:?}] ✅ Block already added: {:?}", self.pow_algo, block.height);
return Ok(false);
}
}

// validate
self.validate_block(&block).await?;
let _validate_result = self.validate_claimed_difficulty(&block, params).await?;
Expand Down Expand Up @@ -432,19 +432,19 @@ impl ShareChain for InMemoryShareChain {
{
Ok(tip_change) => {
debug!(target: LOG_TARGET, "[{:?}] ✅ added Block: {:?} successfully. Tip changed: {}", self.pow_algo, height, tip_change);
new_tip = tip_change;
if tip_change {
new_tip = true;
}
},
Err(e) => {
if let ShareChainError::BlockParentDoesNotExist {
missing_parents: new_missing_parents,
} = e
{
let mut missing_heights = Vec::new();
for new_missing_parent in new_missing_parents {
if known_blocks_incoming.contains(&new_missing_parent.1) {
continue;
}
missing_heights.push(new_missing_parent.0);
missing_parents.insert(new_missing_parent.1, new_missing_parent.0);
if missing_parents.len() > MAX_MISSING_PARENTS {
break 'outer;
Expand All @@ -464,7 +464,7 @@ impl ShareChain for InMemoryShareChain {
);

if !missing_parents.is_empty() {
info!(target: LOG_TARGET, "[{:?}] Missing blocks for the following heights: {:?}", self.pow_algo, missing_parents.values().map(|height| height.to_string()).collect::<Vec<String>>());
info!(target: LOG_TARGET, "[{:?}] Missing blocks for the following heights: {:?}", self.pow_algo, missing_parents.iter().map(|(hash,height)| format!("{}({:x}{:x}{:x}{:x})",height.to_string(), hash[0], hash[1], hash[2], hash[3])).collect::<Vec<String>>());
return Err(ShareChainError::BlockParentDoesNotExist {
missing_parents: missing_parents
.into_iter()
Expand Down

0 comments on commit b441839

Please sign in to comment.