From 6d1f8c991e6dc6b512d7e95c38f55507fdff4546 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 30 May 2024 07:16:03 +0000 Subject: [PATCH] test: add tests for `WalEntryDistributor` --- src/mito2/src/test_util/wal_util.rs | 46 +++- src/mito2/src/wal/entry_distributor.rs | 282 ++++++++++++++++++++++++- 2 files changed, 312 insertions(+), 16 deletions(-) diff --git a/src/mito2/src/test_util/wal_util.rs b/src/mito2/src/test_util/wal_util.rs index 0de04a215314..823242faae23 100644 --- a/src/mito2/src/test_util/wal_util.rs +++ b/src/mito2/src/test_util/wal_util.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use async_stream::stream; +use api::v1::WalEntry; use futures::stream; -use store_api::logstore::entry::Entry; +use prost::Message; +use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader}; use store_api::logstore::provider::Provider; use store_api::logstore::EntryId; +use store_api::storage::RegionId; use crate::error::Result; use crate::wal::raw_entry_reader::{EntryStream, RawEntryReader}; @@ -26,9 +28,41 @@ pub(crate) struct MockRawEntryStream { } impl RawEntryReader for MockRawEntryStream { - fn read(&self, ns: &Provider, start_id: EntryId) -> Result> { - let entries = self.entries.clone().into_iter().map(Ok); - - Ok(Box::pin(stream::iter(entries))) + fn read(&self, _ns: &Provider, _start_id: EntryId) -> Result> { + Ok(Box::pin(stream::iter( + self.entries.clone().into_iter().map(Ok), + ))) } } + +/// Puts an incomplete [`Entry`] at the end of `input`. +pub(crate) fn generate_tail_corrupted_stream( + provider: Provider, + region_id: RegionId, + input: &WalEntry, + num_parts: usize, +) -> Vec { + let encoded_entry = input.encode_to_vec(); + let parts = encoded_entry + .chunks(encoded_entry.len() / num_parts) + .map(Into::into) + .collect::>(); + + vec![ + Entry::MultiplePart(MultiplePartEntry { + provider: provider.clone(), + region_id, + entry_id: 0, + headers: vec![MultiplePartHeader::First, MultiplePartHeader::Last], + parts, + }), + // The tail corrupted data. + Entry::MultiplePart(MultiplePartEntry { + provider: provider.clone(), + region_id, + entry_id: 0, + headers: vec![MultiplePartHeader::First], + parts: vec![vec![1; 100]], + }), + ] +} diff --git a/src/mito2/src/wal/entry_distributor.rs b/src/mito2/src/wal/entry_distributor.rs index 62a79a35ddf1..5c261bbcc766 100644 --- a/src/mito2/src/wal/entry_distributor.rs +++ b/src/mito2/src/wal/entry_distributor.rs @@ -46,16 +46,18 @@ pub(crate) struct WalEntryDistributor { impl WalEntryDistributor { /// Distributes entries to specific [WalEntryReceiver]s based on [RegionId]. pub async fn distribute(mut self) -> Result<()> { - let mut arg_receivers = std::mem::take(&mut self.arg_receivers); - let args_futures = arg_receivers.iter_mut().map(|(region_id, receiver)| async { - { - if let Ok(start_id) = receiver.await { - (*region_id, Some(start_id)) - } else { - (*region_id, None) + let args_futures = self + .arg_receivers + .iter_mut() + .map(|(region_id, receiver)| async { + { + if let Ok(start_id) = receiver.await { + (*region_id, Some(start_id)) + } else { + (*region_id, None) + } } - } - }); + }); let args = join_all(args_futures) .await .into_iter() @@ -106,7 +108,9 @@ impl WalEntryDistributor { #[derive(Debug)] pub(crate) struct WalEntryReceiver { region_id: RegionId, + /// Receives the [Entry] from the [WalEntryDistributor]. entry_receiver: UnboundedReceiver, + /// Sends the `start_id` to the [WalEntryDistributor]. arg_sender: oneshot::Sender, } @@ -218,12 +222,16 @@ pub fn build_wal_entry_distributor_and_receivers( #[cfg(test)] mod tests { + use std::assert_matches::assert_matches; + use api::v1::{Mutation, OpType}; + use futures::executor::EnterError; use futures::{stream, TryStreamExt}; use prost::Message; - use store_api::logstore::entry::{Entry, NaiveEntry}; + use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry}; use super::*; + use crate::test_util::wal_util::generate_tail_corrupted_stream; use crate::wal::raw_entry_reader::{EntryStream, RawEntryReader}; use crate::wal::EntryId; @@ -372,4 +380,258 @@ mod tests { )] ); } + + #[tokio::test] + async fn test_tail_corrupted_stream() { + let mut entries = vec![]; + let region1 = RegionId::new(1, 1); + let region1_expected_wal_entry = WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 1u64, + rows: None, + }], + }; + let region2 = RegionId::new(1, 2); + let region2_expected_wal_entry = WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 3u64, + rows: None, + }], + }; + let region3 = RegionId::new(1, 3); + let region3_expected_wal_entry = WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 3u64, + rows: None, + }], + }; + let provider = Provider::kafka_provider("my_topic".to_string()); + entries.extend(generate_tail_corrupted_stream( + provider.clone(), + region1, + ®ion1_expected_wal_entry, + 3, + )); + entries.extend(generate_tail_corrupted_stream( + provider.clone(), + region2, + ®ion2_expected_wal_entry, + 2, + )); + entries.extend(generate_tail_corrupted_stream( + provider.clone(), + region3, + ®ion3_expected_wal_entry, + 4, + )); + + let corrupted_stream = MockRawEntryReader { entries }; + // Builds distributor and receivers + let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers( + provider.clone(), + Arc::new(corrupted_stream), + vec![region1, region2, region3], + ); + assert_eq!(receivers.len(), 3); + let mut streams = receivers + .into_iter() + .map(|receiver| receiver.read(&provider, 0).unwrap()) + .collect::>(); + distributor.distribute().await.unwrap(); + + assert_eq!( + streams + .get_mut(0) + .unwrap() + .try_collect::>() + .await + .unwrap(), + vec![(0, region1_expected_wal_entry)] + ); + + assert_eq!( + streams + .get_mut(1) + .unwrap() + .try_collect::>() + .await + .unwrap(), + vec![(0, region2_expected_wal_entry)] + ); + + assert_eq!( + streams + .get_mut(2) + .unwrap() + .try_collect::>() + .await + .unwrap(), + vec![(0, region3_expected_wal_entry)] + ); + } + + #[tokio::test] + async fn test_part_corrupted_stream() { + let mut entries = vec![]; + let region1 = RegionId::new(1, 1); + let region1_expected_wal_entry = WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 1u64, + rows: None, + }], + }; + let region2 = RegionId::new(1, 2); + let provider = Provider::kafka_provider("my_topic".to_string()); + entries.extend(generate_tail_corrupted_stream( + provider.clone(), + region1, + ®ion1_expected_wal_entry, + 3, + )); + entries.extend(vec![ + // The corrupted data. + Entry::MultiplePart(MultiplePartEntry { + provider: provider.clone(), + region_id: region2, + entry_id: 0, + headers: vec![MultiplePartHeader::First], + parts: vec![vec![1; 100]], + }), + Entry::MultiplePart(MultiplePartEntry { + provider: provider.clone(), + region_id: region2, + entry_id: 0, + headers: vec![MultiplePartHeader::First], + parts: vec![vec![1; 100]], + }), + ]); + + let corrupted_stream = MockRawEntryReader { entries }; + // Builds distributor and receivers + let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers( + provider.clone(), + Arc::new(corrupted_stream), + vec![region1, region2], + ); + assert_eq!(receivers.len(), 2); + let mut streams = receivers + .into_iter() + .map(|receiver| receiver.read(&provider, 0).unwrap()) + .collect::>(); + distributor.distribute().await.unwrap(); + assert_eq!( + streams + .get_mut(0) + .unwrap() + .try_collect::>() + .await + .unwrap(), + vec![(0, region1_expected_wal_entry)] + ); + + assert_matches!( + streams + .get_mut(1) + .unwrap() + .try_collect::>() + .await + .unwrap_err(), + error::Error::CorruptedEntry { .. } + ); + } + + #[tokio::test] + async fn test_wal_entry_receiver_start_id() { + let provider = Provider::kafka_provider("my_topic".to_string()); + let reader = Arc::new(MockRawEntryReader::new(vec![ + Entry::Naive(NaiveEntry { + provider: provider.clone(), + region_id: RegionId::new(1024, 1), + entry_id: 1, + data: WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 1u64, + rows: None, + }], + } + .encode_to_vec(), + }), + Entry::Naive(NaiveEntry { + provider: provider.clone(), + region_id: RegionId::new(1024, 2), + entry_id: 2, + data: WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 2u64, + rows: None, + }], + } + .encode_to_vec(), + }), + Entry::Naive(NaiveEntry { + provider: provider.clone(), + region_id: RegionId::new(1024, 1), + entry_id: 3, + data: WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 3u64, + rows: None, + }], + } + .encode_to_vec(), + }), + Entry::Naive(NaiveEntry { + provider: provider.clone(), + region_id: RegionId::new(1024, 2), + entry_id: 4, + data: WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 4u64, + rows: None, + }], + } + .encode_to_vec(), + }), + ])); + + // Builds distributor and receivers + let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers( + provider.clone(), + reader, + vec![RegionId::new(1024, 1), RegionId::new(1024, 2)], + ); + assert_eq!(receivers.len(), 2); + let mut streams = receivers + .into_iter() + .map(|receiver| receiver.read(&provider, 4).unwrap()) + .collect::>(); + distributor.distribute().await.unwrap(); + + assert_eq!( + streams + .get_mut(1) + .unwrap() + .try_collect::>() + .await + .unwrap(), + vec![( + 4, + WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 4u64, + rows: None, + }], + } + )] + ); + } }