adopt some SHM data structures to support NPO (needed as an elegant way to equalize some data structure sizes in zenoh-c)

yellowhatter committed Jun 27, 2024
1 parent fc18f90 commit 6bd1b78
Showing 12 changed files with 123 additions and 76 deletions.
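For context: NPO here evidently refers to Rust's null-pointer (niche) optimization, under which Option of a NonZero integer type is guaranteed to occupy the same size as the plain integer. Switching SHM size fields to NonZeroUsize therefore makes Option-wrapped and bare size fields layout-equal, which is presumably what the commit message means by equalizing data structure sizes in zenoh-c. A minimal standalone demonstration (not part of the commit):

use std::num::NonZeroUsize;

fn main() {
    // Zero is the `None` niche, so the Option wrapper costs no space:
    // both types are exactly one machine word.
    assert_eq!(
        std::mem::size_of::<Option<NonZeroUsize>>(),
        std::mem::size_of::<usize>()
    );
}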
29 changes: 28 additions & 1 deletion commons/zenoh-codec/src/core/shm.rs
@@ -11,6 +11,8 @@
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::num::NonZeroUsize;

use zenoh_buffers::{
reader::{DidntRead, Reader},
writer::{DidntWrite, Writer},
@@ -62,6 +64,18 @@ where
}
}

impl<W> WCodec<NonZeroUsize, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: NonZeroUsize) -> Self::Output {
self.write(&mut *writer, x.get())?;
Ok(())
}
}

impl<W> WCodec<&ShmBufInfo, &mut W> for Zenoh080
where
W: Writer,
@@ -80,7 +94,7 @@ where

self.write(&mut *writer, data_descriptor)?;
self.write(&mut *writer, shm_protocol)?;
self.write(&mut *writer, data_len)?;
self.write(&mut *writer, *data_len)?;
self.write(&mut *writer, watchdog_descriptor)?;
self.write(&mut *writer, header_descriptor)?;
self.write(&mut *writer, generation)?;
@@ -138,6 +152,19 @@
}
}

impl<R> RCodec<NonZeroUsize, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<NonZeroUsize, Self::Error> {
let size: usize = self.read(&mut *reader)?;
let size = NonZeroUsize::new(size).ok_or(DidntRead)?;
Ok(size)
}
}

impl<R> RCodec<ShmBufInfo, &mut R> for Zenoh080
where
R: Reader,
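For reference, the two NonZeroUsize codec impls above delegate to the existing usize codec and add only a zero check on the read path. A standalone sketch of that validation pattern (DidntRead reduced to a local stand-in for the zenoh-buffers type):

use std::num::NonZeroUsize;

#[derive(Debug)]
struct DidntRead; // local stand-in, not the zenoh_buffers type

// Mirrors the read path above: a zero decoded from the wire is treated
// as a protocol error, so the non-zero invariant holds after decoding.
fn validate_size(raw: usize) -> Result<NonZeroUsize, DidntRead> {
    NonZeroUsize::new(raw).ok_or(DidntRead)
}

fn main() {
    assert!(validate_size(0).is_err());
    assert_eq!(validate_size(42).unwrap().get(), 42);
}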
2 changes: 1 addition & 1 deletion commons/zenoh-shm/Cargo.toml
@@ -52,4 +52,4 @@ lockfree = { workspace = true }
stabby = { workspace = true }

[dev-dependencies]
libc = { workspace = true }
libc = { workspace = true }
@@ -16,6 +16,7 @@ use std::{
borrow::Borrow,
cmp,
collections::BinaryHeap,
num::NonZeroUsize,
sync::{
atomic::{AtomicPtr, AtomicUsize, Ordering},
Mutex,
@@ -31,7 +32,7 @@ use crate::api::{
provider::{
chunk::{AllocatedChunk, ChunkDescriptor},
shm_provider_backend::ShmProviderBackend,
types::{AllocAlignment, ChunkAllocResult, MemoryLayout, ZAllocError},
types::{AllocAlignment, ChunkAllocResult, MemoryLayout, ZAllocError, ZLayoutError},
},
};

@@ -45,7 +46,7 @@ const MIN_FREE_CHUNK_SIZE: usize = 1_024;
#[derive(Eq, Copy, Clone, Debug)]
struct Chunk {
offset: ChunkID,
size: usize,
size: NonZeroUsize,
}

impl Ord for Chunk {
@@ -86,8 +87,12 @@ impl PosixShmProviderBackendBuilder {
self,
size: usize,
alignment: AllocAlignment,
) -> ZResult<LayoutedPosixShmProviderBackendBuilder<MemoryLayout>> {
let layout = MemoryLayout::new(size, alignment)?;
) -> Result<LayoutedPosixShmProviderBackendBuilder<MemoryLayout>, ZLayoutError> {
let layout = MemoryLayout::new(
size.try_into()
.map_err(|_| ZLayoutError::IncorrectLayoutArgs)?,
alignment,
)?;
Ok(LayoutedPosixShmProviderBackendBuilder { layout })
}

@@ -96,8 +101,12 @@
pub fn with_size(
self,
size: usize,
) -> ZResult<LayoutedPosixShmProviderBackendBuilder<MemoryLayout>> {
let layout = MemoryLayout::new(size, AllocAlignment::default())?;
) -> Result<LayoutedPosixShmProviderBackendBuilder<MemoryLayout>, ZLayoutError> {
let layout = MemoryLayout::new(
size.try_into()
.map_err(|_| ZLayoutError::IncorrectLayoutArgs)?,
AllocAlignment::default(),
)?;
Ok(LayoutedPosixShmProviderBackendBuilder { layout })
}
}
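Both builder methods now reject a zero size up front by converting the incoming usize to NonZeroUsize, and they surface the narrower ZLayoutError instead of a generic ZResult. A reduced sketch of that conversion (the error enum is a local stand-in for the zenoh-shm type):

use std::num::NonZeroUsize;

#[derive(Debug)]
enum ZLayoutError {
    IncorrectLayoutArgs, // stand-in variant mirroring zenoh-shm
}

// TryFrom<usize> for NonZeroUsize fails exactly when size == 0, so a
// zero-sized layout is rejected before any shared-memory segment exists.
fn layout_size(size: usize) -> Result<NonZeroUsize, ZLayoutError> {
    size.try_into().map_err(|_| ZLayoutError::IncorrectLayoutArgs)
}

fn main() {
    assert!(layout_size(0).is_err());
    assert!(layout_size(4096).is_ok());
}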
@@ -149,7 +158,7 @@ impl PosixShmProviderBackend {
);

Ok(Self {
available: AtomicUsize::new(layout.size()),
available: AtomicUsize::new(layout.size().get()),
segment,
free_list: Mutex::new(free_list),
alignment: layout.alignment(),
@@ -163,7 +172,7 @@ impl ShmProviderBackend for PosixShmProviderBackend {

let required_len = layout.size();

if self.available.load(Ordering::Relaxed) < required_len {
if self.available.load(Ordering::Relaxed) < required_len.get() {
tracing::trace!( "PosixShmProviderBackend does not have sufficient free memory to allocate {:?}, try de-fragmenting!", layout);
return Err(ZAllocError::OutOfMemory);
}
@@ -176,16 +185,20 @@
Some(mut chunk) if chunk.size >= required_len => {
// NOTE: don't lose any chunks here, as that would lead to a memory leak
tracing::trace!("Allocator selected Chunk ({:?})", &chunk);
if chunk.size - required_len >= MIN_FREE_CHUNK_SIZE {
if chunk.size.get() - required_len.get() >= MIN_FREE_CHUNK_SIZE {
let free_chunk = Chunk {
offset: chunk.offset + required_len as ChunkID,
size: chunk.size - required_len,
offset: chunk.offset + required_len.get() as ChunkID,
// SAFETY: this is safe because we always operate on a leftover, which is checked above!
size: unsafe {
NonZeroUsize::new_unchecked(chunk.size.get() - required_len.get())
},
};
tracing::trace!("The allocation will leave a Free Chunk: {:?}", &free_chunk);
guard.push(free_chunk);
chunk.size = required_len;
}
self.available.fetch_sub(chunk.size, Ordering::Relaxed);
self.available
.fetch_sub(chunk.size.get(), Ordering::Relaxed);

let descriptor =
ChunkDescriptor::new(self.segment.segment.id(), chunk.offset, chunk.size);
@@ -219,16 +232,18 @@
offset: chunk.chunk,
size: chunk.len,
};
self.available.fetch_add(free_chunk.size, Ordering::Relaxed);
self.available
.fetch_add(free_chunk.size.get(), Ordering::Relaxed);
zlock!(self.free_list).push(free_chunk);
}

fn defragment(&self) -> usize {
fn try_merge_adjacent_chunks(a: &Chunk, b: &Chunk) -> Option<Chunk> {
let end_offset = a.offset as usize + a.size;
let end_offset = a.offset as usize + a.size.get();
if end_offset == b.offset as usize {
Some(Chunk {
size: a.size + b.size,
// SAFETY: this is safe because we operate on non-zero sizes and it will never overflow
size: unsafe { NonZeroUsize::new_unchecked(a.size.get() + b.size.get()) },
offset: a.offset,
})
} else {
@@ -256,7 +271,7 @@ impl ShmProviderBackend for PosixShmProviderBackend {
match try_merge_adjacent_chunks(&current, &next) {
Some(c) => {
current = c;
largest = largest.max(current.size);
largest = largest.max(current.size.get());
if i == n {
guard.push(current)
}
@@ -279,7 +294,7 @@
self.available.load(Ordering::Relaxed)
}

fn layout_for(&self, layout: MemoryLayout) -> ZResult<MemoryLayout> {
fn layout_for(&self, layout: MemoryLayout) -> Result<MemoryLayout, ZLayoutError> {
layout.extend(self.alignment)
}
}
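The defragment path above coalesces adjacent free chunks; with sizes as NonZeroUsize, the sum needs either the unsafe new_unchecked used in the diff or a checked add. A safe standalone equivalent (Chunk and ChunkID reduced to local definitions):

use std::num::NonZeroUsize;

type ChunkID = u32; // local stand-in for the zenoh-shm alias

#[derive(Clone, Copy, Debug)]
struct Chunk {
    offset: ChunkID,
    size: NonZeroUsize,
}

// Two free chunks merge only when the first ends exactly where the
// second begins; the merged size stays non-zero by construction.
fn try_merge_adjacent_chunks(a: &Chunk, b: &Chunk) -> Option<Chunk> {
    let end_offset = a.offset as usize + a.size.get();
    (end_offset == b.offset as usize).then(|| Chunk {
        offset: a.offset,
        size: a.size.checked_add(b.size.get()).expect("chunk size overflow"),
    })
}

fn main() {
    let a = Chunk { offset: 0, size: NonZeroUsize::new(512).unwrap() };
    let b = Chunk { offset: 512, size: NonZeroUsize::new(256).unwrap() };
    assert_eq!(try_merge_adjacent_chunks(&a, &b).unwrap().size.get(), 768);
}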
@@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use std::sync::atomic::AtomicPtr;
use std::{num::NonZeroUsize, sync::atomic::AtomicPtr};

use zenoh_result::ZResult;

@@ -32,8 +32,8 @@ pub(crate) struct PosixShmSegment {
}

impl PosixShmSegment {
pub(crate) fn create(alloc_size: usize) -> ZResult<Self> {
let segment = ArrayInSHM::create(alloc_size, POSIX_SHM_SEGMENT_PREFIX)?;
pub(crate) fn create(alloc_size: NonZeroUsize) -> ZResult<Self> {
let segment = ArrayInSHM::create(alloc_size.get(), POSIX_SHM_SEGMENT_PREFIX)?;
Ok(Self { segment })
}

6 changes: 3 additions & 3 deletions commons/zenoh-shm/src/api/provider/chunk.rs
@@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use std::sync::atomic::AtomicPtr;
use std::{num::NonZeroUsize, sync::atomic::AtomicPtr};

use crate::api::common::types::{ChunkID, SegmentID};

@@ -22,13 +22,13 @@ use crate::api::common::types::{ChunkID, SegmentID};
pub struct ChunkDescriptor {
pub segment: SegmentID,
pub chunk: ChunkID,
pub len: usize,
pub len: NonZeroUsize,
}

impl ChunkDescriptor {
/// Create a new Chunk Descriptor
#[zenoh_macros::unstable_doc]
pub fn new(segment: SegmentID, chunk: ChunkID, len: usize) -> Self {
pub fn new(segment: SegmentID, chunk: ChunkID, len: NonZeroUsize) -> Self {
Self {
segment,
chunk,
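With len typed as NonZeroUsize, a ChunkDescriptor now carries a compile-time guarantee that the chunk is non-empty, instead of a runtime invariant callers had to remember. A reduced illustration (type aliases are local stand-ins):

use std::num::NonZeroUsize;

type SegmentID = u32; // local stand-ins for the zenoh-shm aliases
type ChunkID = u32;

#[allow(dead_code)]
struct ChunkDescriptor {
    segment: SegmentID,
    chunk: ChunkID,
    len: NonZeroUsize,
}

fn main() {
    // A zero length is unrepresentable: the conversion fails before a
    // descriptor can ever be constructed.
    let len = NonZeroUsize::new(1024).expect("literal is non-zero");
    let _desc = ChunkDescriptor { segment: 7, chunk: 0, len };
    assert!(NonZeroUsize::new(0).is_none());
}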
27 changes: 18 additions & 9 deletions commons/zenoh-shm/src/api/provider/shm_provider.rs
@@ -16,6 +16,7 @@ use std::{
collections::VecDeque,
future::{Future, IntoFuture},
marker::PhantomData,
num::NonZeroUsize,
pin::Pin,
sync::{atomic::Ordering, Arc, Mutex},
time::Duration,
@@ -159,7 +160,7 @@
IDSource: ProtocolIDSource,
Backend: ShmProviderBackend,
{
size: usize,
size: NonZeroUsize,
provider_layout: MemoryLayout,
provider: &'a ShmProvider<IDSource, Backend>,
}
@@ -182,8 +183,14 @@
// NOTE: Depending on internal implementation, provider's backend might relayout
// the allocations for bigger alignment (ex. 4-byte aligned allocation to 8-bytes aligned)

// Obtain nonzero size
let nonzero_size = data
.size
.try_into()
.map_err(|_| ZLayoutError::IncorrectLayoutArgs)?;

// Create layout for specified arguments
let layout = MemoryLayout::new(data.size, data.alignment)
let layout = MemoryLayout::new(nonzero_size, data.alignment)
.map_err(|_| ZLayoutError::IncorrectLayoutArgs)?;

// Obtain provider's layout for our layout
@@ -194,7 +201,7 @@
.map_err(|_| ZLayoutError::ProviderIncompatibleLayout)?;

Ok(Self {
size: data.size,
size: nonzero_size,
provider_layout,
provider: data.provider,
})
@@ -320,7 +327,7 @@
let result = InnerPolicy::alloc(layout, provider);
if let Err(ZAllocError::OutOfMemory) = result {
// try to alloc again only if GC managed to reclaim big enough chunk
if provider.garbage_collect() >= layout.size() {
if provider.garbage_collect() >= layout.size().get() {
return AltPolicy::alloc(layout, provider);
}
}
@@ -352,7 +359,7 @@
let result = InnerPolicy::alloc(layout, provider);
if let Err(ZAllocError::NeedDefragment) = result {
// try to alloc again only if big enough chunk was defragmented
if provider.defragment() >= layout.size() {
if provider.defragment() >= layout.size().get() {
return AltPolicy::alloc(layout, provider);
}
}
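Both retry policies above re-attempt the allocation only when the reclaimed byte count can actually cover the requested layout, avoiding a pointless second pass. A reduced model of that control flow (names and signatures are illustrative, not the zenoh API; the real policies match on the specific ZAllocError variant, which this sketch collapses to any error):

use std::num::NonZeroUsize;

// Retry once only if reclamation (GC or defragmentation) freed a region
// at least as large as the request, mirroring the short-circuit above.
fn alloc_with_reclaim_retry<E>(
    required: NonZeroUsize,
    mut try_alloc: impl FnMut() -> Result<(), E>,
    reclaim: impl FnOnce() -> usize,
) -> Result<(), E> {
    let result = try_alloc();
    if result.is_err() && reclaim() >= required.get() {
        return try_alloc();
    }
    result
}

fn main() {
    let mut attempts = 0;
    let result = alloc_with_reclaim_retry(
        NonZeroUsize::new(1024).unwrap(),
        || {
            attempts += 1;
            if attempts == 2 { Ok(()) } else { Err("out of memory") }
        },
        || 4096, // pretend reclamation freed 4 KiB
    );
    assert!(result.is_ok());
}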
@@ -803,6 +810,8 @@
/// Remember that chunk's len may be >= len!
#[zenoh_macros::unstable_doc]
pub fn map(&self, chunk: AllocatedChunk, len: usize) -> ZResult<ZShmMut> {
let len = len.try_into()?;

// allocate resources for SHM buffer
let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?;

@@ -837,7 +846,7 @@
if is_free_chunk(maybe_free) {
tracing::trace!("Garbage Collecting Chunk: {:?}", maybe_free);
self.backend.free(&maybe_free.descriptor);
largest = largest.max(maybe_free.descriptor.len);
largest = largest.max(maybe_free.descriptor.len.get());
return false;
}
true
@@ -868,7 +877,7 @@
}
}

fn alloc_inner<Policy>(&self, size: usize, layout: &MemoryLayout) -> BufAllocResult
fn alloc_inner<Policy>(&self, size: NonZeroUsize, layout: &MemoryLayout) -> BufAllocResult
where
Policy: AllocPolicy,
{
@@ -914,7 +923,7 @@
fn wrap(
&self,
chunk: AllocatedChunk,
len: usize,
len: NonZeroUsize,
allocated_header: AllocatedHeaderDescriptor,
allocated_watchdog: AllocatedWatchdog,
confirmed_watchdog: ConfirmedDescriptor,
@@ -971,7 +980,7 @@
{
async fn alloc_inner_async<Policy>(
&self,
size: usize,
size: NonZeroUsize,
backend_layout: &MemoryLayout,
) -> BufAllocResult
where
7 changes: 2 additions & 5 deletions commons/zenoh-shm/src/api/provider/shm_provider_backend.rs
@@ -11,12 +11,9 @@
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use zenoh_result::ZResult;

use super::{
chunk::ChunkDescriptor,
types::{ChunkAllocResult, MemoryLayout},
types::{ChunkAllocResult, MemoryLayout, ZLayoutError},
};

/// The provider backend trait
@@ -48,5 +45,5 @@ pub trait ShmProviderBackend {
/// - validate, if the provided layout can be used with this backend
/// - adopt the layout for backend capabilities
#[zenoh_macros::unstable_doc]
fn layout_for(&self, layout: MemoryLayout) -> ZResult<MemoryLayout>;
fn layout_for(&self, layout: MemoryLayout) -> Result<MemoryLayout, ZLayoutError>;
}