diff --git a/commons/zenoh-codec/src/core/shm.rs b/commons/zenoh-codec/src/core/shm.rs index 4f272f0ed4..b67716611d 100644 --- a/commons/zenoh-codec/src/core/shm.rs +++ b/commons/zenoh-codec/src/core/shm.rs @@ -11,6 +11,8 @@ // Contributors: // ZettaScale Zenoh Team, // +use std::num::NonZeroUsize; + use zenoh_buffers::{ reader::{DidntRead, Reader}, writer::{DidntWrite, Writer}, @@ -62,6 +64,18 @@ where } } +impl WCodec for Zenoh080 +where + W: Writer, +{ + type Output = Result<(), DidntWrite>; + + fn write(self, writer: &mut W, x: NonZeroUsize) -> Self::Output { + self.write(&mut *writer, x.get())?; + Ok(()) + } +} + impl WCodec<&ShmBufInfo, &mut W> for Zenoh080 where W: Writer, @@ -80,7 +94,7 @@ where self.write(&mut *writer, data_descriptor)?; self.write(&mut *writer, shm_protocol)?; - self.write(&mut *writer, data_len)?; + self.write(&mut *writer, *data_len)?; self.write(&mut *writer, watchdog_descriptor)?; self.write(&mut *writer, header_descriptor)?; self.write(&mut *writer, generation)?; @@ -138,6 +152,19 @@ where } } +impl RCodec for Zenoh080 +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result { + let size: usize = self.read(&mut *reader)?; + let size = NonZeroUsize::new(size).ok_or(DidntRead)?; + Ok(size) + } +} + impl RCodec for Zenoh080 where R: Reader, diff --git a/commons/zenoh-shm/Cargo.toml b/commons/zenoh-shm/Cargo.toml index a76cf896d3..5e3dec390e 100644 --- a/commons/zenoh-shm/Cargo.toml +++ b/commons/zenoh-shm/Cargo.toml @@ -52,4 +52,4 @@ lockfree = { workspace = true } stabby = { workspace = true } [dev-dependencies] -libc = { workspace = true } +libc = { workspace = true } \ No newline at end of file diff --git a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend.rs b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend.rs index 7de9e9f22d..f1680235c8 100644 --- a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend.rs +++ b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend.rs @@ -16,6 +16,7 @@ use std::{ borrow::Borrow, cmp, collections::BinaryHeap, + num::NonZeroUsize, sync::{ atomic::{AtomicPtr, AtomicUsize, Ordering}, Mutex, @@ -31,7 +32,7 @@ use crate::api::{ provider::{ chunk::{AllocatedChunk, ChunkDescriptor}, shm_provider_backend::ShmProviderBackend, - types::{AllocAlignment, ChunkAllocResult, MemoryLayout, ZAllocError}, + types::{AllocAlignment, ChunkAllocResult, MemoryLayout, ZAllocError, ZLayoutError}, }, }; @@ -45,7 +46,7 @@ const MIN_FREE_CHUNK_SIZE: usize = 1_024; #[derive(Eq, Copy, Clone, Debug)] struct Chunk { offset: ChunkID, - size: usize, + size: NonZeroUsize, } impl Ord for Chunk { @@ -86,8 +87,12 @@ impl PosixShmProviderBackendBuilder { self, size: usize, alignment: AllocAlignment, - ) -> ZResult> { - let layout = MemoryLayout::new(size, alignment)?; + ) -> Result, ZLayoutError> { + let layout = MemoryLayout::new( + size.try_into() + .map_err(|_| ZLayoutError::IncorrectLayoutArgs)?, + alignment, + )?; Ok(LayoutedPosixShmProviderBackendBuilder { layout }) } @@ -96,8 +101,12 @@ impl PosixShmProviderBackendBuilder { pub fn with_size( self, size: usize, - ) -> ZResult> { - let layout = MemoryLayout::new(size, AllocAlignment::default())?; + ) -> Result, ZLayoutError> { + let layout = MemoryLayout::new( + size.try_into() + .map_err(|_| ZLayoutError::IncorrectLayoutArgs)?, + AllocAlignment::default(), + )?; Ok(LayoutedPosixShmProviderBackendBuilder { layout }) } } @@ -149,7 +158,7 @@ impl PosixShmProviderBackend { ); Ok(Self { - available: AtomicUsize::new(layout.size()), + available: AtomicUsize::new(layout.size().get()), segment, free_list: Mutex::new(free_list), alignment: layout.alignment(), @@ -163,7 +172,7 @@ impl ShmProviderBackend for PosixShmProviderBackend { let required_len = layout.size(); - if self.available.load(Ordering::Relaxed) < required_len { + if self.available.load(Ordering::Relaxed) < required_len.get() { tracing::trace!( "PosixShmProviderBackend does not have sufficient free memory to allocate {:?}, try de-fragmenting!", layout); return Err(ZAllocError::OutOfMemory); } @@ -176,16 +185,20 @@ impl ShmProviderBackend for PosixShmProviderBackend { Some(mut chunk) if chunk.size >= required_len => { // NOTE: don't loose any chunks here, as it will lead to memory leak tracing::trace!("Allocator selected Chunk ({:?})", &chunk); - if chunk.size - required_len >= MIN_FREE_CHUNK_SIZE { + if chunk.size.get() - required_len.get() >= MIN_FREE_CHUNK_SIZE { let free_chunk = Chunk { - offset: chunk.offset + required_len as ChunkID, - size: chunk.size - required_len, + offset: chunk.offset + required_len.get() as ChunkID, + // SAFETY: this is safe because we always operate on a leftover, which is checked above! + size: unsafe { + NonZeroUsize::new_unchecked(chunk.size.get() - required_len.get()) + }, }; tracing::trace!("The allocation will leave a Free Chunk: {:?}", &free_chunk); guard.push(free_chunk); chunk.size = required_len; } - self.available.fetch_sub(chunk.size, Ordering::Relaxed); + self.available + .fetch_sub(chunk.size.get(), Ordering::Relaxed); let descriptor = ChunkDescriptor::new(self.segment.segment.id(), chunk.offset, chunk.size); @@ -219,16 +232,18 @@ impl ShmProviderBackend for PosixShmProviderBackend { offset: chunk.chunk, size: chunk.len, }; - self.available.fetch_add(free_chunk.size, Ordering::Relaxed); + self.available + .fetch_add(free_chunk.size.get(), Ordering::Relaxed); zlock!(self.free_list).push(free_chunk); } fn defragment(&self) -> usize { fn try_merge_adjacent_chunks(a: &Chunk, b: &Chunk) -> Option { - let end_offset = a.offset as usize + a.size; + let end_offset = a.offset as usize + a.size.get(); if end_offset == b.offset as usize { Some(Chunk { - size: a.size + b.size, + // SAFETY: this is safe because we operate on non-zero sizes and it will never overflow + size: unsafe { NonZeroUsize::new_unchecked(a.size.get() + b.size.get()) }, offset: a.offset, }) } else { @@ -256,7 +271,7 @@ impl ShmProviderBackend for PosixShmProviderBackend { match try_merge_adjacent_chunks(¤t, &next) { Some(c) => { current = c; - largest = largest.max(current.size); + largest = largest.max(current.size.get()); if i == n { guard.push(current) } @@ -279,7 +294,7 @@ impl ShmProviderBackend for PosixShmProviderBackend { self.available.load(Ordering::Relaxed) } - fn layout_for(&self, layout: MemoryLayout) -> ZResult { + fn layout_for(&self, layout: MemoryLayout) -> Result { layout.extend(self.alignment) } } diff --git a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_segment.rs b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_segment.rs index dd103462e4..3a08d2be55 100644 --- a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_segment.rs +++ b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_segment.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // -use std::sync::atomic::AtomicPtr; +use std::{num::NonZeroUsize, sync::atomic::AtomicPtr}; use zenoh_result::ZResult; @@ -32,8 +32,8 @@ pub(crate) struct PosixShmSegment { } impl PosixShmSegment { - pub(crate) fn create(alloc_size: usize) -> ZResult { - let segment = ArrayInSHM::create(alloc_size, POSIX_SHM_SEGMENT_PREFIX)?; + pub(crate) fn create(alloc_size: NonZeroUsize) -> ZResult { + let segment = ArrayInSHM::create(alloc_size.get(), POSIX_SHM_SEGMENT_PREFIX)?; Ok(Self { segment }) } diff --git a/commons/zenoh-shm/src/api/provider/chunk.rs b/commons/zenoh-shm/src/api/provider/chunk.rs index 939758a345..fe7d0d5cb6 100644 --- a/commons/zenoh-shm/src/api/provider/chunk.rs +++ b/commons/zenoh-shm/src/api/provider/chunk.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // -use std::sync::atomic::AtomicPtr; +use std::{num::NonZeroUsize, sync::atomic::AtomicPtr}; use crate::api::common::types::{ChunkID, SegmentID}; @@ -22,13 +22,13 @@ use crate::api::common::types::{ChunkID, SegmentID}; pub struct ChunkDescriptor { pub segment: SegmentID, pub chunk: ChunkID, - pub len: usize, + pub len: NonZeroUsize, } impl ChunkDescriptor { /// Create a new Chunk Descriptor #[zenoh_macros::unstable_doc] - pub fn new(segment: SegmentID, chunk: ChunkID, len: usize) -> Self { + pub fn new(segment: SegmentID, chunk: ChunkID, len: NonZeroUsize) -> Self { Self { segment, chunk, diff --git a/commons/zenoh-shm/src/api/provider/shm_provider.rs b/commons/zenoh-shm/src/api/provider/shm_provider.rs index 8773498b61..f23bc1ead6 100644 --- a/commons/zenoh-shm/src/api/provider/shm_provider.rs +++ b/commons/zenoh-shm/src/api/provider/shm_provider.rs @@ -16,6 +16,7 @@ use std::{ collections::VecDeque, future::{Future, IntoFuture}, marker::PhantomData, + num::NonZeroUsize, pin::Pin, sync::{atomic::Ordering, Arc, Mutex}, time::Duration, @@ -159,7 +160,7 @@ where IDSource: ProtocolIDSource, Backend: ShmProviderBackend, { - size: usize, + size: NonZeroUsize, provider_layout: MemoryLayout, provider: &'a ShmProvider, } @@ -182,8 +183,14 @@ where // NOTE: Depending on internal implementation, provider's backend might relayout // the allocations for bigger alignment (ex. 4-byte aligned allocation to 8-bytes aligned) + // Obtain nonzero size + let nonzero_size = data + .size + .try_into() + .map_err(|_| ZLayoutError::IncorrectLayoutArgs)?; + // Create layout for specified arguments - let layout = MemoryLayout::new(data.size, data.alignment) + let layout = MemoryLayout::new(nonzero_size, data.alignment) .map_err(|_| ZLayoutError::IncorrectLayoutArgs)?; // Obtain provider's layout for our layout @@ -194,7 +201,7 @@ where .map_err(|_| ZLayoutError::ProviderIncompatibleLayout)?; Ok(Self { - size: data.size, + size: nonzero_size, provider_layout, provider: data.provider, }) @@ -320,7 +327,7 @@ where let result = InnerPolicy::alloc(layout, provider); if let Err(ZAllocError::OutOfMemory) = result { // try to alloc again only if GC managed to reclaim big enough chunk - if provider.garbage_collect() >= layout.size() { + if provider.garbage_collect() >= layout.size().get() { return AltPolicy::alloc(layout, provider); } } @@ -352,7 +359,7 @@ where let result = InnerPolicy::alloc(layout, provider); if let Err(ZAllocError::NeedDefragment) = result { // try to alloc again only if big enough chunk was defragmented - if provider.defragment() >= layout.size() { + if provider.defragment() >= layout.size().get() { return AltPolicy::alloc(layout, provider); } } @@ -803,6 +810,8 @@ where /// Remember that chunk's len may be >= len! #[zenoh_macros::unstable_doc] pub fn map(&self, chunk: AllocatedChunk, len: usize) -> ZResult { + let len = len.try_into()?; + // allocate resources for SHM buffer let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?; @@ -837,7 +846,7 @@ where if is_free_chunk(maybe_free) { tracing::trace!("Garbage Collecting Chunk: {:?}", maybe_free); self.backend.free(&maybe_free.descriptor); - largest = largest.max(maybe_free.descriptor.len); + largest = largest.max(maybe_free.descriptor.len.get()); return false; } true @@ -868,7 +877,7 @@ where } } - fn alloc_inner(&self, size: usize, layout: &MemoryLayout) -> BufAllocResult + fn alloc_inner(&self, size: NonZeroUsize, layout: &MemoryLayout) -> BufAllocResult where Policy: AllocPolicy, { @@ -914,7 +923,7 @@ where fn wrap( &self, chunk: AllocatedChunk, - len: usize, + len: NonZeroUsize, allocated_header: AllocatedHeaderDescriptor, allocated_watchdog: AllocatedWatchdog, confirmed_watchdog: ConfirmedDescriptor, @@ -971,7 +980,7 @@ where { async fn alloc_inner_async( &self, - size: usize, + size: NonZeroUsize, backend_layout: &MemoryLayout, ) -> BufAllocResult where diff --git a/commons/zenoh-shm/src/api/provider/shm_provider_backend.rs b/commons/zenoh-shm/src/api/provider/shm_provider_backend.rs index 933940cac1..51795f5880 100644 --- a/commons/zenoh-shm/src/api/provider/shm_provider_backend.rs +++ b/commons/zenoh-shm/src/api/provider/shm_provider_backend.rs @@ -11,12 +11,9 @@ // Contributors: // ZettaScale Zenoh Team, // - -use zenoh_result::ZResult; - use super::{ chunk::ChunkDescriptor, - types::{ChunkAllocResult, MemoryLayout}, + types::{ChunkAllocResult, MemoryLayout, ZLayoutError}, }; /// The provider backend trait @@ -48,5 +45,5 @@ pub trait ShmProviderBackend { /// - validate, if the provided layout can be used with this backend /// - adopt the layout for backend capabilities #[zenoh_macros::unstable_doc] - fn layout_for(&self, layout: MemoryLayout) -> ZResult; + fn layout_for(&self, layout: MemoryLayout) -> Result; } diff --git a/commons/zenoh-shm/src/api/provider/types.rs b/commons/zenoh-shm/src/api/provider/types.rs index 603c4a481a..0466f2a554 100644 --- a/commons/zenoh-shm/src/api/provider/types.rs +++ b/commons/zenoh-shm/src/api/provider/types.rs @@ -12,9 +12,7 @@ // ZettaScale Zenoh Team, // -use std::fmt::Display; - -use zenoh_result::{bail, ZResult}; +use std::{fmt::Display, num::NonZeroUsize}; use super::chunk::AllocatedChunk; use crate::api::buffer::zshmmut::ZShmMut; @@ -61,14 +59,18 @@ impl Default for AllocAlignment { impl AllocAlignment { #[zenoh_macros::unstable_doc] - pub fn new(pow: u8) -> Self { - Self { pow } + pub fn new(pow: u8) -> Result { + match pow { + pow if pow < 64 => Ok(Self { pow }), + _ => Err(ZLayoutError::IncorrectLayoutArgs), + } } /// Get alignment in normal units (bytes) #[zenoh_macros::unstable_doc] - pub fn get_alignment_value(&self) -> usize { - 1usize << self.pow + pub fn get_alignment_value(&self) -> NonZeroUsize { + // SAFETY: this is safe + unsafe { NonZeroUsize::new_unchecked(1usize << self.pow) } } /// Align size according to inner alignment. @@ -84,11 +86,13 @@ impl AllocAlignment { /// assert_eq!(aligned_size, 8); /// ``` #[zenoh_macros::unstable_doc] - pub fn align_size(&self, size: usize) -> usize { + pub fn align_size(&self, size: NonZeroUsize) -> NonZeroUsize { let alignment = self.get_alignment_value(); - match size % alignment { + match size.get() % alignment { 0 => size, - remainder => size + (alignment - remainder), + remainder => unsafe { + NonZeroUsize::new_unchecked(size.get() + (alignment.get() - remainder)) + }, } } } @@ -97,7 +101,7 @@ impl AllocAlignment { #[zenoh_macros::unstable_doc] #[derive(Debug)] pub struct MemoryLayout { - size: usize, + size: NonZeroUsize, alignment: AllocAlignment, } @@ -113,16 +117,16 @@ impl Display for MemoryLayout { impl MemoryLayout { /// Try to create a new memory layout #[zenoh_macros::unstable_doc] - pub fn new(size: usize, alignment: AllocAlignment) -> ZResult { + pub fn new(size: NonZeroUsize, alignment: AllocAlignment) -> Result { // size of an allocation must be a miltiple of it's alignment! - match size % alignment.get_alignment_value() { + match size.get() % alignment.get_alignment_value() { 0 => Ok(Self { size, alignment }), - _ => bail!("size of an allocation must be a miltiple of it's alignment!"), + _ => Err(ZLayoutError::IncorrectLayoutArgs), } } #[zenoh_macros::unstable_doc] - pub fn size(&self) -> usize { + pub fn size(&self) -> NonZeroUsize { self.size } @@ -150,16 +154,12 @@ impl MemoryLayout { /// assert!(layout8b.is_ok()); // ok /// ``` #[zenoh_macros::unstable_doc] - pub fn extend(&self, new_alignment: AllocAlignment) -> ZResult { + pub fn extend(&self, new_alignment: AllocAlignment) -> Result { if self.alignment <= new_alignment { let new_size = new_alignment.align_size(self.size); return MemoryLayout::new(new_size, new_alignment); } - bail!( - "Cannot extend alignment form {} to {}: new alignment must be >= old!", - self.alignment, - new_alignment - ) + Err(ZLayoutError::IncorrectLayoutArgs) } } diff --git a/commons/zenoh-shm/src/lib.rs b/commons/zenoh-shm/src/lib.rs index 19f8a1c76f..8ec2458931 100644 --- a/commons/zenoh-shm/src/lib.rs +++ b/commons/zenoh-shm/src/lib.rs @@ -13,6 +13,7 @@ // use std::{ any::Any, + num::NonZeroUsize, sync::{ atomic::{AtomicPtr, Ordering}, Arc, @@ -62,7 +63,7 @@ pub struct ShmBufInfo { /// Actual data length /// NOTE: data_descriptor's len is >= of this len and describes the actual memory length /// dedicated in shared memory segment for this particular buffer. - pub data_len: usize, + pub data_len: NonZeroUsize, /// The watchdog descriptor pub watchdog_descriptor: Descriptor, @@ -76,7 +77,7 @@ impl ShmBufInfo { pub fn new( data_descriptor: ChunkDescriptor, shm_protocol: ProtocolID, - data_len: usize, + data_len: NonZeroUsize, watchdog_descriptor: Descriptor, header_descriptor: HeaderDescriptor, generation: u32, @@ -122,14 +123,10 @@ impl std::fmt::Debug for ShmBufInner { } impl ShmBufInner { - pub fn len(&self) -> usize { + pub fn len(&self) -> NonZeroUsize { self.info.data_len } - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - fn is_valid(&self) -> bool { self.header.header().generation.load(Ordering::SeqCst) == self.info.generation } @@ -156,7 +153,7 @@ impl ShmBufInner { fn as_slice(&self) -> &[u8] { tracing::trace!("ShmBufInner::as_slice() == len = {:?}", self.info.data_len); let bp = self.buf.load(Ordering::SeqCst); - unsafe { std::slice::from_raw_parts(bp, self.info.data_len) } + unsafe { std::slice::from_raw_parts(bp, self.info.data_len.get()) } } unsafe fn dec_ref_count(&self) { @@ -176,7 +173,7 @@ impl ShmBufInner { /// guarantee that your in applications only one process at the time will actually write. unsafe fn as_mut_slice_inner(&mut self) -> &mut [u8] { let bp = self.buf.load(Ordering::SeqCst); - std::slice::from_raw_parts_mut(bp, self.info.data_len) + std::slice::from_raw_parts_mut(bp, self.info.data_len.get()) } } diff --git a/commons/zenoh-shm/tests/posix_shm_provider.rs b/commons/zenoh-shm/tests/posix_shm_provider.rs index 60104be6cf..bb0f03d5de 100644 --- a/commons/zenoh-shm/tests/posix_shm_provider.rs +++ b/commons/zenoh-shm/tests/posix_shm_provider.rs @@ -43,7 +43,7 @@ fn posix_shm_provider_alloc() { .res() .expect("Error creating PosixShmProviderBackend!"); - let layout = MemoryLayout::new(100, AllocAlignment::default()).unwrap(); + let layout = MemoryLayout::new(100.try_into().unwrap(), AllocAlignment::default()).unwrap(); let _buf = backend .alloc(&layout) @@ -58,7 +58,7 @@ fn posix_shm_provider_open() { .res() .expect("Error creating PosixShmProviderBackend!"); - let layout = MemoryLayout::new(100, AllocAlignment::default()).unwrap(); + let layout = MemoryLayout::new(100.try_into().unwrap(), AllocAlignment::default()).unwrap(); let buf = backend .alloc(&layout) @@ -79,7 +79,8 @@ fn posix_shm_provider_allocator() { .res() .expect("Error creating PosixShmProviderBackend!"); - let layout = MemoryLayout::new(BUFFER_SIZE, AllocAlignment::default()).unwrap(); + let layout = + MemoryLayout::new(BUFFER_SIZE.try_into().unwrap(), AllocAlignment::default()).unwrap(); // exaust memory by allocating it all let mut buffers = vec![]; diff --git a/examples/examples/z_alloc_shm.rs b/examples/examples/z_alloc_shm.rs index eceb74f35b..e96ca7dab1 100644 --- a/examples/examples/z_alloc_shm.rs +++ b/examples/examples/z_alloc_shm.rs @@ -54,7 +54,7 @@ async fn run() -> ZResult<()> { // OPTION: Allocation with custom alignment and alloc policy customization let _comprehensive = provider .alloc(512) - .with_alignment(AllocAlignment::new(2)) + .with_alignment(AllocAlignment::new(2).unwrap()) // for more examples on policies, please see allocation policy usage below (for layout allocation API) .with_policy::() .wait() @@ -63,7 +63,7 @@ async fn run() -> ZResult<()> { // OPTION: Allocation with custom alignment and async alloc policy let _async = provider .alloc(512) - .with_alignment(AllocAlignment::new(2)) + .with_alignment(AllocAlignment::new(2).unwrap()) // for more examples on policies, please see allocation policy usage below (for layout allocation API) .with_policy::>>() .await @@ -83,7 +83,7 @@ async fn run() -> ZResult<()> { // OPTION: Comprehensive configuration: let _comprehensive_layout = provider .alloc(512) - .with_alignment(AllocAlignment::new(2)) + .with_alignment(AllocAlignment::new(2).unwrap()) .into_layout() .unwrap(); diff --git a/examples/examples/z_posix_shm_provider.rs b/examples/examples/z_posix_shm_provider.rs index 7c68d56bd3..f8eca6f14f 100644 --- a/examples/examples/z_posix_shm_provider.rs +++ b/examples/examples/z_posix_shm_provider.rs @@ -29,7 +29,8 @@ fn main() { let provider_alignment = AllocAlignment::default(); // A layout for POSIX Provider's memory - let provider_layout = MemoryLayout::new(size, provider_alignment).unwrap(); + let provider_layout = + MemoryLayout::new(size.try_into().unwrap(), provider_alignment).unwrap(); // Build a provider backend PosixShmProviderBackend::builder()