Skip to content

Commit

Permalink
test: add tests for WalEntryDistributor
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed May 30, 2024
1 parent 9b20d49 commit 6d1f8c9
Show file tree
Hide file tree
Showing 2 changed files with 312 additions and 16 deletions.
46 changes: 40 additions & 6 deletions src/mito2/src/test_util/wal_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use async_stream::stream;
use api::v1::WalEntry;
use futures::stream;
use store_api::logstore::entry::Entry;
use prost::Message;
use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader};
use store_api::logstore::provider::Provider;
use store_api::logstore::EntryId;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::wal::raw_entry_reader::{EntryStream, RawEntryReader};
Expand All @@ -26,9 +28,41 @@ pub(crate) struct MockRawEntryStream {
}

impl RawEntryReader for MockRawEntryStream {
fn read(&self, ns: &Provider, start_id: EntryId) -> Result<EntryStream<'static>> {
let entries = self.entries.clone().into_iter().map(Ok);

Ok(Box::pin(stream::iter(entries)))
/// Replays a copy of every stored entry, in insertion order, each wrapped in `Ok`.
///
/// Both `_ns` and `_start_id` are ignored: the mock always yields all of its entries.
fn read(&self, _ns: &Provider, _start_id: EntryId) -> Result<EntryStream<'static>> {
    let replay = self.entries.clone().into_iter().map(Ok);
    Ok(Box::pin(stream::iter(replay)))
}
}

