Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SHM channels #1633

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions commons/zenoh-shm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ num-traits = { workspace = true }
num_cpus = { workspace = true, optional = true }
thread-priority = { workspace = true }
lockfree = { workspace = true }
libc = { workspace = true }
stabby = { workspace = true }

[target.'cfg(unix)'.dependencies]
Expand Down
237 changes: 237 additions & 0 deletions commons/zenoh-shm/src/channel/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use libc::memcpy;
use stabby::IStable;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::mem::{align_of, size_of};
use std::sync::atomic::{AtomicPtr, AtomicUsize};
use zenoh_core::zerror;
use zenoh_result::{ZError, ZResult};

use std::sync::atomic::Ordering::Relaxed;

#[repr(C, packed(1))]
struct MessageHeader {
len: usize,
}

/*
________________________________________________________________________
| header |-opt-padding-| elem | elem | ..... |

*/
struct InnerLayout<T: IStable<ContainsIndirections = stabby::abi::B0>> {
_phantom: PhantomData<T>,
}

impl<T: IStable<ContainsIndirections = stabby::abi::B0>> InnerLayout<T> {
const fn header_with_padding<THeader>() -> usize {
size_of::<THeader>() + Self::header_padding::<THeader>()
}

const fn header_padding<THeader>() -> usize {
if size_of::<THeader>() > align_of::<T>() {
return (align_of::<T>() - (size_of::<THeader>() % align_of::<T>()))
% align_of::<T>();
} else if size_of::<THeader>() < align_of::<T>() {
return align_of::<T>() - size_of::<THeader>();
}
0
}

#[inline(always)]
fn byte_len(msg: &[T]) -> usize {
std::mem::size_of_val(msg)
}

#[inline(always)]
fn elem_len(byte_len: usize) -> usize {
byte_len / size_of::<T>()
}
}


#[repr(C)]
struct ChannelHeader {
// free data in storage units
pub free: AtomicUsize,
}

pub struct ChannelData<'a, T: IStable<ContainsIndirections = stabby::abi::B0> + 'a> {
// free data in storage units
pub header:

pub pop_pos: usize,
pub push_pos: usize,

/*
Shared data structure:
______________________________________________________
| 1. self.data | 2. rollover | 3. --- | 4. self.free |

1:
- aligned for T
- size is a multiply of alignment
2: same layout as 1
3: padding to align self.free
4: properly aligned self.free
*/
pub data: &'a mut [u8],

_phantom: PhantomData<T>,
}

impl<'a, T: IStable<ContainsIndirections = stabby::abi::B0> + 'a> ChannelData<'a, T> {
fn push(&mut self, msg: &[T]) -> bool {
let bytes_to_store =
InnerLayout::<T>::header_with_padding() + InnerLayout::<T>::byte_len(msg);

if self.free.load(Relaxed) < bytes_to_store {
return false;
}

let header = self.data[self.push_pos] as *mut u8 as *mut MessageHeader;

unsafe {
let data = ((&mut self.data[self.push_pos]) as *mut u8)
.add(InnerLayout::<T>::header_with_padding());

(*header).len = bytes_to_store;
memcpy(
data as *mut libc::c_void,
msg.as_ptr() as *const libc::c_void,
bytes_to_store - InnerLayout::<T>::header_with_padding(),
);
};
self.free
.fetch_sub(bytes_to_store, std::sync::atomic::Ordering::SeqCst);

let new_pos = self.push_pos + bytes_to_store;
if new_pos >= self.data.len() {
self.push_pos = 0;
} else {
self.push_pos = new_pos;
}

true
}

fn pop(&mut self) -> Option<Vec<T>> {
if self.data.len() == self.free.load(Relaxed) {
return None;
}

let header = self.data[self.pop_pos] as *const MessageHeader;
let len = unsafe { (*header).len };

let mut result = Vec::with_capacity(InnerLayout::<T>::elem_len(
len - InnerLayout::<T>::header_with_padding(),
));
unsafe {
let data = ((&self.data[self.pop_pos]) as *const u8)
.add(InnerLayout::<T>::header_with_padding());

memcpy(
result.spare_capacity_mut().as_ptr() as *mut libc::c_void,
data as *const libc::c_void,
len - InnerLayout::<T>::header_with_padding(),
);
result.set_len(len - InnerLayout::<T>::header_with_padding());
};

self.free
.fetch_add(len, std::sync::atomic::Ordering::SeqCst);

let new_pos = self.pop_pos + len;
if new_pos >= self.data.len() {
self.pop_pos = 0;
} else {
self.pop_pos = new_pos;
}

Some(result)
}
}

pub struct Channel<'a, T: IStable<ContainsIndirections = stabby::abi::B0>> {
data: ChannelData<'a, T>,
}

impl<'a, T: IStable<ContainsIndirections = stabby::abi::B0>> Channel<'a, T> {
pub fn create<TryIntoData>(data: TryIntoData) -> ZResult<Tx<'a, T>>
where
TryIntoData: TryInto<ChannelData<'a, T>>,
<TryIntoData as TryInto<ChannelData<'a, T>>>::Error: Into<zenoh_result::Error>,
{
let data = data.try_into().map_err(|e| e.into())?;
Ok(Tx { data })
}

pub fn open<TryIntoData>(data: TryIntoData) -> ZResult<Rx<'a, T>>
where
TryIntoData: TryInto<ChannelData<'a, T>>,
<TryIntoData as TryInto<ChannelData<'a, T>>>::Error: Into<zenoh_result::Error>,
{
let data = data.try_into().map_err(|e| e.into())?;
Ok(Rx { data })
}
}

pub struct Tx<'a, T: IStable<ContainsIndirections = stabby::abi::B0>> {
data: ChannelData<'a, T>,
}

impl<'a, T: IStable<ContainsIndirections = stabby::abi::B0>> Tx<'a, T> {
pub fn send(&mut self, msg: &[T]) -> bool {
self.data.push(msg)
}
}

pub struct Rx<'a, T: IStable<ContainsIndirections = stabby::abi::B0>> {
data: ChannelData<'a, T>,
}

impl<'a, T: IStable<ContainsIndirections = stabby::abi::B0>> Rx<'a, T> {
pub fn receive(&mut self) -> Option<Vec<T>> {
self.data.pop()
}
}

impl<'a, T: IStable<ContainsIndirections = stabby::abi::B0> + 'a> TryInto<ChannelData<'a, T>> for &'a mut [T] {
type Error = ();

fn try_into(self) -> Result<ChannelData<'a, T>, Self::Error> {
// todo: validate alignment for T

let total_bytes = InnerLayout::byte_len(self);

let header_with_padding = InnerLayout::header_with_padding::<AtomicUsize>()

let data_bytes = total_bytes / 2 - size_of::<AtomicUsize>();



let data = self


let data_len = data_bytes / size_of::<T>();

let data = &mut self[0..data_len];
let rollover = &mut self[data_len..data_len*2];
let free = (&mut self[data_len*2]) as *mut T

}
}
1 change: 1 addition & 0 deletions commons/zenoh-shm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub mod header;
pub mod posix_shm;
pub mod reader;
pub mod watchdog;
pub mod channel;

/// Information about a [`ShmBufInner`].
///
Expand Down
Loading