From 25592ac008da690738aaf68bee16972c228df9db Mon Sep 17 00:00:00 2001 From: "Ugur Arikan (DHL Data & Analytics)" Date: Mon, 12 Aug 2024 08:26:43 +0200 Subject: [PATCH] fill with to enable concurrent initialized growth `into_concurrent_filled_with` and `grow_to_and_fill_with` methods are implemented to enable data structures which always have an initialized and valid state. --- Cargo.toml | 6 +- src/common_traits/iterator/iter_con.rs | 69 -------- src/common_traits/iterator/iter_fragment.rs | 35 ----- src/common_traits/iterator/mod.rs | 2 - src/concurrent_pinned_vec.rs | 164 +++++++++++++++++--- src/growth/doubling/doubling_growth.rs | 2 +- src/growth/linear/linear_growth.rs | 2 +- src/growth/recursive/recursive_growth.rs | 6 +- src/into_concurrent_pinned_vec.rs | 14 ++ 9 files changed, 161 insertions(+), 139 deletions(-) delete mode 100644 src/common_traits/iterator/iter_con.rs delete mode 100644 src/common_traits/iterator/iter_fragment.rs diff --git a/Cargo.toml b/Cargo.toml index 84dca04..397065a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "orx-split-vec" -version = "3.2.0" +version = "3.3.0" edition = "2021" authors = ["orxfun "] description = "An efficient constant access time vector with dynamic capacity and pinned elements." @@ -10,8 +10,8 @@ keywords = ["vec", "array", "split", "fragments", "pinned"] categories = ["data-structures", "rust-patterns"] [dependencies] -orx-pseudo-default = "1.2.0" -orx-pinned-vec = "3.2.0" +orx-pseudo-default = "1.2" +orx-pinned-vec = "3.3" [[bench]] name = "serial_access" diff --git a/src/common_traits/iterator/iter_con.rs b/src/common_traits/iterator/iter_con.rs deleted file mode 100644 index 37199a1..0000000 --- a/src/common_traits/iterator/iter_con.rs +++ /dev/null @@ -1,69 +0,0 @@ -use super::iter_fragment::FragmentIter; -use crate::fragment::fragment_struct::Fragment; -use std::iter::FusedIterator; - -/// Iterator over the `SplitVec`. -/// -/// This struct is created by `SplitVec::iter()` method. -#[derive(Debug, Clone)] -#[must_use = "iterators are lazy and do nothing unless consumed"] -pub struct IterCon<'a, T> { - num_fragments: usize, - last_fragment_len: usize, - fragments: &'a [Fragment], - inner: FragmentIter<'a, T>, - f: usize, -} - -impl<'a, T> IterCon<'a, T> { - pub(crate) fn new(fragments: &'a [Fragment], last_fragment_len: usize) -> Self { - assert!(!fragments.is_empty()); - - let num_fragments = fragments.len(); - - let first_fragment_len = match num_fragments { - 0 => 0, - 1 => last_fragment_len, - _ => fragments[0].capacity(), - }; - let inner = FragmentIter::new(&fragments[0], first_fragment_len); - - Self { - num_fragments, - last_fragment_len, - fragments, - inner, - f: 0, - } - } - - fn next_fragment(&mut self) -> Option<&'a T> { - match self.f + 1 < self.num_fragments { - true => { - self.f += 1; - let fragment_len = match self.f == self.num_fragments - 1 { - false => self.fragments[self.f].capacity(), - true => self.last_fragment_len, - }; - self.inner = FragmentIter::new(&self.fragments[self.f], fragment_len); - self.next() - } - false => None, - } - } -} - -impl<'a, T> Iterator for IterCon<'a, T> { - type Item = &'a T; - - #[inline(always)] - fn next(&mut self) -> Option { - let next_element = self.inner.next(); - match next_element.is_some() { - true => next_element, - false => self.next_fragment(), - } - } -} - -impl FusedIterator for IterCon<'_, T> {} diff --git a/src/common_traits/iterator/iter_fragment.rs b/src/common_traits/iterator/iter_fragment.rs deleted file mode 100644 index 145a61f..0000000 --- a/src/common_traits/iterator/iter_fragment.rs +++ /dev/null @@ -1,35 +0,0 @@ -use crate::Fragment; -use std::marker::PhantomData; - -#[derive(Debug, Clone)] -pub(crate) struct FragmentIter<'a, T> { - fragment: &'a [T], - i: usize, - phantom: PhantomData<&'a ()>, -} - -impl<'a, T> FragmentIter<'a, T> { - pub(crate) fn new(fragment: &'a Fragment, len: usize) -> Self { - let fragment = unsafe { std::slice::from_raw_parts(fragment.as_ptr(), len) }; - Self { - fragment, - i: 0, - phantom: PhantomData, - } - } -} - -impl<'a, T: 'a> Iterator for FragmentIter<'a, T> { - type Item = &'a T; - - fn next(&mut self) -> Option { - match self.i < self.fragment.len() { - true => { - let element = unsafe { self.fragment.get_unchecked(self.i) }; - self.i += 1; - Some(element) - } - false => None, - } - } -} diff --git a/src/common_traits/iterator/mod.rs b/src/common_traits/iterator/mod.rs index bf41ce9..bbaa4ed 100644 --- a/src/common_traits/iterator/mod.rs +++ b/src/common_traits/iterator/mod.rs @@ -2,8 +2,6 @@ mod eq; mod from_iter; pub(crate) mod into_iter; pub(crate) mod iter; -pub(crate) mod iter_con; -mod iter_fragment; pub(crate) mod iter_mut; pub(crate) mod iter_mut_rev; pub(crate) mod iter_rev; diff --git a/src/concurrent_pinned_vec.rs b/src/concurrent_pinned_vec.rs index 6546708..92874f2 100644 --- a/src/concurrent_pinned_vec.rs +++ b/src/concurrent_pinned_vec.rs @@ -1,5 +1,4 @@ use crate::{ - common_traits::iterator::iter_con::IterCon, fragment::fragment_struct::{ maximum_concurrent_capacity, num_fragments_for_capacity, set_fragments_len, }, @@ -44,7 +43,7 @@ impl Drop for ConcurrentSplitVec { impl ConcurrentSplitVec { fn num_fragments(&self) -> usize { - self.num_fragments.load(Ordering::Relaxed) + self.num_fragments.load(Ordering::SeqCst) } fn fragments(&self) -> &[Fragment] { @@ -124,6 +123,7 @@ impl ConcurrentPinnedVec for ConcurrentSp let growth = self.growth.clone(); // let (mut fragments, growth) = (self.fragments, self.growth.clone()); + SplitVec::from_raw_parts(len, fragments, growth) } @@ -164,6 +164,94 @@ impl ConcurrentPinnedVec for ConcurrentSp } } + fn grow_to_and_fill_with( + &self, + new_capacity: usize, + fill_with: F, + ) -> Result + where + F: Fn() -> T, + { + let capacity = self.capacity(); + match new_capacity <= capacity { + true => Ok(capacity), + false => { + let mut num_fragments = self.num_fragments(); + + let mut current_capacity = capacity; + + while new_capacity > current_capacity { + let new_fragment_capacity = self + .growth + .new_fragment_capacity(self.fragments_for(num_fragments)); + let mut new_fragment = Fragment::::new(new_fragment_capacity); + for _ in 0..new_fragment_capacity { + new_fragment.push(fill_with()); + } + + self.push_fragment(new_fragment, num_fragments); + + num_fragments += 1; + current_capacity += new_fragment_capacity; + } + + self.num_fragments.store(num_fragments, Ordering::SeqCst); + self.capacity.store(current_capacity, Ordering::SeqCst); + + Ok(current_capacity) + } + } + } + + fn slices>(&self, range: R) -> >::SliceIter<'_> { + use std::slice::from_raw_parts; + + let fragments = self.fragments(); + let fragment_and_inner_indices = + |i| self.growth.get_fragment_and_inner_indices_unchecked(i); + + let a = range_start(&range); + let b = range_end(&range, self.capacity()); + + match b.saturating_sub(a) { + 0 => vec![], + _ => { + let (sf, si) = fragment_and_inner_indices(a); + let (ef, ei) = fragment_and_inner_indices(b - 1); + + match sf == ef { + true => { + let p = self.fragment_element_ptr_mut(sf, si); + let slice = unsafe { from_raw_parts(p, ei - si + 1) }; + vec![slice] + } + false => { + let mut vec = Vec::with_capacity(ef - sf + 1); + + let slice_len = fragments[sf].capacity() - si; + let ptr_s = self.fragment_element_ptr_mut(sf, si); + let slice = unsafe { from_raw_parts(ptr_s, slice_len) }; + vec.push(slice); + + for (f, fragment) in fragments.iter().enumerate().take(ef).skip(sf + 1) { + let slice_len = fragment.capacity(); + let ptr_s = self.fragment_element_ptr_mut(f, 0); + let slice = unsafe { from_raw_parts(ptr_s, slice_len) }; + vec.push(slice); + } + + let slice_len = ei + 1; + let ptr_s = self.fragment_element_ptr_mut(ef, 0); + let slice = unsafe { from_raw_parts(ptr_s, slice_len) }; + vec.push(slice); + + vec + } + } + } + } + } + unsafe fn slices_mut>( &self, range: R, @@ -217,36 +305,62 @@ impl ConcurrentPinnedVec for ConcurrentSp where T: 'a, { + use std::slice::from_raw_parts; + let fragments = self.fragments(); - match len { - 0 => IterCon::new(&fragments[0..1], 0), + let fragment_and_inner_indices = + |i| self.growth.get_fragment_and_inner_indices_unchecked(i); + + let range = 0..len; + + let a = range_start(&range); + let b = range_end(&range, self.capacity()); + + let x = match b.saturating_sub(a) { + 0 => vec![], _ => { - // let mut num_fragments = 0; - let mut count = 0; - - for (num_fragments, fragment) in fragments.iter().enumerate() { - // for fragment in fragments.iter() { - // num_fragments += 1; - let capacity = fragment.capacity(); - let new_count = count + capacity; - - match new_count >= len { - true => { - let last_fragment_len = capacity - (new_count - len); - return IterCon::new( - &fragments[0..(num_fragments + 1)], - last_fragment_len, - ); + let (sf, si) = fragment_and_inner_indices(a); + let (ef, ei) = fragment_and_inner_indices(b - 1); + + match sf == ef { + true => { + let p = fragments[sf].as_ptr().add(si); + let slice = from_raw_parts(p, ei - si + 1); + vec![slice] + } + false => { + let mut vec = Vec::with_capacity(ef - sf + 1); + + if let Some(first) = fragments.get(sf) { + let slice_len = first.capacity() - si; + + let p = first.as_ptr().add(si); + let slice = from_raw_parts(p, slice_len); + vec.push(slice); + } + + for fragment in fragments.iter().take(ef).skip(sf + 1) { + let slice_len = fragment.capacity(); + let p = fragment.as_ptr(); + let slice = from_raw_parts(p, slice_len); + vec.push(slice); + } + + if let Some(last) = fragments.get(ef) { + let slice_len = ei + 1; + let p = last.as_ptr(); + let slice = from_raw_parts(p, slice_len); + vec.push(slice); } - false => count = new_count, + + vec } } - - let last_fragment_len = fragments[fragments.len() - 1].capacity(); - IterCon::new(fragments, last_fragment_len) } - } + }; + + x.into_iter().flat_map(|x| x.iter()) } unsafe fn iter_mut<'a>(&'a mut self, len: usize) -> impl Iterator + 'a diff --git a/src/growth/doubling/doubling_growth.rs b/src/growth/doubling/doubling_growth.rs index b9cefab..063f7ca 100644 --- a/src/growth/doubling/doubling_growth.rs +++ b/src/growth/doubling/doubling_growth.rs @@ -276,7 +276,7 @@ mod tests { let mut curr_capacity = 4; let mut cumulative_capacity = 4; - for index in 0..1111111 { + for index in 0..51_111 { if index == cumulative_capacity { prev_cumulative_capacity = cumulative_capacity; curr_capacity *= 2; diff --git a/src/growth/linear/linear_growth.rs b/src/growth/linear/linear_growth.rs index d3ccab5..c4d7b14 100644 --- a/src/growth/linear/linear_growth.rs +++ b/src/growth/linear/linear_growth.rs @@ -262,7 +262,7 @@ mod tests { let mut prev_cumulative_capacity = 0; let mut cumulative_capacity = curr_capacity; - for index in 0..1111111 { + for index in 0..51_111 { if index == cumulative_capacity { prev_cumulative_capacity = cumulative_capacity; cumulative_capacity += curr_capacity; diff --git a/src/growth/recursive/recursive_growth.rs b/src/growth/recursive/recursive_growth.rs index f5e5432..176912f 100644 --- a/src/growth/recursive/recursive_growth.rs +++ b/src/growth/recursive/recursive_growth.rs @@ -249,9 +249,9 @@ mod tests { let mut fragments: Vec> = vec![]; - let lengths = [30, 52, 14, 1, 7, 3, 79, 248, 147, 530]; + let lengths = [30, 1, 7, 3, 79, 147, 530]; let mut index = 0; - for _ in 0..100 { + for _ in 0..10 { for &len in &lengths { let mut vec = Vec::with_capacity(len); for _ in 0..len { @@ -266,7 +266,7 @@ mod tests { let mut index = 0; let mut f = 0; - for _ in 0..100 { + for _ in 0..10 { for &len in &lengths { for i in 0..len { let maybe_fi = diff --git a/src/into_concurrent_pinned_vec.rs b/src/into_concurrent_pinned_vec.rs index adcb263..e80d3be 100644 --- a/src/into_concurrent_pinned_vec.rs +++ b/src/into_concurrent_pinned_vec.rs @@ -7,4 +7,18 @@ impl IntoConcurrentPinnedVec for SplitVec fn into_concurrent(self) -> Self::ConPinnedVec { self.into() } + + fn into_concurrent_filled_with(mut self, fill_with: F) -> Self::ConPinnedVec + where + F: Fn() -> T, + { + if let Some(fragment) = self.fragments.last_mut() { + let (len, capacity) = (fragment.len(), fragment.capacity()); + for _ in len..capacity { + fragment.push(fill_with()); + } + } + + self.into() + } }