diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index 2a59a497..9e19ea16 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -114,7 +114,7 @@ where #[cfg(test)] mod tests { - use std::ops::Bound; + use std::collections::Bound; use super::Mutable; use crate::{ @@ -157,4 +157,66 @@ mod tests { } ) } + + #[test] + fn range() { + let mutable = Mutable::::new(); + + mutable.insert("1".into(), 0_u32.into()); + mutable.insert("2".into(), 0_u32.into()); + mutable.insert("2".into(), 1_u32.into()); + mutable.insert("3".into(), 1_u32.into()); + mutable.insert("4".into(), 0_u32.into()); + + let mut scan = mutable.scan((Bound::Unbounded, Bound::Unbounded), 0_u32.into()); + + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("1".into(), 0_u32.into()) + ); + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("2".into(), 1_u32.into()) + ); + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("2".into(), 0_u32.into()) + ); + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("3".into(), 1_u32.into()) + ); + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("4".into(), 0_u32.into()) + ); + + let lower = "1".to_string(); + let upper = "4".to_string(); + let mut scan = mutable.scan( + (Bound::Included(&lower), Bound::Included(&upper)), + 1_u32.into(), + ); + + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("1".into(), 0_u32.into()) + ); + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("2".into(), 1_u32.into()) + ); + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("2".into(), 0_u32.into()) + ); + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("3".into(), 1_u32.into()) + ); + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("4".into(), 0_u32.into()) + ); + } } diff --git a/src/stream/merge.rs b/src/stream/merge.rs index 38f823cb..bb1d8de3 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -170,4 +170,36 @@ mod tests { dbg!(merge.next().await); dbg!(merge.next().await); } + + #[tokio::test] + async fn merge_mutable_remove_duplicates() { + let m1 = Mutable::::new(); + m1.insert("1".into(), 0_u32.into()); + m1.insert("2".into(), 0_u32.into()); + m1.insert("2".into(), 1_u32.into()); + m1.insert("3".into(), 1_u32.into()); + m1.insert("4".into(), 0_u32.into()); + + let lower = "1".to_string(); + let upper = "4".to_string(); + let bound = (Bound::Included(&lower), Bound::Included(&upper)); + let mut merge = MergeStream::from_vec(vec![m1.scan(bound, 0.into()).into()]) + .await + .unwrap(); + + dbg!(merge.next().await); + dbg!(merge.next().await); + dbg!(merge.next().await); + + let lower = "1".to_string(); + let upper = "4".to_string(); + let bound = (Bound::Included(&lower), Bound::Included(&upper)); + let mut merge = MergeStream::from_vec(vec![m1.scan(bound, 1.into()).into()]) + .await + .unwrap(); + + dbg!(merge.next().await); + dbg!(merge.next().await); + dbg!(merge.next().await); + } }