/// Builds a stream for `region_id` that ends with an incomplete [`Entry`].
///
/// The returned vector always holds exactly two entries, both with `entry_id` 0:
///
/// 1. A complete [`MultiplePartEntry`] (headers `First` and `Last`) whose parts are the
///    protobuf encoding of `input`, split into consecutive chunks.
/// 2. A dangling [`MultiplePartEntry`] with only a `First` header and 100 filler bytes,
///    i.e. the corrupted tail.
///
/// `num_parts` is the requested number of chunks. The chunk length is
/// `encoded_len / num_parts` rounded down, so more than `num_parts` parts are produced
/// when the encoded length is not a multiple of `num_parts`. The chunk length is clamped
/// to at least one byte, so `num_parts == 0` or a `num_parts` larger than the encoded
/// length no longer panics.
pub(crate) fn generate_tail_corrupted_stream(
    provider: Provider,
    region_id: RegionId,
    input: &WalEntry,
    num_parts: usize,
) -> Vec<Entry> {
    let encoded_entry = input.encode_to_vec();
    // `num_parts.max(1)` avoids a division by zero; the outer `max(1)` avoids a zero
    // chunk size, which `slice::chunks` rejects with a panic.
    let part_len = (encoded_entry.len() / num_parts.max(1)).max(1);
    let parts = encoded_entry
        .chunks(part_len)
        .map(Into::into)
        .collect::<Vec<_>>();

    vec![
        Entry::MultiplePart(MultiplePartEntry {
            provider: provider.clone(),
            region_id,
            entry_id: 0,
            headers: vec![MultiplePartHeader::First, MultiplePartHeader::Last],
            parts,
        }),
        // The tail corrupted data: a `First` part that is never followed by a `Last`.
        Entry::MultiplePart(MultiplePartEntry {
            // Last use of `provider`, so it is moved instead of cloned.
            provider,
            region_id,
            entry_id: 0,
            headers: vec![MultiplePartHeader::First],
            parts: vec![vec![1; 100]],
        }),
    ]
}
282 changes: 272 additions & 10 deletions src/mito2/src/wal/entry_distributor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,18 @@ pub(crate) struct WalEntryDistributor {
impl WalEntryDistributor {
/// Distributes entries to specific [WalEntryReceiver]s based on [RegionId].
pub async fn distribute(mut self) -> Result<()> {
let mut arg_receivers = std::mem::take(&mut self.arg_receivers);
let args_futures = arg_receivers.iter_mut().map(|(region_id, receiver)| async {
{
if let Ok(start_id) = receiver.await {
(*region_id, Some(start_id))
} else {
(*region_id, None)
let args_futures = self
.arg_receivers
.iter_mut()
.map(|(region_id, receiver)| async {
{
if let Ok(start_id) = receiver.await {
(*region_id, Some(start_id))
} else {
(*region_id, None)
}
}
}
});
});
let args = join_all(args_futures)
.await
.into_iter()
Expand Down Expand Up @@ -106,7 +108,9 @@ impl WalEntryDistributor {
#[derive(Debug)]
pub(crate) struct WalEntryReceiver {
/// Id of the region this receiver belongs to; the [WalEntryDistributor] hands out
/// entries based on [RegionId].
region_id: RegionId,
/// Receives the [Entry] from the [WalEntryDistributor].
entry_receiver: UnboundedReceiver<Entry>,
/// Sends the `start_id` to the [WalEntryDistributor].
arg_sender: oneshot::Sender<EntryId>,
}

Expand Down Expand Up @@ -218,12 +222,16 @@ pub fn build_wal_entry_distributor_and_receivers(

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;

use api::v1::{Mutation, OpType};
use futures::executor::EnterError;
use futures::{stream, TryStreamExt};
use prost::Message;
use store_api::logstore::entry::{Entry, NaiveEntry};
use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};

use super::*;
use crate::test_util::wal_util::generate_tail_corrupted_stream;
use crate::wal::raw_entry_reader::{EntryStream, RawEntryReader};
use crate::wal::EntryId;

Expand Down Expand Up @@ -372,4 +380,258 @@ mod tests {
)]
);
}

/// Feeds the distributor three regions whose streams each end with an incomplete
/// multi-part entry, and expects every receiver to yield only its one complete entry.
#[tokio::test]
async fn test_tail_corrupted_stream() {
    let provider = Provider::kafka_provider("my_topic".to_string());
    let put_at = |sequence: u64| WalEntry {
        mutations: vec![Mutation {
            op_type: OpType::Put as i32,
            sequence,
            rows: None,
        }],
    };

    // (region, expected WAL entry, number of parts the entry is split into).
    // Each region gets its own sequence number so that an entry delivered to the wrong
    // receiver makes the assertions below fail instead of comparing equal.
    let cases = [
        (RegionId::new(1, 1), put_at(1), 3),
        (RegionId::new(1, 2), put_at(2), 2),
        (RegionId::new(1, 3), put_at(3), 4),
    ];

    let entries = cases
        .iter()
        .flat_map(|(region_id, wal_entry, num_parts)| {
            generate_tail_corrupted_stream(provider.clone(), *region_id, wal_entry, *num_parts)
        })
        .collect::<Vec<_>>();
    let region_ids = cases
        .iter()
        .map(|(region_id, _, _)| *region_id)
        .collect::<Vec<_>>();

    let corrupted_stream = MockRawEntryReader { entries };
    // Builds distributor and receivers
    let (distributor, receivers) = build_wal_entry_distributor_and_receivers(
        provider.clone(),
        Arc::new(corrupted_stream),
        region_ids,
    );
    assert_eq!(receivers.len(), 3);
    let mut streams = receivers
        .into_iter()
        .map(|receiver| receiver.read(&provider, 0).unwrap())
        .collect::<Vec<_>>();
    distributor.distribute().await.unwrap();

    // Streams are in the same order as the region ids handed to the builder.
    for (stream, (_, expected_wal_entry, _)) in streams.iter_mut().zip(cases) {
        assert_eq!(
            stream.try_collect::<Vec<_>>().await.unwrap(),
            vec![(0, expected_wal_entry)]
        );
    }
}

#[tokio::test]
async fn test_part_corrupted_stream() {
let mut entries = vec![];
let region1 = RegionId::new(1, 1);
let region1_expected_wal_entry = WalEntry {
mutations: vec![Mutation {
op_type: OpType::Put as i32,
sequence: 1u64,
rows: None,
}],
};
let region2 = RegionId::new(1, 2);
let provider = Provider::kafka_provider("my_topic".to_string());
entries.extend(generate_tail_corrupted_stream(
provider.clone(),
region1,
&region1_expected_wal_entry,
3,
));
entries.extend(vec![
// The corrupted data.
Entry::MultiplePart(MultiplePartEntry {
provider: provider.clone(),
region_id: region2,
entry_id: 0,
headers: vec![MultiplePartHeader::First],
parts: vec![vec![1; 100]],
}),
Entry::MultiplePart(MultiplePartEntry {
provider: provider.clone(),
region_id: region2,
entry_id: 0,
headers: vec![MultiplePartHeader::First],
parts: vec![vec![1; 100]],
}),
]);

let corrupted_stream = MockRawEntryReader { entries };
// Builds distributor and receivers
let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers(
provider.clone(),
Arc::new(corrupted_stream),
vec![region1, region2],
);
assert_eq!(receivers.len(), 2);
let mut streams = receivers
.into_iter()
.map(|receiver| receiver.read(&provider, 0).unwrap())
.collect::<Vec<_>>();
distributor.distribute().await.unwrap();
assert_eq!(
streams
.get_mut(0)
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap(),
vec![(0, region1_expected_wal_entry)]
);

assert_matches!(
streams
.get_mut(1)
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap_err(),
error::Error::CorruptedEntry { .. }
);
}

/// Both receivers read with a start id of 4; the test expects region 2 to be handed
/// only its entry with id 4, not the earlier entry with id 2.
#[tokio::test]
async fn test_wal_entry_receiver_start_id() {
    let provider = Provider::kafka_provider("my_topic".to_string());
    let region1 = RegionId::new(1024, 1);
    let region2 = RegionId::new(1024, 2);
    let put_at = |sequence: u64| WalEntry {
        mutations: vec![Mutation {
            op_type: OpType::Put as i32,
            sequence,
            rows: None,
        }],
    };

    // (region, entry id, mutation sequence), alternating between the two regions.
    let raw_entries = [
        (region1, 1, 1u64),
        (region2, 2, 2u64),
        (region1, 3, 3u64),
        (region2, 4, 4u64),
    ]
    .into_iter()
    .map(|(region_id, entry_id, sequence)| {
        Entry::Naive(NaiveEntry {
            provider: provider.clone(),
            region_id,
            entry_id,
            data: put_at(sequence).encode_to_vec(),
        })
    })
    .collect::<Vec<_>>();
    let reader = Arc::new(MockRawEntryReader::new(raw_entries));

    // Builds distributor and receivers
    let (distributor, receivers) = build_wal_entry_distributor_and_receivers(
        provider.clone(),
        reader,
        vec![region1, region2],
    );
    assert_eq!(receivers.len(), 2);
    let mut streams: Vec<_> = receivers
        .into_iter()
        .map(|receiver| receiver.read(&provider, 4).unwrap())
        .collect();
    distributor.distribute().await.unwrap();

    let region2_stream = streams.get_mut(1).unwrap();
    let delivered = region2_stream.try_collect::<Vec<_>>().await.unwrap();
    assert_eq!(delivered, vec![(4, put_at(4))]);
}
}

0 comments on commit 6d1f8c9

Please sign in to comment.