diff --git a/src/stream/src/executor/over_window/frame_finder.rs b/src/stream/src/executor/over_window/frame_finder.rs index 4438611b5d370..9cffb6e26d903 100644 --- a/src/stream/src/executor/over_window/frame_finder.rs +++ b/src/stream/src/executor/over_window/frame_finder.rs @@ -17,7 +17,7 @@ use std::ops::Bound; -use delta_btree_map::DeltaBTreeMap; +use delta_btree_map::{CursorWithDelta, DeltaBTreeMap}; use itertools::Itertools; use risingwave_common::row::OwnedRow; use risingwave_common::types::{Datum, Sentinelled, ToDatumRef}; @@ -284,6 +284,14 @@ fn find_curr_for_rows_frame<'cache, const LEFT: bool>( } else { part_with_delta.upper_bound(Bound::Included(delta_key)) }; + let pointed_key = |cursor: CursorWithDelta<'cache, CacheKey, OwnedRow>| { + if LEFT { + cursor.peek_next().map(|(k, _)| k) + } else { + cursor.peek_prev().map(|(k, _)| k) + } + }; + let n_rows_to_move = if LEFT { frame_bounds.n_following_rows().unwrap() } else { @@ -291,8 +299,7 @@ fn find_curr_for_rows_frame<'cache, const LEFT: bool>( }; if n_rows_to_move == 0 { - return cursor - .key() + return pointed_key(cursor) .or_else(|| { if LEFT { part_with_delta.last_key() @@ -304,28 +311,16 @@ fn find_curr_for_rows_frame<'cache, const LEFT: bool>( } for _ in 0..n_rows_to_move { - // Note that we have to move before check, to handle situation where the - // cursor is at ghost position at first. - if LEFT { - cursor.move_prev(); - } else { - cursor.move_next(); - } - if cursor.position().is_ghost() { + let res = if LEFT { cursor.prev() } else { cursor.next() }; + if res.is_none() { + // we reach the end break; } } - cursor - .key() - .or_else(|| { - // Note the difference between this with the `n_rows_to_move == 0` case. - if LEFT { - part_with_delta.first_key() - } else { - part_with_delta.last_key() - } - }) - .unwrap() + + // We always have a valid key here, because `part_with_delta` must not be empty, + // and `n_rows_to_move` is always larger than 0 when we reach here. + pointed_key(cursor).unwrap() } fn find_boundary_for_rows_frame<'cache, const LEFT: bool>( @@ -350,8 +345,18 @@ fn find_boundary_for_rows_frame<'cache, const LEFT: bool>( // have `curr_key` which definitely exists in the `part_with_delta`. We just find // the cursor pointing to it and move the cursor to frame boundary. - let mut cursor = part_with_delta.find(curr_key).unwrap(); - assert!(!cursor.position().is_ghost()); + let mut cursor = if LEFT { + part_with_delta.before(curr_key).unwrap() + } else { + part_with_delta.after(curr_key).unwrap() + }; + let pointed_key = |cursor: CursorWithDelta<'cache, CacheKey, OwnedRow>| { + if LEFT { + cursor.peek_next().map(|(k, _)| k) + } else { + cursor.peek_prev().map(|(k, _)| k) + } + }; let n_rows_to_move = if LEFT { frame_bounds.n_preceding_rows().unwrap() @@ -360,25 +365,16 @@ fn find_boundary_for_rows_frame<'cache, const LEFT: bool>( }; for _ in 0..n_rows_to_move { - if LEFT { - cursor.move_prev(); - } else { - cursor.move_next(); - } - if cursor.position().is_ghost() { + let res = if LEFT { cursor.prev() } else { cursor.next() }; + if res.is_none() { + // we reach the end break; } } - cursor - .key() - .or_else(|| { - if LEFT { - part_with_delta.first_key() - } else { - part_with_delta.last_key() - } - }) - .unwrap() + + // We always have a valid key here, because `cursor` must point to a valid key + // at the beginning. + pointed_key(cursor).unwrap() } /// Given a pair of left and right state keys, calculate the leftmost (smallest) and rightmost @@ -497,11 +493,15 @@ fn find_for_range_frames<'cache, const LEFT: bool>( // the curr key. prev_key } else { - // If cursor is in ghost position, it simply means that the search key is larger + // If there's nothing on the left, it simply means that the search key is larger // than any existing key. Returning the last key in this case does no harm. Especially, // if the last key is largest sentinel, the caller should extend the cache rightward // to get possible entries with the same order value into the cache. - cursor.key().or_else(|| part_with_delta.last_key()).unwrap() + cursor + .peek_next() + .map(|(k, _)| k) + .or_else(|| part_with_delta.last_key()) + .unwrap() } } else { let cursor = part_with_delta.upper_bound(Bound::Included(&search_key)); @@ -511,7 +511,8 @@ fn find_for_range_frames<'cache, const LEFT: bool>( next_key } else { cursor - .key() + .peek_prev() + .map(|(k, _)| k) .or_else(|| part_with_delta.first_key()) .unwrap() } diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index c2f8ea895dccb..dd90105024284 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -19,7 +19,7 @@ use std::collections::BTreeMap; use std::marker::PhantomData; use std::ops::{Bound, RangeInclusive}; -use delta_btree_map::{Change, DeltaBTreeMap, PositionType}; +use delta_btree_map::{Change, DeltaBTreeMap}; use educe::Educe; use futures_async_stream::for_await; use risingwave_common::array::stream_record::Record; @@ -470,12 +470,11 @@ impl<'a, S: StateStore> OverPartition<'a, S> { // Populate window states with the affected range of rows. { let mut cursor = part_with_delta - .find(first_frame_start) + .before(first_frame_start) .expect("first frame start key must exist"); - while { - let (key, row) = cursor - .key_value() - .expect("cursor must be valid until `last_frame_end`"); + + while let Some((key, row)) = cursor.next() { + accessed_entry_count += 1; for (call, state) in calls.iter().zip_eq_fast(states.iter_mut()) { // TODO(rc): batch appending @@ -488,28 +487,28 @@ impl<'a, S: StateStore> OverPartition<'a, S> { .into(), ); } - accessed_entry_count += 1; - cursor.move_next(); - key != last_frame_end - } {} + if key == last_frame_end { + break; + } + } } // Slide to the first affected key. We can safely pass in `first_curr_key` here // because it definitely exists in the states by the definition of affected range. states.just_slide_to(first_curr_key.as_normal_expect())?; - let mut curr_key_cursor = part_with_delta.find(first_curr_key).unwrap(); + let mut curr_key_cursor = part_with_delta.before(first_curr_key).unwrap(); assert_eq!( states.curr_key(), - curr_key_cursor.key().map(CacheKey::as_normal_expect) + curr_key_cursor + .peek_next() + .map(|(k, _)| k) + .map(CacheKey::as_normal_expect) ); // Slide and generate changes. - while { - let (key, row) = curr_key_cursor - .key_value() - .expect("cursor must be valid until `last_curr_key`"); - let mut should_continue = true; + while let Some((key, row)) = curr_key_cursor.next() { + let mut should_stop = false; let output = states.slide_no_evict_hint()?; compute_count += 1; @@ -524,11 +523,11 @@ impl<'a, S: StateStore> OverPartition<'a, S> { // all the following rows, so we need to check the `order_key`. if key.as_normal_expect().order_key > last_delta_key.order_key { // there won't be any more changes after this point, we can stop early - should_continue = false; + should_stop = true; } } else if key.as_normal_expect() >= last_delta_key { // there won't be any more changes after this point, we can stop early - should_continue = false; + should_stop = true; } } } @@ -542,29 +541,23 @@ impl<'a, S: StateStore> OverPartition<'a, S> { .collect(), ); - match curr_key_cursor.position() { - PositionType::Ghost => unreachable!(), - PositionType::Snapshot | PositionType::DeltaUpdate => { - // update - let old_row = snapshot.get(key).unwrap().clone(); - if old_row != new_row { - part_changes.insert( - key.as_normal_expect().clone(), - Record::Update { old_row, new_row }, - ); - } - } - PositionType::DeltaInsert => { - // insert - part_changes - .insert(key.as_normal_expect().clone(), Record::Insert { new_row }); + if let Some(old_row) = snapshot.get(key).cloned() { + // update + if old_row != new_row { + part_changes.insert( + key.as_normal_expect().clone(), + Record::Update { old_row, new_row }, + ); } + } else { + // insert + part_changes.insert(key.as_normal_expect().clone(), Record::Insert { new_row }); } - curr_key_cursor.move_next(); - - should_continue && key != last_curr_key - } {} + if should_stop || key == last_curr_key { + break; + } + } } self.stats.accessed_entry_count += accessed_entry_count; diff --git a/src/utils/delta_btree_map/src/lib.rs b/src/utils/delta_btree_map/src/lib.rs index 1f7e1a77190a0..afb49fcda469f 100644 --- a/src/utils/delta_btree_map/src/lib.rs +++ b/src/utils/delta_btree_map/src/lib.rs @@ -15,7 +15,7 @@ #![feature(btree_cursors)] use std::cmp::Ordering; -use std::collections::BTreeMap; +use std::collections::{btree_map, BTreeMap}; use std::ops::Bound; use educe::Educe; @@ -28,6 +28,9 @@ use enum_as_inner::EnumAsInner; pub struct DeltaBTreeMap<'a, K: Ord, V> { snapshot: &'a BTreeMap, delta: &'a BTreeMap>, + + first_key: Option<&'a K>, + last_key: Option<&'a K>, } #[derive(Debug, Clone, Copy, PartialEq, Eq, EnumAsInner)] @@ -37,8 +40,29 @@ pub enum Change { } impl<'a, K: Ord, V> DeltaBTreeMap<'a, K, V> { + /// Create a new [`DeltaBTreeMap`] from the given snapshot and delta. + /// Best case time complexity: O(1), worst case time complexity: O(m), where m is `delta.len()`. pub fn new(snapshot: &'a BTreeMap, delta: &'a BTreeMap>) -> Self { - Self { snapshot, delta } + let first_key = { + let cursor = CursorWithDelta { + ss_cursor: snapshot.lower_bound(Bound::Unbounded), + dt_cursor: delta.lower_bound(Bound::Unbounded), + }; + cursor.peek_next().map(|(key, _)| key) + }; + let last_key = { + let cursor = CursorWithDelta { + ss_cursor: snapshot.upper_bound(Bound::Unbounded), + dt_cursor: delta.upper_bound(Bound::Unbounded), + }; + cursor.peek_prev().map(|(key, _)| key) + }; + Self { + snapshot, + delta, + first_key, + last_key, + } } /// Get a reference to the snapshot. @@ -51,238 +75,170 @@ impl<'a, K: Ord, V> DeltaBTreeMap<'a, K, V> { self.delta } - /// Get the first key in the updated version of the snapshot. + /// Get the first key in the updated version of the snapshot. Complexity: O(1). pub fn first_key(&self) -> Option<&'a K> { - let cursor = CursorWithDelta { - snapshot: self.snapshot, - delta: self.delta, - curr_key_value: None, - }; - cursor.peek_next().map(|(key, _)| key) + self.first_key } - /// Get the last key in the updated version of the snapshot. + /// Get the last key in the updated version of the snapshot. Complexity: O(1). pub fn last_key(&self) -> Option<&'a K> { - let cursor = CursorWithDelta { - snapshot: self.snapshot, - delta: self.delta, - curr_key_value: None, - }; - cursor.peek_prev().map(|(key, _)| key) + self.last_key } - /// Get a [`CursorWithDelta`] pointing to the element corresponding to the given key. + /// Get a [`CursorWithDelta`] pointing at the gap before the given given key. /// If the given key is not found in either the snapshot or the delta, `None` is returned. - pub fn find(&self, key: &K) -> Option> { - let ss_cursor = self.snapshot.lower_bound(Bound::Included(key)); - let dt_cursor = self.delta.lower_bound(Bound::Included(key)); - let ss_cursor_kv = ss_cursor.peek_next(); - let dt_cursor_kv = dt_cursor.peek_next(); - let curr_key_value = if dt_cursor_kv.map(|(k, _)| k) == Some(key) { - match dt_cursor_kv.unwrap() { - (key, Change::Insert(value)) => (key, value), - (_key, Change::Delete) => { - // the key is deleted - return None; - } - } - } else if ss_cursor_kv.map(|(k, _)| k) == Some(key) { - ss_cursor_kv.unwrap() - } else { - // the key doesn't exist + pub fn before(&self, key: &K) -> Option> { + let cursor = self.lower_bound(Bound::Included(key)); + if cursor.peek_next().map(|(k, _)| k) != Some(key) { return None; - }; - Some(CursorWithDelta { - snapshot: self.snapshot, - delta: self.delta, - curr_key_value: Some(curr_key_value), - }) + } + Some(cursor) + } + + /// Get a [`CursorWithDelta`] pointing at the gap after the given given key. + /// If the given key is not found in either the snapshot or the delta, `None` is returned. + pub fn after(&self, key: &K) -> Option> { + let cursor = self.upper_bound(Bound::Included(key)); + if cursor.peek_prev().map(|(k, _)| k) != Some(key) { + return None; + } + Some(cursor) } - /// Get a [`CursorWithDelta`] pointing to the first element that is above the given bound. + /// Get a [`CursorWithDelta`] pointing at the gap before the smallest key greater than the given bound. pub fn lower_bound(&self, bound: Bound<&K>) -> CursorWithDelta<'a, K, V> { - // the implementation is very similar to `CursorWithDelta::peek_next` - let mut ss_cursor = self.snapshot.lower_bound(bound); - let mut dt_cursor = self.delta.lower_bound(bound); - let next_ss_entry = || ss_cursor.next(); - let next_dt_entry = || dt_cursor.next(); - let curr_key_value = - CursorWithDelta::peek_impl(PeekDirection::Next, next_ss_entry, next_dt_entry); + let ss_cursor = self.snapshot.lower_bound(bound); + let dt_cursor = self.delta.lower_bound(bound); CursorWithDelta { - snapshot: self.snapshot, - delta: self.delta, - curr_key_value, + ss_cursor, + dt_cursor, } } - /// Get a [`CursorWithDelta`] pointing to the first element that is below the given bound. + /// Get a [`CursorWithDelta`] pointing at the gap after the greatest key smaller than the given bound. pub fn upper_bound(&self, bound: Bound<&K>) -> CursorWithDelta<'a, K, V> { - // the implementation is very similar to `CursorWithDelta::peek_prev` - let mut ss_cursor = self.snapshot.upper_bound(bound); - let mut dt_cursor = self.delta.upper_bound(bound); - let prev_ss_entry = || ss_cursor.prev(); - let prev_dt_entry = || dt_cursor.prev(); - let curr_key_value = - CursorWithDelta::peek_impl(PeekDirection::Prev, prev_ss_entry, prev_dt_entry); + let ss_cursor = self.snapshot.upper_bound(bound); + let dt_cursor = self.delta.upper_bound(bound); CursorWithDelta { - snapshot: self.snapshot, - delta: self.delta, - curr_key_value, + ss_cursor, + dt_cursor, } } } /// Cursor that can iterate back and forth over the updated version of the snapshot. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +/// +/// A cursor always points at the gap of items in the map. For example: +/// +/// ```text +/// | Foo | Bar | +/// ^ ^ ^ +/// 1 2 3 +/// ``` +/// +/// The cursor can be at position 1, 2, or 3. +/// If it's at position 1, `peek_prev` will return `None`, and `peek_next` will return `Foo`. +/// If it's at position 3, `peek_prev` will return `Bar`, and `peek_next` will return `None`. +#[derive(Debug, Clone)] pub struct CursorWithDelta<'a, K: Ord, V> { - snapshot: &'a BTreeMap, - delta: &'a BTreeMap>, - curr_key_value: Option<(&'a K, &'a V)>, + ss_cursor: btree_map::Cursor<'a, K, V>, + dt_cursor: btree_map::Cursor<'a, K, Change>, } -/// Type of cursor position. [`PositionType::Ghost`] is a special position between the first and -/// the last item, where the key and value are `None`. -#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumAsInner)] -pub enum PositionType { - Ghost, - Snapshot, - DeltaUpdate, - DeltaInsert, -} +impl<'a, K: Ord, V> CursorWithDelta<'a, K, V> { + pub fn peek_prev(&self) -> Option<(&'a K, &'a V)> { + self.peek::() + } -#[derive(PartialEq, Eq)] -enum PeekDirection { - Next, - Prev, -} + pub fn peek_next(&self) -> Option<(&'a K, &'a V)> { + self.peek::() + } -impl<'a, K: Ord, V> CursorWithDelta<'a, K, V> { - /// Get the cursor position type. - pub fn position(&self) -> PositionType { - let Some((key, _)) = self.curr_key_value else { - return PositionType::Ghost; - }; - if self.delta.contains_key(key) { - assert!(matches!(self.delta.get(key).unwrap(), Change::Insert(_))); - if self.snapshot.contains_key(key) { - PositionType::DeltaUpdate - } else { - PositionType::DeltaInsert - } - } else { - assert!(self.snapshot.contains_key(key)); - PositionType::Snapshot - } + pub fn prev(&mut self) -> Option<(&'a K, &'a V)> { + self.r#move::() } - /// Get the key pointed by the cursor. - pub fn key(&self) -> Option<&'a K> { - self.curr_key_value.map(|(k, _)| k) + #[allow(clippy::should_implement_trait)] + pub fn next(&mut self) -> Option<(&'a K, &'a V)> { + self.r#move::() } - /// Get the value pointed by the cursor. - pub fn value(&self) -> Option<&'a V> { - self.curr_key_value.map(|(_, v)| v) + fn peek(&self) -> Option<(&'a K, &'a V)> { + let mut ss_cursor = self.ss_cursor.clone(); + let mut dt_cursor = self.dt_cursor.clone(); + let res = Self::move_impl::(&mut ss_cursor, &mut dt_cursor); + res } - /// Get the key-value pair pointed by the cursor. - pub fn key_value(&self) -> Option<(&'a K, &'a V)> { - self.curr_key_value + fn r#move(&mut self) -> Option<(&'a K, &'a V)> { + let mut ss_cursor = self.ss_cursor.clone(); + let mut dt_cursor = self.dt_cursor.clone(); + let res = Self::move_impl::(&mut ss_cursor, &mut dt_cursor); + self.ss_cursor = ss_cursor; + self.dt_cursor = dt_cursor; + res } - fn peek_impl( - direction: PeekDirection, - mut next_ss_entry: impl FnMut() -> Option<(&'a K, &'a V)>, - mut next_dt_entry: impl FnMut() -> Option<(&'a K, &'a Change)>, + fn move_impl( + ss_cursor: &mut btree_map::Cursor<'a, K, V>, + dt_cursor: &mut btree_map::Cursor<'a, K, Change>, ) -> Option<(&'a K, &'a V)> { loop { - match (next_ss_entry(), next_dt_entry()) { + let ss_peek = if NEXT { + ss_cursor.peek_next() + } else { + ss_cursor.peek_prev() + }; + let dt_peek = if NEXT { + dt_cursor.peek_next() + } else { + dt_cursor.peek_prev() + }; + + let in_delta = match (ss_peek, dt_peek) { (None, None) => return None, - (None, Some((key, change))) => return Some((key, change.as_insert().unwrap())), - (Some((key, value)), None) => return Some((key, value)), - (Some((ss_key, ss_value)), Some((dt_key, dt_change))) => match ss_key.cmp(dt_key) { - Ordering::Less => { - if direction == PeekDirection::Next { - return Some((ss_key, ss_value)); + (None, Some(_)) => true, + (Some(_), None) => false, + (Some((ss_key, _)), Some((dt_key, dt_change))) => match ss_key.cmp(dt_key) { + Ordering::Less => !NEXT, // if NEXT { in snapshot } else { in delta } + Ordering::Greater => NEXT, // if NEXT { in delta } else { in snapshot } + Ordering::Equal => { + if NEXT { + ss_cursor.next().unwrap(); } else { - return Some((dt_key, dt_change.as_insert().unwrap())); + ss_cursor.prev().unwrap(); } - } - Ordering::Greater => { - if direction == PeekDirection::Next { - return Some((dt_key, dt_change.as_insert().unwrap())); - } else { - return Some((ss_key, ss_value)); + match dt_change { + Change::Insert(_) => true, // in delta + Change::Delete => { + if NEXT { + dt_cursor.next().unwrap(); + } else { + dt_cursor.prev().unwrap(); + } + continue; + } } } - Ordering::Equal => match dt_change { - Change::Insert(v) => return Some((ss_key, v)), - Change::Delete => continue, - }, }, + }; + + if in_delta { + let (key, change) = if NEXT { + dt_cursor.next().unwrap() + } else { + dt_cursor.prev().unwrap() + }; + return Some((key, change.as_insert().unwrap())); + } else { + return if NEXT { + ss_cursor.next() + } else { + ss_cursor.prev() + }; } } } - - /// Peek the next key-value pair. - pub fn peek_next(&self) -> Option<(&'a K, &'a V)> { - if let Some(key) = self.key() { - let mut ss_cursor = self.snapshot.lower_bound(Bound::Included(key)); - let mut dt_cursor = self.delta.lower_bound(Bound::Included(key)); - // either one of `ss_cursor.key()` and `dt_cursor.key()` == `Some(key)`, or both are - if ss_cursor.peek_next().map(|(k, _)| k) == Some(key) { - ss_cursor.next(); - } - if dt_cursor.peek_next().map(|(k, _)| k) == Some(key) { - dt_cursor.next(); - } - let next_ss_entry = || ss_cursor.next(); - let next_dt_entry = || dt_cursor.next(); - Self::peek_impl(PeekDirection::Next, next_ss_entry, next_dt_entry) - } else { - // we are at the ghost position, now let's go back to the beginning - let mut ss_iter = self.snapshot.iter(); - let mut dt_iter = self.delta.iter(); - Self::peek_impl(PeekDirection::Next, || ss_iter.next(), || dt_iter.next()) - } - } - - /// Peek the previous key-value pair. - pub fn peek_prev(&self) -> Option<(&'a K, &'a V)> { - if let Some(key) = self.key() { - let mut ss_cursor = self.snapshot.upper_bound(Bound::Included(key)); - let mut dt_cursor = self.delta.upper_bound(Bound::Included(key)); - // either one of `ss_cursor.key()` and `dt_cursor.key()` == `Some(key)`, or both are - if ss_cursor.peek_prev().map(|(k, _)| k) == Some(key) { - ss_cursor.prev(); - } - if dt_cursor.peek_prev().map(|(k, _)| k) == Some(key) { - dt_cursor.prev(); - } - let next_ss_entry = || ss_cursor.prev(); - let next_dt_entry = || dt_cursor.prev(); - Self::peek_impl(PeekDirection::Prev, next_ss_entry, next_dt_entry) - } else { - // we are at the ghost position, now let's go back to the end - let mut ss_iter = self.snapshot.iter(); - let mut dt_iter = self.delta.iter(); - Self::peek_impl( - PeekDirection::Prev, - || ss_iter.next_back(), - || dt_iter.next_back(), - ) - } - } - - /// Move the cursor to the next position. - pub fn move_next(&mut self) { - self.curr_key_value = self.peek_next(); - } - - /// Move the cursor to the previous position. - pub fn move_prev(&mut self) { - self.curr_key_value = self.peek_prev(); - } } #[cfg(test)] @@ -297,9 +253,10 @@ mod tests { assert_eq!(delta_map.first_key(), None); assert_eq!(delta_map.last_key(), None); - assert_eq!(delta_map.find(&1), None); - assert_eq!(delta_map.lower_bound(Bound::Included(&1)).key(), None); - assert_eq!(delta_map.upper_bound(Bound::Included(&1)).key(), None); + assert!(delta_map.before(&1).is_none()); + assert!(delta_map.after(&1).is_none()); + assert_eq!(delta_map.lower_bound(Bound::Included(&1)).peek_next(), None); + assert_eq!(delta_map.upper_bound(Bound::Included(&1)).peek_prev(), None); let mut map = BTreeMap::new(); map.insert(1, "1"); @@ -310,9 +267,9 @@ mod tests { let delta_map = DeltaBTreeMap::new(&map, &delta); assert_eq!(delta_map.first_key(), None); assert_eq!(delta_map.last_key(), None); - assert_eq!(delta_map.find(&1), None); - assert_eq!(delta_map.find(&2), None); - assert_eq!(delta_map.find(&3), None); + assert!(delta_map.before(&1).is_none()); + assert!(delta_map.before(&2).is_none()); + assert!(delta_map.before(&3).is_none()); } #[test] @@ -326,40 +283,46 @@ mod tests { assert_eq!(delta_map.first_key(), Some(&1)); assert_eq!(delta_map.last_key(), Some(&5)); - assert_eq!(delta_map.find(&100), None); - assert_eq!(delta_map.lower_bound(Bound::Included(&1)).key(), Some(&1)); - assert_eq!(delta_map.lower_bound(Bound::Excluded(&3)).key(), Some(&5)); - assert_eq!(delta_map.upper_bound(Bound::Included(&1)).key(), Some(&1)); - assert_eq!(delta_map.upper_bound(Bound::Excluded(&3)).key(), Some(&2)); - - let mut cursor = delta_map.find(&2).unwrap(); - assert_eq!(cursor.position(), PositionType::Snapshot); - assert_eq!(cursor.key(), Some(&2)); - assert_eq!(cursor.value(), Some(&"2")); - assert_eq!(cursor.key_value(), Some((&2, &"2"))); - assert_eq!(cursor.peek_next(), Some((&5, &"5"))); - assert_eq!(cursor.peek_prev(), Some((&1, &"1"))); - cursor.move_next(); - assert_eq!(cursor.key(), Some(&5)); - assert_eq!(cursor.value(), Some(&"5")); - cursor.move_next(); - assert_eq!(cursor.position(), PositionType::Ghost); - assert_eq!(cursor.key(), None); - assert_eq!(cursor.value(), None); - cursor.move_prev(); - assert_eq!(cursor.key(), Some(&5)); - assert_eq!(cursor.value(), Some(&"5")); - cursor.move_prev(); - cursor.move_prev(); - assert_eq!(cursor.key(), Some(&1)); - assert_eq!(cursor.value(), Some(&"1")); - assert_eq!(cursor.peek_prev(), None); + assert!(delta_map.before(&100).is_none()); + assert_eq!( + delta_map.lower_bound(Bound::Included(&1)).peek_next(), + Some((&1, &"1")) + ); + assert_eq!( + delta_map.lower_bound(Bound::Excluded(&3)).peek_next(), + Some((&5, &"5")) + ); + assert_eq!( + delta_map.upper_bound(Bound::Included(&1)).peek_prev(), + Some((&1, &"1")) + ); + assert_eq!( + delta_map.upper_bound(Bound::Excluded(&3)).peek_prev(), + Some((&2, &"2")) + ); + + let mut cursor = delta_map.before(&2).unwrap(); assert_eq!(cursor.peek_next(), Some((&2, &"2"))); - cursor.move_prev(); - assert_eq!(cursor.key(), None); - assert_eq!(cursor.value(), None); + assert_eq!(cursor.peek_prev(), Some((&1, &"1"))); + let (key, value) = cursor.next().unwrap(); + assert_eq!(key, &2); + assert_eq!(value, &"2"); + assert_eq!(cursor.peek_next(), Some((&5, &"5"))); + assert_eq!(cursor.peek_prev(), Some((&2, &"2"))); + cursor.next(); + assert_eq!(cursor.peek_next(), None); + cursor.next(); + assert_eq!(cursor.peek_next(), None); assert_eq!(cursor.peek_prev(), Some((&5, &"5"))); + cursor.prev(); + assert_eq!(cursor.peek_next(), Some((&5, &"5"))); + cursor.prev(); + cursor.prev(); assert_eq!(cursor.peek_next(), Some((&1, &"1"))); + assert_eq!(cursor.peek_prev(), None); + cursor.prev(); + assert_eq!(cursor.peek_next(), Some((&1, &"1"))); + assert_eq!(cursor.peek_prev(), None); } #[test] @@ -372,35 +335,42 @@ mod tests { assert_eq!(delta_map.first_key(), Some(&1)); assert_eq!(delta_map.last_key(), Some(&2)); - assert_eq!(delta_map.find(&100), None); - assert_eq!(delta_map.lower_bound(Bound::Included(&1)).key(), Some(&1)); - assert_eq!(delta_map.lower_bound(Bound::Excluded(&1)).key(), Some(&2)); - assert_eq!(delta_map.upper_bound(Bound::Included(&1)).key(), Some(&1)); - assert_eq!(delta_map.upper_bound(Bound::Excluded(&10)).key(), Some(&2)); - - let mut cursor = delta_map.find(&2).unwrap(); - assert_eq!(cursor.position(), PositionType::DeltaInsert); - assert_eq!(cursor.key(), Some(&2)); - assert_eq!(cursor.value(), Some(&"2")); - assert_eq!(cursor.key_value(), Some((&2, &"2"))); + assert!(delta_map.before(&100).is_none()); + assert_eq!( + delta_map.lower_bound(Bound::Included(&1)).peek_next(), + Some((&1, &"1")) + ); + assert_eq!( + delta_map.lower_bound(Bound::Excluded(&1)).peek_next(), + Some((&2, &"2")) + ); + assert_eq!( + delta_map.upper_bound(Bound::Included(&1)).peek_prev(), + Some((&1, &"1")) + ); + assert_eq!( + delta_map.upper_bound(Bound::Excluded(&10)).peek_prev(), + Some((&2, &"2")) + ); + + let mut cursor = delta_map.before(&2).unwrap(); + assert_eq!(cursor.peek_next(), Some((&2, &"2"))); + assert_eq!(cursor.peek_prev(), Some((&1, &"1"))); + cursor.next(); + assert_eq!(cursor.peek_next(), None); + assert_eq!(cursor.peek_prev(), Some((&2, &"2"))); + cursor.next(); assert_eq!(cursor.peek_next(), None); + assert_eq!(cursor.peek_prev(), Some((&2, &"2"))); + cursor.prev(); + assert_eq!(cursor.peek_next(), Some((&2, &"2"))); assert_eq!(cursor.peek_prev(), Some((&1, &"1"))); - cursor.move_next(); - assert_eq!(cursor.key(), None); - assert_eq!(cursor.value(), None); - cursor.move_prev(); - assert_eq!(cursor.key(), Some(&2)); - assert_eq!(cursor.value(), Some(&"2")); - cursor.move_prev(); - assert_eq!(cursor.key(), Some(&1)); - assert_eq!(cursor.value(), Some(&"1")); + cursor.prev(); + assert_eq!(cursor.peek_next(), Some((&1, &"1"))); assert_eq!(cursor.peek_prev(), None); - assert_eq!(cursor.peek_next(), Some((&2, &"2"))); - cursor.move_prev(); - assert_eq!(cursor.key(), None); - assert_eq!(cursor.value(), None); - assert_eq!(cursor.peek_prev(), Some((&2, &"2"))); + cursor.prev(); assert_eq!(cursor.peek_next(), Some((&1, &"1"))); + assert_eq!(cursor.peek_prev(), None); } #[test] @@ -414,26 +384,37 @@ mod tests { assert_eq!(delta_map.first_key(), Some(&3)); assert_eq!(delta_map.last_key(), Some(&3)); - assert_eq!(delta_map.find(&1), None); - assert_eq!(delta_map.find(&2), None); - assert_eq!(delta_map.lower_bound(Bound::Included(&1)).key(), Some(&3)); - assert_eq!(delta_map.lower_bound(Bound::Excluded(&0)).key(), Some(&3)); - assert_eq!(delta_map.upper_bound(Bound::Included(&1)).key(), None); - assert_eq!(delta_map.upper_bound(Bound::Excluded(&10)).key(), Some(&3)); - - let mut cursor = delta_map.find(&3).unwrap(); - assert_eq!(cursor.position(), PositionType::Snapshot); - assert_eq!(cursor.key(), Some(&3)); - assert_eq!(cursor.value(), Some(&"3")); - assert_eq!(cursor.key_value(), Some((&3, &"3"))); + assert!(delta_map.before(&1).is_none()); + assert!(delta_map.before(&2).is_none()); + assert_eq!( + delta_map.lower_bound(Bound::Included(&1)).peek_next(), + Some((&3, &"3")) + ); + assert_eq!( + delta_map.lower_bound(Bound::Excluded(&0)).peek_next(), + Some((&3, &"3")) + ); + assert_eq!(delta_map.upper_bound(Bound::Included(&1)).peek_prev(), None); + assert_eq!( + delta_map.upper_bound(Bound::Excluded(&10)).peek_prev(), + Some((&3, &"3")) + ); + + let mut cursor = delta_map.before(&3).unwrap(); + assert_eq!(cursor.peek_next(), Some((&3, &"3"))); + assert_eq!(cursor.peek_prev(), None); + cursor.next(); assert_eq!(cursor.peek_next(), None); + assert_eq!(cursor.peek_prev(), Some((&3, &"3"))); + cursor.prev(); + assert_eq!(cursor.peek_next(), Some((&3, &"3"))); + assert_eq!(cursor.peek_prev(), None); + cursor.prev(); + assert_eq!(cursor.peek_next(), Some((&3, &"3"))); assert_eq!(cursor.peek_prev(), None); - cursor.move_next(); - assert_eq!(cursor.key(), None); - assert_eq!(cursor.value(), None); - cursor.move_prev(); - assert_eq!(cursor.key(), Some(&3)); - assert_eq!(cursor.value(), Some(&"3")); + cursor.next(); + assert_eq!(cursor.peek_next(), None); + assert_eq!(cursor.peek_prev(), Some((&3, &"3"))); } #[test] @@ -447,15 +428,30 @@ mod tests { assert_eq!(delta_map.first_key(), Some(&1)); assert_eq!(delta_map.last_key(), Some(&1)); - assert_eq!(delta_map.find(&2), None); - assert_eq!(delta_map.find(&3), None); - assert_eq!(delta_map.lower_bound(Bound::Included(&1)).key(), Some(&1)); - assert_eq!(delta_map.lower_bound(Bound::Excluded(&1)).key(), None); - assert_eq!(delta_map.upper_bound(Bound::Included(&3)).key(), Some(&1)); - assert_eq!(delta_map.upper_bound(Bound::Excluded(&3)).key(), Some(&1)); - - let cursor = delta_map.find(&1).unwrap(); - assert_eq!(cursor.position(), PositionType::Snapshot); + assert!(delta_map.before(&2).is_none()); + assert!(delta_map.before(&3).is_none()); + assert_eq!( + delta_map.lower_bound(Bound::Included(&1)).peek_next(), + Some((&1, &"1")) + ); + assert_eq!(delta_map.lower_bound(Bound::Excluded(&1)).peek_next(), None); + assert_eq!( + delta_map.upper_bound(Bound::Included(&3)).peek_prev(), + Some((&1, &"1")) + ); + assert_eq!( + delta_map.upper_bound(Bound::Excluded(&3)).peek_prev(), + Some((&1, &"1")) + ); + + let mut cursor = delta_map.before(&1).unwrap(); + assert_eq!(cursor.peek_next(), Some((&1, &"1"))); + cursor.next(); + assert_eq!(cursor.peek_next(), None); + assert_eq!(cursor.peek_prev(), Some((&1, &"1"))); + cursor.prev(); + assert_eq!(cursor.peek_next(), Some((&1, &"1"))); + assert_eq!(cursor.peek_prev(), None); } #[test] @@ -470,11 +466,11 @@ mod tests { assert_eq!(delta_map.first_key(), None); assert_eq!(delta_map.last_key(), None); - assert_eq!(delta_map.find(&1), None); - assert_eq!(delta_map.find(&2), None); - assert_eq!(delta_map.find(&3), None); - assert_eq!(delta_map.lower_bound(Bound::Included(&1)).key(), None); - assert_eq!(delta_map.upper_bound(Bound::Excluded(&3)).key(), None); + assert!(delta_map.before(&1).is_none()); + assert!(delta_map.before(&2).is_none()); + assert!(delta_map.before(&3).is_none()); + assert_eq!(delta_map.lower_bound(Bound::Included(&1)).peek_next(), None); + assert_eq!(delta_map.upper_bound(Bound::Excluded(&3)).peek_prev(), None); } #[test] @@ -488,22 +484,30 @@ mod tests { assert_eq!(delta_map.first_key(), Some(&1)); assert_eq!(delta_map.last_key(), Some(&3)); - assert_eq!(delta_map.find(&10), None); - assert_eq!(delta_map.lower_bound(Bound::Included(&1)).key(), Some(&1)); - assert_eq!(delta_map.lower_bound(Bound::Excluded(&1)).key(), Some(&2)); - assert_eq!(delta_map.upper_bound(Bound::Included(&2)).key(), Some(&2)); - assert_eq!(delta_map.upper_bound(Bound::Excluded(&2)).key(), Some(&1)); - - let mut cursor = delta_map.find(&2).unwrap(); - assert_eq!(cursor.position(), PositionType::DeltaInsert); - assert_eq!(cursor.key(), Some(&2)); - assert_eq!(cursor.value(), Some(&"2")); - assert_eq!(cursor.key_value(), Some((&2, &"2"))); - assert_eq!(cursor.peek_next(), Some((&3, &"3"))); + assert!(delta_map.before(&10).is_none()); + assert_eq!( + delta_map.lower_bound(Bound::Included(&1)).peek_next(), + Some((&1, &"1")) + ); + assert_eq!( + delta_map.lower_bound(Bound::Excluded(&1)).peek_next(), + Some((&2, &"2")) + ); + assert_eq!( + delta_map.upper_bound(Bound::Included(&2)).peek_prev(), + Some((&2, &"2")) + ); + assert_eq!( + delta_map.upper_bound(Bound::Excluded(&2)).peek_prev(), + Some((&1, &"1")) + ); + + let mut cursor = delta_map.before(&2).unwrap(); + assert_eq!(cursor.peek_next(), Some((&2, &"2"))); assert_eq!(cursor.peek_prev(), Some((&1, &"1"))); - cursor.move_next(); - assert_eq!(cursor.key(), Some(&3)); - assert_eq!(cursor.value(), Some(&"3")); + cursor.next(); + assert_eq!(cursor.peek_next(), Some((&3, &"3"))); + assert_eq!(cursor.peek_prev(), Some((&2, &"2"))); } #[test] @@ -518,19 +522,15 @@ mod tests { assert_eq!(delta_map.first_key(), Some(&1)); assert_eq!(delta_map.last_key(), Some(&3)); - let mut cursor = delta_map.find(&1).unwrap(); - assert_eq!(cursor.position(), PositionType::DeltaUpdate); - assert_eq!(cursor.key(), Some(&1)); - assert_eq!(cursor.value(), Some(&"1 new")); - assert_eq!(cursor.key_value(), Some((&1, &"1 new"))); + let mut cursor = delta_map.before(&1).unwrap(); + assert_eq!(cursor.peek_next(), Some((&1, &"1 new"))); + assert_eq!(cursor.peek_prev(), None); + cursor.next(); assert_eq!(cursor.peek_next(), Some((&3, &"3"))); + assert_eq!(cursor.peek_prev(), Some((&1, &"1 new"))); + cursor.prev(); + assert_eq!(cursor.peek_next(), Some((&1, &"1 new"))); assert_eq!(cursor.peek_prev(), None); - cursor.move_next(); - assert_eq!(cursor.key(), Some(&3)); - assert_eq!(cursor.value(), Some(&"3")); - cursor.move_prev(); - assert_eq!(cursor.key(), Some(&1)); - assert_eq!(cursor.value(), Some(&"1 new")); } #[test] @@ -545,19 +545,15 @@ mod tests { assert_eq!(delta_map.first_key(), Some(&1)); assert_eq!(delta_map.last_key(), Some(&3)); - let mut cursor = delta_map.find(&3).unwrap(); - assert_eq!(cursor.position(), PositionType::DeltaUpdate); - assert_eq!(cursor.key(), Some(&3)); - assert_eq!(cursor.value(), Some(&"3 new")); - assert_eq!(cursor.key_value(), Some((&3, &"3 new"))); + let mut cursor = delta_map.before(&3).unwrap(); + assert_eq!(cursor.peek_next(), Some((&3, &"3 new"))); + assert_eq!(cursor.peek_prev(), Some((&1, &"1"))); + cursor.next(); assert_eq!(cursor.peek_next(), None); + assert_eq!(cursor.peek_prev(), Some((&3, &"3 new"))); + cursor.prev(); + assert_eq!(cursor.peek_next(), Some((&3, &"3 new"))); assert_eq!(cursor.peek_prev(), Some((&1, &"1"))); - cursor.move_next(); - assert_eq!(cursor.key(), None); - assert_eq!(cursor.value(), None); - cursor.move_prev(); - assert_eq!(cursor.key(), Some(&3)); - assert_eq!(cursor.value(), Some(&"3 new")); } #[test] @@ -575,50 +571,58 @@ mod tests { assert_eq!(delta_map.first_key(), Some(&0)); assert_eq!(delta_map.last_key(), Some(&4)); - assert_eq!(delta_map.find(&-1), None); - assert_eq!(delta_map.find(&3), None); - assert_eq!(delta_map.find(&10), None); - assert_eq!(delta_map.lower_bound(Bound::Included(&0)).key(), Some(&0)); - assert_eq!(delta_map.lower_bound(Bound::Excluded(&0)).key(), Some(&1)); - assert_eq!(delta_map.lower_bound(Bound::Included(&3)).key(), Some(&4)); - assert_eq!(delta_map.upper_bound(Bound::Included(&5)).key(), Some(&4)); - assert_eq!(delta_map.upper_bound(Bound::Excluded(&4)).key(), Some(&2)); - assert_eq!(delta_map.upper_bound(Bound::Excluded(&2)).key(), Some(&1)); - - let mut cursor = delta_map.find(&0).unwrap(); - assert_eq!(cursor.position(), PositionType::DeltaInsert); - assert_eq!(cursor.key_value(), Some((&0, &"0"))); - cursor.move_next(); - assert_eq!(cursor.position(), PositionType::DeltaUpdate); - assert_eq!(cursor.key_value(), Some((&1, &"1 new"))); - cursor.move_next(); - assert_eq!(cursor.position(), PositionType::Snapshot); - assert_eq!(cursor.key_value(), Some((&2, &"2"))); - cursor.move_next(); - assert_eq!(cursor.position(), PositionType::DeltaInsert); - assert_eq!(cursor.key_value(), Some((&4, &"4"))); - cursor.move_next(); - assert_eq!(cursor.position(), PositionType::Ghost); - assert_eq!(cursor.key_value(), None); - cursor.move_next(); - assert_eq!(cursor.position(), PositionType::DeltaInsert); - assert_eq!(cursor.key_value(), Some((&0, &"0"))); - cursor.move_prev(); - assert_eq!(cursor.position(), PositionType::Ghost); - cursor.move_prev(); - assert_eq!(cursor.position(), PositionType::DeltaInsert); - assert_eq!(cursor.key_value(), Some((&4, &"4"))); - cursor.move_prev(); - assert_eq!(cursor.position(), PositionType::Snapshot); - assert_eq!(cursor.key_value(), Some((&2, &"2"))); - cursor.move_prev(); - assert_eq!(cursor.position(), PositionType::DeltaUpdate); - assert_eq!(cursor.key_value(), Some((&1, &"1 new"))); - cursor.move_prev(); - assert_eq!(cursor.position(), PositionType::DeltaInsert); - assert_eq!(cursor.key_value(), Some((&0, &"0"))); - cursor.move_prev(); - assert_eq!(cursor.position(), PositionType::Ghost); + assert!(delta_map.before(&-1).is_none()); + assert!(delta_map.before(&3).is_none()); + assert!(delta_map.before(&10).is_none()); + assert_eq!( + delta_map.lower_bound(Bound::Included(&0)).peek_next(), + Some((&0, &"0")) + ); + assert_eq!( + delta_map.lower_bound(Bound::Excluded(&0)).peek_next(), + Some((&1, &"1 new")) + ); + assert_eq!( + delta_map.lower_bound(Bound::Included(&3)).peek_next(), + Some((&4, &"4")) + ); + assert_eq!( + delta_map.upper_bound(Bound::Included(&5)).peek_prev(), + Some((&4, &"4")) + ); + assert_eq!( + delta_map.upper_bound(Bound::Excluded(&4)).peek_prev(), + Some((&2, &"2")) + ); + assert_eq!( + delta_map.upper_bound(Bound::Excluded(&2)).peek_prev(), + Some((&1, &"1 new")) + ); + + let mut cursor = delta_map.before(&0).unwrap(); + assert_eq!(cursor.peek_next(), Some((&0, &"0"))); + cursor.next(); + assert_eq!(cursor.peek_next(), Some((&1, &"1 new"))); + cursor.next(); + assert_eq!(cursor.peek_next(), Some((&2, &"2"))); + cursor.next(); + assert_eq!(cursor.peek_next(), Some((&4, &"4"))); + cursor.next(); + assert_eq!(cursor.peek_next(), None); + cursor.next(); + assert_eq!(cursor.peek_next(), None); + cursor.prev(); + assert_eq!(cursor.peek_next(), Some((&4, &"4"))); + cursor.prev(); + assert_eq!(cursor.peek_next(), Some((&2, &"2"))); + cursor.prev(); + assert_eq!(cursor.peek_next(), Some((&1, &"1 new"))); + cursor.prev(); + assert_eq!(cursor.peek_next(), Some((&0, &"0"))); + assert_eq!(cursor.peek_prev(), None); + cursor.prev(); + assert_eq!(cursor.peek_next(), Some((&0, &"0"))); + assert_eq!(cursor.peek_prev(), None); } #[test] @@ -640,19 +644,17 @@ mod tests { assert_eq!(delta_map.first_key(), Some(&0)); assert_eq!(delta_map.last_key(), Some(&3)); - let mut cursor = delta_map.find(&0).unwrap(); + let mut cursor = delta_map.before(&0).unwrap(); let mut res = vec![]; - while let Some((k, v)) = cursor.key_value() { + while let Some((k, v)) = cursor.next() { res.push((*k, *v)); - cursor.move_next(); } assert_eq!(res, vec![(0, "0"), (1, "1 new"), (3, "3")]); - let mut cursor = delta_map.find(&3).unwrap(); + let mut cursor = delta_map.after(&3).unwrap(); let mut res = vec![]; - while let Some((k, v)) = cursor.key_value() { + while let Some((k, v)) = cursor.prev() { res.push((*k, *v)); - cursor.move_prev(); } assert_eq!(res, vec![(3, "3"), (1, "1 new"), (0, "0")]); }