diff --git a/src/protocols/filter/components/block_filters_process.rs b/src/protocols/filter/components/block_filters_process.rs index 22f39c9..267bb4a 100644 --- a/src/protocols/filter/components/block_filters_process.rs +++ b/src/protocols/filter/components/block_filters_process.rs @@ -6,7 +6,7 @@ use ckb_network::{CKBProtocolContext, PeerIndex}; use ckb_types::core::BlockNumber; use ckb_types::utilities::calc_filter_hash; use ckb_types::{packed, prelude::*}; -use log::{info, trace, warn}; +use log::{debug, info, trace, warn}; use rand::seq::SliceRandom; use std::{cmp, sync::Arc}; @@ -56,8 +56,16 @@ impl<'a> BlockFiltersProcess<'a> { let block_filters = self.message.to_entity(); let start_number: BlockNumber = block_filters.start_number().unpack(); + let filters_count = block_filters.filters().len(); + let blocks_count = block_filters.block_hashes().len(); + + debug!( + "recieved block filters: start number: {start_number}, \ + filters count: {filters_count}, blocks count: {blocks_count}." + ); let min_filtered_block_number = self.filter.storage.get_min_filtered_block_number(); + debug!("current min filtered block number: {min_filtered_block_number}"); if min_filtered_block_number + 1 != start_number { info!( "ignoring, the start_number of block_filters message {} is not continuous with min_filtered_block_number: {}", @@ -73,9 +81,6 @@ impl<'a> BlockFiltersProcess<'a> { return Status::ok(); } - let filters_count = block_filters.filters().len(); - let blocks_count = block_filters.block_hashes().len(); - if filters_count != blocks_count { let error_message = format!( "filters length ({}) not equal to block_hashes length ({})", @@ -156,6 +161,17 @@ impl<'a> BlockFiltersProcess<'a> { (finalized_check_point_hash, latest_block_filter_hashes) } else { let start_index = (start_number - finalized_check_point_number) as usize - 2; + if start_index >= latest_block_filter_hashes.len() { + info!( + "ignoring, no enough data for block filter hashes, \ + finalized number: {finalized_check_point_number}, \ + finalized index: {finalized_check_point_index}, \ + start number: {start_number}, start index: {start_index}, \ + filter hashes length {}", + latest_block_filter_hashes.len() + ); + return Status::ok(); + } let parent_hash = latest_block_filter_hashes[start_index].clone(); latest_block_filter_hashes.drain(..=start_index); (parent_hash, latest_block_filter_hashes) diff --git a/src/protocols/light_client/peers.rs b/src/protocols/light_client/peers.rs index 8cc6072..d3c31a5 100644 --- a/src/protocols/light_client/peers.rs +++ b/src/protocols/light_client/peers.rs @@ -39,7 +39,12 @@ pub struct Peers { // - Include at the next cached check point. cached_block_filter_hashes: RwLock<(u32, Vec)>, + #[cfg(not(test))] max_outbound_peers: u32, + + #[cfg(test)] + max_outbound_peers: RwLock, + check_point_interval: BlockNumber, start_check_point: (u32, packed::Byte32), } @@ -1105,6 +1110,9 @@ impl Peers { check_point_interval: BlockNumber, start_check_point: (u32, packed::Byte32), ) -> Self { + #[cfg(test)] + let max_outbound_peers = RwLock::new(max_outbound_peers); + Self { inner: Default::default(), last_headers: Default::default(), @@ -1231,10 +1239,21 @@ impl Peers { &self.matched_blocks } + #[cfg(not(test))] pub(crate) fn get_max_outbound_peers(&self) -> u32 { self.max_outbound_peers } + #[cfg(test)] + pub(crate) fn get_max_outbound_peers(&self) -> u32 { + *self.max_outbound_peers.read().expect("poisoned") + } + + #[cfg(test)] + pub(crate) fn set_max_outbound_peers(&self, max_outbound_peers: u32) { + *self.max_outbound_peers.write().expect("poisoned") = max_outbound_peers; + } + pub(crate) fn add_peer(&self, index: PeerIndex) { let peer = Peer::new(self.check_point_interval, self.start_check_point.clone()); self.inner.insert(index, peer); diff --git a/src/tests/protocols/block_filter.rs b/src/tests/protocols/block_filter.rs index 616f1ed..1532bbb 100644 --- a/src/tests/protocols/block_filter.rs +++ b/src/tests/protocols/block_filter.rs @@ -726,3 +726,96 @@ async fn test_block_filter_notify_recover_matched_blocks() { ] ); } + +#[tokio::test(flavor = "multi_thread")] +async fn test_block_filter_without_enough_hashes() { + setup(); + + let chain = MockChain::new_with_dummy_pow("test-block-filter").start(); + let nc = MockNetworkContext::new(SupportProtocols::Filter); + + let min_filtered_block_number = 30; + let start_number = min_filtered_block_number + 1; + let proved_number = start_number + 5; + let script = Script::new_builder() + .code_hash(H256(rand::random()).pack()) + .build(); + chain.client_storage().update_filter_scripts( + vec![ScriptStatus { + script: script.clone(), + script_type: ScriptType::Lock, + block_number: 0, + }], + SetScriptsCommand::All, + ); + chain + .client_storage() + .update_min_filtered_block_number(min_filtered_block_number); + + chain.mine_to(start_number - 3); + + { + let tx = { + let tx = chain.get_cellbase_as_input(start_number - 5); + let output = tx.output(0).unwrap().as_builder().lock(script).build(); + tx.as_advanced_builder().set_outputs(vec![output]).build() + }; + chain.mine_block(|block| { + let ids = vec![tx.proposal_short_id()]; + block.as_advanced_builder().proposals(ids).build() + }); + chain.mine_blocks(1); + chain.mine_block(|block| block.as_advanced_builder().transaction(tx.clone()).build()); + chain.mine_blocks(1); + } + + chain.mine_to(proved_number); + + let snapshot = chain.shared().snapshot(); + + let tip_header: VerifiableHeader = snapshot + .get_verifiable_header_by_number(proved_number) + .expect("block stored") + .into(); + chain + .client_storage() + .update_last_state(&U256::one(), &tip_header.header().data(), &[]); + + let peer_index = PeerIndex::new(3); + let peers = { + let peers = chain.create_peers(); + peers.add_peer(peer_index); + peers.mock_prove_state(peer_index, tip_header).unwrap(); + peers.set_max_outbound_peers(3); + peers + }; + + let filter_data_1 = snapshot.get_block_filter_data(start_number).unwrap(); + let filter_data_2 = snapshot.get_block_filter_data(start_number + 1).unwrap(); + let block_hash_1 = snapshot.get_block_hash(start_number).unwrap(); + let block_hash_2 = snapshot.get_block_hash(start_number + 1).unwrap(); + let filter_hashes = { + let mut filter_hashes = snapshot + .get_block_filter_hashes_until(start_number + 3) + .unwrap(); + filter_hashes.remove(0); + filter_hashes + }; + + let content = packed::BlockFilters::new_builder() + .start_number(start_number.pack()) + .block_hashes(vec![block_hash_1.clone(), block_hash_2].pack()) + .filters(vec![filter_data_1, filter_data_2].pack()) + .build(); + let message = packed::BlockFilterMessage::new_builder() + .set(content) + .build() + .as_bytes(); + + let mut protocol = chain.create_filter_protocol(Arc::clone(&peers)); + peers.mock_latest_block_filter_hashes(peer_index, 0, filter_hashes); + protocol.received(nc.context(), peer_index, message).await; + assert!(nc.not_banned(peer_index)); + + assert!(nc.sent_messages().borrow().is_empty()); +}