From 07316267cea5797628aeb3e73d1f0505c833a6bb Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Tue, 3 Dec 2024 11:32:25 +0300 Subject: [PATCH 1/2] WIP on SHM channels --- commons/zenoh-shm/Cargo.toml | 1 + commons/zenoh-shm/src/channel/mod.rs | 206 +++++++++++++++++++++++++++ commons/zenoh-shm/src/lib.rs | 1 + 3 files changed, 208 insertions(+) create mode 100644 commons/zenoh-shm/src/channel/mod.rs diff --git a/commons/zenoh-shm/Cargo.toml b/commons/zenoh-shm/Cargo.toml index 8dbcb7c6b..442b072e2 100644 --- a/commons/zenoh-shm/Cargo.toml +++ b/commons/zenoh-shm/Cargo.toml @@ -47,6 +47,7 @@ num-traits = { workspace = true } num_cpus = { workspace = true, optional = true } thread-priority = { workspace = true } lockfree = { workspace = true } +libc = { workspace = true } stabby = { workspace = true } [target.'cfg(unix)'.dependencies] diff --git a/commons/zenoh-shm/src/channel/mod.rs b/commons/zenoh-shm/src/channel/mod.rs new file mode 100644 index 000000000..c1812975b --- /dev/null +++ b/commons/zenoh-shm/src/channel/mod.rs @@ -0,0 +1,206 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use libc::memcpy; +use stabby::IStable; +use std::marker::PhantomData; +use std::mem::ManuallyDrop; +use std::mem::{align_of, size_of}; +use std::sync::atomic::{AtomicPtr, AtomicUsize}; +use zenoh_core::zerror; +use zenoh_result::{ZError, ZResult}; + +use std::sync::atomic::Ordering::Relaxed; + +#[repr(C, packed(1))] +struct MessageHeader { + len: usize, +} + +/* +________________________________________________________________________ +| header |-opt-padding-| elem | elem | ..... | + +*/ + +struct InnerLayout> { + _phantom: PhantomData, +} + +impl> InnerLayout { + const fn header_with_padding() -> usize { + size_of::() + Self::header_padding() + } + + const fn header_padding() -> usize { + if size_of::() > align_of::() { + return (align_of::() - (size_of::() % align_of::())) + % align_of::(); + } else if size_of::() < align_of::() { + return align_of::() - size_of::(); + } + 0 + } + + #[inline(always)] + fn byte_len(msg: &[T]) -> usize { + std::mem::size_of_val(msg) + } + + #[inline(always)] + fn elem_len(byte_len: usize) -> usize { + byte_len / size_of::() + } +} + +pub struct ChannelData<'a, T: IStable + 'a> { + // free data in storage units + pub free: &'a mut AtomicUsize, + + pub pop_pos: usize, + pub push_pos: usize, + + /* + Shared data structure: + ___________________________________________________ + | 1. self.data | 2. rollover | 3. --- | self.free | + + 1: + - aligned for T + - size is a multiply of alignment + 2: same layout as 1 + 3: padding to align self.free + 4: properly aligned + + */ + pub data: &'a mut [u8], + + _phantom: PhantomData, +} + +impl<'a, T: IStable + 'a> ChannelData<'a, T> { + fn push(&mut self, msg: &[T]) -> bool { + let bytes_to_store = + InnerLayout::::header_with_padding() + InnerLayout::::byte_len(msg); + + if self.free.load(Relaxed) < bytes_to_store { + return false; + } + + let header = self.data[self.push_pos] as *mut u8 as *mut MessageHeader; + + unsafe { + let data = ((&mut self.data[self.push_pos]) as *mut u8) + .add(InnerLayout::::header_with_padding()); + + (*header).len = bytes_to_store; + memcpy( + data as *mut libc::c_void, + msg.as_ptr() as *const libc::c_void, + bytes_to_store - InnerLayout::::header_with_padding(), + ); + }; + self.free + .fetch_sub(bytes_to_store, std::sync::atomic::Ordering::SeqCst); + + let new_pos = self.push_pos + bytes_to_store; + if new_pos >= self.data.len() { + self.push_pos = 0; + } else { + self.push_pos = new_pos; + } + + true + } + + fn pop(&mut self) -> Option> { + if self.data.len() == self.free.load(Relaxed) { + return None; + } + + let header = self.data[self.pop_pos] as *const MessageHeader; + let len = unsafe { (*header).len }; + + let mut result = Vec::with_capacity(InnerLayout::::elem_len( + len - InnerLayout::::header_with_padding(), + )); + unsafe { + let data = ((&self.data[self.pop_pos]) as *const u8) + .add(InnerLayout::::header_with_padding()); + + memcpy( + result.spare_capacity_mut().as_ptr() as *mut libc::c_void, + data as *const libc::c_void, + len - InnerLayout::::header_with_padding(), + ); + result.set_len(len - InnerLayout::::header_with_padding()); + }; + + self.free + .fetch_add(len, std::sync::atomic::Ordering::SeqCst); + + let new_pos = self.pop_pos + len; + if new_pos >= self.data.len() { + self.pop_pos = 0; + } else { + self.pop_pos = new_pos; + } + + Some(result) + } +} + +pub struct Channel<'a, T: IStable> { + data: ChannelData<'a, T>, +} + +impl<'a, T: IStable> Channel<'a, T> { + pub fn create(data: TryIntoData) -> ZResult> + where + TryIntoData: TryInto>, + >>::Error: Into, + { + let data = data.try_into().map_err(|e| e.into())?; + Ok(Tx { data }) + } + + pub fn open(data: TryIntoData) -> ZResult> + where + TryIntoData: TryInto>, + >>::Error: Into, + { + let data = data.try_into().map_err(|e| e.into())?; + Ok(Rx { data }) + } +} + +pub struct Tx<'a, T: IStable> { + data: ChannelData<'a, T>, +} + +impl<'a, T: IStable> Tx<'a, T> { + pub fn send(&mut self, msg: &[T]) -> bool { + self.data.push(msg) + } +} + +pub struct Rx<'a, T: IStable> { + data: ChannelData<'a, T>, +} + +impl<'a, T: IStable> Rx<'a, T> { + pub fn receive(&mut self) -> Option> { + self.data.pop() + } +} diff --git a/commons/zenoh-shm/src/lib.rs b/commons/zenoh-shm/src/lib.rs index 36f2068db..0b29a9d47 100644 --- a/commons/zenoh-shm/src/lib.rs +++ b/commons/zenoh-shm/src/lib.rs @@ -57,6 +57,7 @@ pub mod header; pub mod posix_shm; pub mod reader; pub mod watchdog; +pub mod channel; /// Information about a [`ShmBufInner`]. /// From 7b7610ed69b624307edabf26eec2846d05c5aa39 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Mon, 16 Dec 2024 11:46:45 +0300 Subject: [PATCH 2/2] wip on SHM channel --- commons/zenoh-shm/src/channel/mod.rs | 57 +++++++++++++++++++++------- 1 file changed, 44 insertions(+), 13 deletions(-) diff --git a/commons/zenoh-shm/src/channel/mod.rs b/commons/zenoh-shm/src/channel/mod.rs index c1812975b..053d36f80 100644 --- a/commons/zenoh-shm/src/channel/mod.rs +++ b/commons/zenoh-shm/src/channel/mod.rs @@ -33,22 +33,21 @@ ________________________________________________________________________ | header |-opt-padding-| elem | elem | ..... | */ - struct InnerLayout> { _phantom: PhantomData, } impl> InnerLayout { - const fn header_with_padding() -> usize { - size_of::() + Self::header_padding() + const fn header_with_padding() -> usize { + size_of::() + Self::header_padding::() } - const fn header_padding() -> usize { - if size_of::() > align_of::() { - return (align_of::() - (size_of::() % align_of::())) + const fn header_padding() -> usize { + if size_of::() > align_of::() { + return (align_of::() - (size_of::() % align_of::())) % align_of::(); - } else if size_of::() < align_of::() { - return align_of::() - size_of::(); + } else if size_of::() < align_of::() { + return align_of::() - size_of::(); } 0 } @@ -64,25 +63,31 @@ impl> InnerLayout { } } + +#[repr(C)] +struct ChannelHeader { + // free data in storage units + pub free: AtomicUsize, +} + pub struct ChannelData<'a, T: IStable + 'a> { // free data in storage units - pub free: &'a mut AtomicUsize, + pub header: pub pop_pos: usize, pub push_pos: usize, /* Shared data structure: - ___________________________________________________ - | 1. self.data | 2. rollover | 3. --- | self.free | + ______________________________________________________ + | 1. self.data | 2. rollover | 3. --- | 4. self.free | 1: - aligned for T - size is a multiply of alignment 2: same layout as 1 3: padding to align self.free - 4: properly aligned - + 4: properly aligned self.free */ pub data: &'a mut [u8], @@ -204,3 +209,29 @@ impl<'a, T: IStable> Rx<'a, T> { self.data.pop() } } + +impl<'a, T: IStable + 'a> TryInto> for &'a mut [T] { + type Error = (); + + fn try_into(self) -> Result, Self::Error> { + // todo: validate alignment for T + + let total_bytes = InnerLayout::byte_len(self); + + let header_with_padding = InnerLayout::header_with_padding::() + + let data_bytes = total_bytes / 2 - size_of::(); + + + + let data = self + + + let data_len = data_bytes / size_of::(); + + let data = &mut self[0..data_len]; + let rollover = &mut self[data_len..data_len*2]; + let free = (&mut self[data_len*2]) as *mut T + + } +} \ No newline at end of file