Skip to content

Commit

Permalink
fix: disconnected and connected block events emitting order for reorg…
Browse files Browse the repository at this point in the history
… events
  • Loading branch information
oleonardolima committed Jul 6, 2022
1 parent 9ce981b commit a0106af
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 27 deletions.
38 changes: 17 additions & 21 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,35 +90,30 @@ impl BlockHeadersCache {
._get_block(common_ancestor.prev_blockhash)
.await?;
}
log::debug!("[common_ancestor] {:?}", common_ancestor);
log::debug!("[fork_branch] {:?}", fork_branch);
Ok((common_ancestor, fork_branch))
}

/// Rollback active chain in [`BlockHeadersCache`] to passed block
/// Rollback active chain in [`BlockHeadersCache`] back to passed block
///
/// Returns all stale, and to be disconnected blocks as a `VecDeque<BlockExtended>`
pub async fn rollback_active_chain(
&mut self,
block: BlockExtended,
) -> anyhow::Result<VecDeque<BlockExtended>> {
let mut all_disconnected = VecDeque::new();
let mut disconnected = VecDeque::new();
while block.id != self.tip {
let (disconnected_hash, disconnected_header) =
self.active_headers.remove_entry(&self.tip).unwrap();
all_disconnected.push_back(disconnected_header);
self.stale_headers
.insert(disconnected_hash, disconnected_header);
self.tip = disconnected_header.prev_blockhash;
let (stale_hash, stale_header) = self.active_headers.remove_entry(&self.tip).unwrap();
disconnected.push_back(stale_header);

self.stale_headers.insert(stale_hash, stale_header);
self.tip = stale_header.prev_blockhash;
}
log::info!("[all_disconnected] {:?}", all_disconnected);
log::info!("[self.tip] {:?}", self.tip);
Ok(all_disconnected)
Ok(disconnected)
}

/// Apply fork branch to active chain, and update tip to new `BlockExtended`
///
/// Returns the new tip `BlockHash`
/// Returns the new tip `BlockHash`, and the connected blocks as a `VecDeque<BlockExtended>`
pub fn apply_fork_chain(
&mut self,
mut fork_branch: VecDeque<BlockExtended>,
Expand All @@ -127,6 +122,7 @@ impl BlockHeadersCache {
while !fork_branch.is_empty() {
let block = fork_branch.pop_front().unwrap();
connected.push_back(block);

self.active_headers.insert(block.id, block);
self.tip = block.id;
}
Expand Down Expand Up @@ -198,19 +194,19 @@ async fn process_candidates(

// rollback current active chain, moving blocks to staled field
// yields BlockEvent::Disconnected((u32, BlockHash))
for disconnected in cache.rollback_active_chain(common_ancestor).await.unwrap().iter() {
yield BlockEvent::Disconnected((disconnected.height, disconnected.id));
let mut disconnected: VecDeque<BlockExtended> = cache.rollback_active_chain(common_ancestor).await.unwrap();
while !disconnected.is_empty() {
let block: BlockExtended = disconnected.pop_back().unwrap();
yield BlockEvent::Disconnected((block.height, block.id));
}
// TODO: (@leonardo.lima) fix order of return

// iterate over forked chain candidates
// update [`Cache`] active_headers field with candidates
let (_, connected) = cache.apply_fork_chain(fork_chain).unwrap();
for block in connected {
let (_, mut connected) = cache.apply_fork_chain(fork_chain).unwrap();
while !connected.is_empty() {
let block = connected.pop_back().unwrap();
yield BlockEvent::Connected(BlockHeader::from(block.clone()));
}
// TODO: (@leonardo.lima fix order of return)

}
};
Ok(stream)
Expand Down
1 change: 0 additions & 1 deletion src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ pub async fn subscribe_to_blocks(
continue
}
let res_msg: MempoolSpaceWebSocketMessage = serde_json::from_str(&text).unwrap();
log::debug!("[res_msg.block] {:?}", res_msg.block);
yield res_msg.block;
},
Message::Close(_) => {
Expand Down
6 changes: 1 addition & 5 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,23 +460,19 @@ async fn test_block_events_stream_with_reorg() {
rpc_client.invalidate_block(block).unwrap();
invalidated_blocks.push_front(block);
}
log::debug!("invalidated_blocks {:?}", invalidated_blocks);

// generate 2 new blocks
let mut new_blocks = VecDeque::from(
rpc_client
.generate_to_address(3, &rpc_client.get_new_address(None, None).unwrap())
.unwrap(),
);
log::debug!("new_blocks {:?}", new_blocks);

// should disconnect invalidated blocks
while !invalidated_blocks.is_empty() {
log::info!("len {:?}", invalidated_blocks.len());
let invalidated = invalidated_blocks.pop_front().unwrap();
let invalidated = invalidated_blocks.pop_back().unwrap();
let block_event = block_events.next().await.unwrap();

log::info!("{:?}", block_event);
// should produce a BlockEvent::Connected result for each block event
assert!(matches!(block_event, BlockEvent::Disconnected(..)));

Expand Down

0 comments on commit a0106af

Please sign in to comment.