diff --git a/commons/zenoh-codec/src/core/shm.rs b/commons/zenoh-codec/src/core/shm.rs index e25496a268..4f272f0ed4 100644 --- a/commons/zenoh-codec/src/core/shm.rs +++ b/commons/zenoh-codec/src/core/shm.rs @@ -17,7 +17,7 @@ use zenoh_buffers::{ }; use zenoh_shm::{ api::provider::chunk::ChunkDescriptor, header::descriptor::HeaderDescriptor, - watchdog::descriptor::Descriptor, SharedMemoryBufInfo, + watchdog::descriptor::Descriptor, ShmBufInfo, }; use crate::{RCodec, WCodec, Zenoh080}; @@ -62,14 +62,14 @@ where } } -impl WCodec<&SharedMemoryBufInfo, &mut W> for Zenoh080 +impl WCodec<&ShmBufInfo, &mut W> for Zenoh080 where W: Writer, { type Output = Result<(), DidntWrite>; - fn write(self, writer: &mut W, x: &SharedMemoryBufInfo) -> Self::Output { - let SharedMemoryBufInfo { + fn write(self, writer: &mut W, x: &ShmBufInfo) -> Self::Output { + let ShmBufInfo { data_descriptor, shm_protocol, data_len, @@ -138,13 +138,13 @@ where } } -impl RCodec for Zenoh080 +impl RCodec for Zenoh080 where R: Reader, { type Error = DidntRead; - fn read(self, reader: &mut R) -> Result { + fn read(self, reader: &mut R) -> Result { let data_descriptor = self.read(&mut *reader)?; let shm_protocol = self.read(&mut *reader)?; let data_len = self.read(&mut *reader)?; @@ -152,7 +152,7 @@ where let header_descriptor = self.read(&mut *reader)?; let generation = self.read(&mut *reader)?; - let shm_info = SharedMemoryBufInfo::new( + let shm_info = ShmBufInfo::new( data_descriptor, shm_protocol, data_len, diff --git a/commons/zenoh-codec/tests/codec.rs b/commons/zenoh-codec/tests/codec.rs index c26b681336..c2cc71ea17 100644 --- a/commons/zenoh-codec/tests/codec.rs +++ b/commons/zenoh-codec/tests/codec.rs @@ -363,12 +363,12 @@ fn codec_encoding() { fn codec_shm_info() { use zenoh_shm::{ api::provider::chunk::ChunkDescriptor, header::descriptor::HeaderDescriptor, - watchdog::descriptor::Descriptor, SharedMemoryBufInfo, + watchdog::descriptor::Descriptor, ShmBufInfo, }; - run!(SharedMemoryBufInfo, { + run!(ShmBufInfo, { let mut rng = rand::thread_rng(); - SharedMemoryBufInfo::new( + ShmBufInfo::new( ChunkDescriptor::new(rng.gen(), rng.gen(), rng.gen()), rng.gen(), rng.gen(), diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index 1a1080cd49..9d593fabb1 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -216,7 +216,7 @@ impl Default for LinkRxConf { // Make explicit the value and ignore clippy warning #[allow(clippy::derivable_impls)] -impl Default for SharedMemoryConf { +impl Default for ShmConf { fn default() -> Self { Self { enabled: false } } diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 459d7be6f3..07112b2c5f 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -488,7 +488,7 @@ validated_struct::validator! { }, }, pub shared_memory: - SharedMemoryConf { + ShmConf { /// Whether shared memory is enabled or not. /// If set to `true`, the SHM buffer optimization support will be announced to other parties. (default `false`). /// This option doesn't make SHM buffer optimization mandatory, the real support depends on other party setting diff --git a/commons/zenoh-protocol/src/common/mod.rs b/commons/zenoh-protocol/src/common/mod.rs index ef53e5a8ac..99bc471cfd 100644 --- a/commons/zenoh-protocol/src/common/mod.rs +++ b/commons/zenoh-protocol/src/common/mod.rs @@ -46,6 +46,12 @@ pub mod imsg { byte } + pub const fn set_bitfield(mut byte: u8, value: u8, mask: u8) -> u8 { + byte = unset_flag(byte, mask); + byte |= value; + byte + } + pub const fn has_option(options: u64, flag: u64) -> bool { options & flag != 0 } diff --git a/commons/zenoh-protocol/src/network/mod.rs b/commons/zenoh-protocol/src/network/mod.rs index 952fe74e89..371f3eda78 100644 --- a/commons/zenoh-protocol/src/network/mod.rs +++ b/commons/zenoh-protocol/src/network/mod.rs @@ -274,7 +274,7 @@ pub mod ext { } pub fn set_priority(&mut self, priority: Priority) { - self.inner = imsg::set_flag(self.inner, priority as u8); + self.inner = imsg::set_bitfield(self.inner, priority as u8, Self::P_MASK); } pub const fn get_priority(&self) -> Priority { diff --git a/commons/zenoh-protocol/src/scouting/hello.rs b/commons/zenoh-protocol/src/scouting/hello.rs index 62ea915e5a..6639792976 100644 --- a/commons/zenoh-protocol/src/scouting/hello.rs +++ b/commons/zenoh-protocol/src/scouting/hello.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use alloc::vec::Vec; -use core::fmt; use crate::core::{Locator, WhatAmI, ZenohId}; @@ -107,16 +106,6 @@ pub struct Hello { pub locators: Vec, } -impl fmt::Display for Hello { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Hello") - .field("zid", &self.zid) - .field("whatami", &self.whatami) - .field("locators", &self.locators) - .finish() - } -} - impl Hello { #[cfg(feature = "test")] pub fn rand() -> Self { diff --git a/commons/zenoh-shm/src/api/buffer/traits.rs b/commons/zenoh-shm/src/api/buffer/traits.rs index 9104abc4a1..a5d6b9eba5 100644 --- a/commons/zenoh-shm/src/api/buffer/traits.rs +++ b/commons/zenoh-shm/src/api/buffer/traits.rs @@ -15,10 +15,10 @@ use std::ops::{Deref, DerefMut}; #[zenoh_macros::unstable_doc] -pub trait SHMBuf: Deref + AsRef<[u8]> { +pub trait ShmBuf: Deref + AsRef<[u8]> { #[zenoh_macros::unstable_doc] fn is_valid(&self) -> bool; } #[zenoh_macros::unstable_doc] -pub trait SHMBufMut: SHMBuf + DerefMut + AsMut<[u8]> {} +pub trait ShmBufMut: ShmBuf + DerefMut + AsMut<[u8]> {} diff --git a/commons/zenoh-shm/src/api/buffer/zshm.rs b/commons/zenoh-shm/src/api/buffer/zshm.rs index d6f34f293a..23b902ac4c 100644 --- a/commons/zenoh-shm/src/api/buffer/zshm.rs +++ b/commons/zenoh-shm/src/api/buffer/zshm.rs @@ -20,16 +20,16 @@ use std::{ use zenoh_buffers::{ZBuf, ZSlice}; -use super::{traits::SHMBuf, zshmmut::zshmmut}; -use crate::SharedMemoryBuf; +use super::{traits::ShmBuf, zshmmut::zshmmut}; +use crate::ShmBufInner; /// An immutable SHM buffer #[zenoh_macros::unstable_doc] #[repr(transparent)] #[derive(Clone, Debug, PartialEq, Eq)] -pub struct ZShm(pub(crate) SharedMemoryBuf); +pub struct ZShm(pub(crate) ShmBufInner); -impl SHMBuf for ZShm { +impl ShmBuf for ZShm { fn is_valid(&self) -> bool { self.0.is_valid() } @@ -44,7 +44,7 @@ impl PartialEq<&zshm> for ZShm { impl Borrow for ZShm { fn borrow(&self) -> &zshm { // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] - // to SharedMemoryBuf type, so it is safe to transmute them in any direction + // to ShmBufInner type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } @@ -52,7 +52,7 @@ impl Borrow for ZShm { impl BorrowMut for ZShm { fn borrow_mut(&mut self) -> &mut zshm { // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] - // to SharedMemoryBuf type, so it is safe to transmute them in any direction + // to ShmBufInner type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } @@ -71,8 +71,8 @@ impl AsRef<[u8]> for ZShm { } } -impl From for ZShm { - fn from(value: SharedMemoryBuf) -> Self { +impl From for ZShm { + fn from(value: ShmBufInner) -> Self { Self(value) } } @@ -96,7 +96,7 @@ impl TryFrom<&mut ZShm> for &mut zshmmut { match value.0.is_unique() && value.0.is_valid() { true => { // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] - // to SharedMemoryBuf type, so it is safe to transmute them in any direction + // to ShmBufInner type, so it is safe to transmute them in any direction Ok(unsafe { core::mem::transmute(value) }) } false => Err(()), @@ -139,18 +139,18 @@ impl DerefMut for zshm { } } -impl From<&SharedMemoryBuf> for &zshm { - fn from(value: &SharedMemoryBuf) -> Self { +impl From<&ShmBufInner> for &zshm { + fn from(value: &ShmBufInner) -> Self { // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] - // to SharedMemoryBuf type, so it is safe to transmute them in any direction + // to ShmBufInner type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(value) } } } -impl From<&mut SharedMemoryBuf> for &mut zshm { - fn from(value: &mut SharedMemoryBuf) -> Self { +impl From<&mut ShmBufInner> for &mut zshm { + fn from(value: &mut ShmBufInner) -> Self { // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] - // to SharedMemoryBuf type, so it is safe to transmute them in any direction + // to ShmBufInner type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(value) } } } @@ -162,7 +162,7 @@ impl TryFrom<&mut zshm> for &mut zshmmut { match value.0 .0.is_unique() && value.0 .0.is_valid() { true => { // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] - // to SharedMemoryBuf type, so it is safe to transmute them in any direction + // to ShmBufInner type, so it is safe to transmute them in any direction Ok(unsafe { core::mem::transmute(value) }) } false => Err(()), diff --git a/commons/zenoh-shm/src/api/buffer/zshmmut.rs b/commons/zenoh-shm/src/api/buffer/zshmmut.rs index 7341b7600c..39a01dff74 100644 --- a/commons/zenoh-shm/src/api/buffer/zshmmut.rs +++ b/commons/zenoh-shm/src/api/buffer/zshmmut.rs @@ -18,27 +18,27 @@ use std::borrow::{Borrow, BorrowMut}; use zenoh_buffers::{ZBuf, ZSlice}; use super::{ - traits::{SHMBuf, SHMBufMut}, + traits::{ShmBuf, ShmBufMut}, zshm::{zshm, ZShm}, }; -use crate::SharedMemoryBuf; +use crate::ShmBufInner; /// A mutable SHM buffer #[zenoh_macros::unstable_doc] #[derive(Debug, PartialEq, Eq)] #[repr(transparent)] -pub struct ZShmMut(SharedMemoryBuf); +pub struct ZShmMut(ShmBufInner); -impl SHMBuf for ZShmMut { +impl ShmBuf for ZShmMut { fn is_valid(&self) -> bool { self.0.is_valid() } } -impl SHMBufMut for ZShmMut {} +impl ShmBufMut for ZShmMut {} impl ZShmMut { - pub(crate) unsafe fn new_unchecked(data: SharedMemoryBuf) -> Self { + pub(crate) unsafe fn new_unchecked(data: ShmBufInner) -> Self { Self(data) } } @@ -49,10 +49,10 @@ impl PartialEq for &ZShmMut { } } -impl TryFrom for ZShmMut { - type Error = SharedMemoryBuf; +impl TryFrom for ZShmMut { + type Error = ShmBufInner; - fn try_from(value: SharedMemoryBuf) -> Result { + fn try_from(value: ShmBufInner) -> Result { match value.is_unique() && value.is_valid() { true => Ok(Self(value)), false => Err(value), @@ -74,7 +74,7 @@ impl TryFrom for ZShmMut { impl Borrow for ZShmMut { fn borrow(&self) -> &zshm { // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] - // to SharedMemoryBuf type, so it is safe to transmute them in any direction + // to ShmBufInner type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } @@ -82,7 +82,7 @@ impl Borrow for ZShmMut { impl BorrowMut for ZShmMut { fn borrow_mut(&mut self) -> &mut zshm { // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] - // to SharedMemoryBuf type, so it is safe to transmute them in any direction + // to ShmBufInner type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } @@ -90,7 +90,7 @@ impl BorrowMut for ZShmMut { impl Borrow for ZShmMut { fn borrow(&self) -> &zshmmut { // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] - // to SharedMemoryBuf type, so it is safe to transmute them in any direction + // to ShmBufInner type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } @@ -98,7 +98,7 @@ impl Borrow for ZShmMut { impl BorrowMut for ZShmMut { fn borrow_mut(&mut self) -> &mut zshmmut { // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] - // to SharedMemoryBuf type, so it is safe to transmute them in any direction + // to ShmBufInner type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } @@ -174,13 +174,13 @@ impl DerefMut for zshmmut { } } -impl TryFrom<&mut SharedMemoryBuf> for &mut zshmmut { +impl TryFrom<&mut ShmBufInner> for &mut zshmmut { type Error = (); - fn try_from(value: &mut SharedMemoryBuf) -> Result { + fn try_from(value: &mut ShmBufInner) -> Result { match value.is_unique() && value.is_valid() { // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] - // to SharedMemoryBuf type, so it is safe to transmute them in any direction + // to ShmBufInner type, so it is safe to transmute them in any direction true => Ok(unsafe { core::mem::transmute(value) }), false => Err(()), } diff --git a/commons/zenoh-shm/src/api/client/mod.rs b/commons/zenoh-shm/src/api/client/mod.rs index eab20733e7..4a147cbf67 100644 --- a/commons/zenoh-shm/src/api/client/mod.rs +++ b/commons/zenoh-shm/src/api/client/mod.rs @@ -12,5 +12,5 @@ // ZettaScale Zenoh Team, // -pub mod shared_memory_client; -pub mod shared_memory_segment; +pub mod shm_client; +pub mod shm_segment; diff --git a/commons/zenoh-shm/src/api/client/shared_memory_client.rs b/commons/zenoh-shm/src/api/client/shm_client.rs similarity index 70% rename from commons/zenoh-shm/src/api/client/shared_memory_client.rs rename to commons/zenoh-shm/src/api/client/shm_client.rs index dd3cf5db12..e25f818912 100644 --- a/commons/zenoh-shm/src/api/client/shared_memory_client.rs +++ b/commons/zenoh-shm/src/api/client/shm_client.rs @@ -16,13 +16,13 @@ use std::{fmt::Debug, sync::Arc}; use zenoh_result::ZResult; -use super::shared_memory_segment::SharedMemorySegment; +use super::shm_segment::ShmSegment; use crate::api::common::types::SegmentID; -/// SharedMemoryClient - client factory implementation for particular shared memory protocol +/// ShmClient - client factory implementation for particular shared memory protocol #[zenoh_macros::unstable_doc] -pub trait SharedMemoryClient: Debug + Send + Sync { +pub trait ShmClient: Debug + Send + Sync { /// Attach to particular shared memory segment #[zenoh_macros::unstable_doc] - fn attach(&self, segment: SegmentID) -> ZResult>; + fn attach(&self, segment: SegmentID) -> ZResult>; } diff --git a/commons/zenoh-shm/src/api/client/shared_memory_segment.rs b/commons/zenoh-shm/src/api/client/shm_segment.rs similarity index 84% rename from commons/zenoh-shm/src/api/client/shared_memory_segment.rs rename to commons/zenoh-shm/src/api/client/shm_segment.rs index e3aaf9ba39..8744fbb765 100644 --- a/commons/zenoh-shm/src/api/client/shared_memory_segment.rs +++ b/commons/zenoh-shm/src/api/client/shm_segment.rs @@ -18,9 +18,9 @@ use zenoh_result::ZResult; use crate::api::common::types::ChunkID; -/// SharedMemorySegment - RAII interface to interact with particular shared memory segment +/// ShmSegment - RAII interface to interact with particular shared memory segment #[zenoh_macros::unstable_doc] -pub trait SharedMemorySegment: Debug + Send + Sync { +pub trait ShmSegment: Debug + Send + Sync { /// Obtain the actual region of memory identified by it's id #[zenoh_macros::unstable_doc] fn map(&self, chunk: ChunkID) -> ZResult>; diff --git a/commons/zenoh-shm/src/api/client_storage/mod.rs b/commons/zenoh-shm/src/api/client_storage/mod.rs index 7b78c23182..205bc3a9dc 100644 --- a/commons/zenoh-shm/src/api/client_storage/mod.rs +++ b/commons/zenoh-shm/src/api/client_storage/mod.rs @@ -22,12 +22,10 @@ use zenoh_result::{bail, ZResult}; use crate::{ api::{ - client::{ - shared_memory_client::SharedMemoryClient, shared_memory_segment::SharedMemorySegment, - }, + client::{shm_client::ShmClient, shm_segment::ShmSegment}, common::types::ProtocolID, protocol_implementations::posix::{ - posix_shared_memory_client::PosixSharedMemoryClient, protocol_id::POSIX_PROTOCOL_ID, + posix_shm_client::PosixShmClient, protocol_id::POSIX_PROTOCOL_ID, }, }, reader::{ClientStorage, GlobalDataSegmentID}, @@ -36,10 +34,10 @@ use crate::{ lazy_static! { /// A global lazily-initialized SHM client storage. /// When initialized, contains default client set, - /// see SharedMemoryClientStorage::with_default_client_set + /// see ShmClientStorage::with_default_client_set #[zenoh_macros::unstable_doc] - pub static ref GLOBAL_CLIENT_STORAGE: Arc = Arc::new( - SharedMemoryClientStorage::builder() + pub static ref GLOBAL_CLIENT_STORAGE: Arc = Arc::new( + ShmClientStorage::builder() .with_default_client_set() .build() ); @@ -47,64 +45,60 @@ lazy_static! { /// Builder to create new client storages #[zenoh_macros::unstable_doc] -pub struct SharedMemoryClientSetBuilder; +pub struct ShmClientSetBuilder; -impl SharedMemoryClientSetBuilder { +impl ShmClientSetBuilder { /// Add client to the storage (without including the default client set) #[zenoh_macros::unstable_doc] pub fn with_client( self, id: ProtocolID, - client: Arc, - ) -> SharedMemoryClientStorageBuilder { + client: Arc, + ) -> ShmClientStorageBuilder { let clients = HashMap::from([(id, client)]); - SharedMemoryClientStorageBuilder::new(clients) + ShmClientStorageBuilder::new(clients) } /// Add list of clients to the storage (without including the default client set) #[zenoh_macros::unstable_doc] pub fn with_clients( self, - clients: &[(ProtocolID, Arc)], - ) -> SharedMemoryClientStorageBuilder { + clients: &[(ProtocolID, Arc)], + ) -> ShmClientStorageBuilder { let clients = clients.iter().cloned().collect(); - SharedMemoryClientStorageBuilder::new(clients) + ShmClientStorageBuilder::new(clients) } /// Include default clients #[zenoh_macros::unstable_doc] - pub fn with_default_client_set(self) -> SharedMemoryClientStorageBuilder { + pub fn with_default_client_set(self) -> ShmClientStorageBuilder { let clients = HashMap::from([( POSIX_PROTOCOL_ID, - Arc::new(PosixSharedMemoryClient {}) as Arc, + Arc::new(PosixShmClient {}) as Arc, )]); - SharedMemoryClientStorageBuilder::new(clients) + ShmClientStorageBuilder::new(clients) } } #[zenoh_macros::unstable_doc] -pub struct SharedMemoryClientStorageBuilder { - clients: HashMap>, +pub struct ShmClientStorageBuilder { + clients: HashMap>, } -impl SharedMemoryClientStorageBuilder { - fn new(clients: HashMap>) -> Self { +impl ShmClientStorageBuilder { + fn new(clients: HashMap>) -> Self { Self { clients } } /// Add client to the storage #[zenoh_macros::unstable_doc] - pub fn with_client( - mut self, - id: ProtocolID, - client: Arc, - ) -> ZResult { + pub fn with_client(mut self, id: ProtocolID, client: Arc) -> ZResult { match self.clients.entry(id) { std::collections::hash_map::Entry::Occupied(occupied) => { bail!("Client already exists for id {id}: {:?}!", occupied) } std::collections::hash_map::Entry::Vacant(vacant) => { - vacant.insert(client as Arc); + vacant.insert(client as Arc); Ok(self) } } @@ -112,15 +106,15 @@ impl SharedMemoryClientStorageBuilder { /// Add list of clients to the storage #[zenoh_macros::unstable_doc] - pub fn with_clients(mut self, clients: &[(ProtocolID, Arc)]) -> Self { + pub fn with_clients(mut self, clients: &[(ProtocolID, Arc)]) -> Self { self.clients.extend(clients.iter().cloned()); self } /// Build the storage with parameters specified on previous step #[zenoh_macros::unstable_doc] - pub fn build(self) -> SharedMemoryClientStorage { - SharedMemoryClientStorage::new(self.clients) + pub fn build(self) -> ShmClientStorage { + ShmClientStorage::new(self.clients) } } @@ -129,24 +123,24 @@ impl SharedMemoryClientStorageBuilder { /// SHM buffers for Protocols added to this instance. #[zenoh_macros::unstable_doc] #[derive(Debug)] -pub struct SharedMemoryClientStorage { - pub(crate) clients: ClientStorage>, - pub(crate) segments: RwLock>>, +pub struct ShmClientStorage { + pub(crate) clients: ClientStorage>, + pub(crate) segments: RwLock>>, } -impl Eq for SharedMemoryClientStorage {} +impl Eq for ShmClientStorage {} -impl PartialEq for SharedMemoryClientStorage { +impl PartialEq for ShmClientStorage { fn eq(&self, other: &Self) -> bool { std::ptr::eq(self, other) } } -impl SharedMemoryClientStorage { +impl ShmClientStorage { /// Get the builder to construct a new storage #[zenoh_macros::unstable_doc] - pub fn builder() -> SharedMemoryClientSetBuilder { - SharedMemoryClientSetBuilder + pub fn builder() -> ShmClientSetBuilder { + ShmClientSetBuilder } /// Get the list of supported SHM protocols. @@ -155,7 +149,7 @@ impl SharedMemoryClientStorage { self.clients.get_clients().keys().copied().collect() } - fn new(clients: HashMap>) -> Self { + fn new(clients: HashMap>) -> Self { Self { clients: ClientStorage::new(clients), segments: RwLock::default(), diff --git a/commons/zenoh-shm/src/api/common/types.rs b/commons/zenoh-shm/src/api/common/types.rs index 02e009aff3..5f423e7459 100644 --- a/commons/zenoh-shm/src/api/common/types.rs +++ b/commons/zenoh-shm/src/api/common/types.rs @@ -13,8 +13,8 @@ // /// Unique protocol identifier. -/// Here is a contract: it is up to user to make sure that incompatible SharedMemoryClient -/// and SharedMemoryProviderBackend implementations will never use the same ProtocolID +/// Here is a contract: it is up to user to make sure that incompatible ShmClient +/// and ShmProviderBackend implementations will never use the same ProtocolID #[zenoh_macros::unstable_doc] pub type ProtocolID = u32; diff --git a/commons/zenoh-shm/src/api/protocol_implementations/posix/mod.rs b/commons/zenoh-shm/src/api/protocol_implementations/posix/mod.rs index 12c8aba0b6..e5dd7db33e 100644 --- a/commons/zenoh-shm/src/api/protocol_implementations/posix/mod.rs +++ b/commons/zenoh-shm/src/api/protocol_implementations/posix/mod.rs @@ -12,8 +12,8 @@ // ZettaScale Zenoh Team, // -pub mod posix_shared_memory_client; -pub mod posix_shared_memory_provider_backend; +pub mod posix_shm_client; +pub mod posix_shm_provider_backend; pub mod protocol_id; -pub(crate) mod posix_shared_memory_segment; +pub(crate) mod posix_shm_segment; diff --git a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shared_memory_client.rs b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_client.rs similarity index 65% rename from commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shared_memory_client.rs rename to commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_client.rs index 5684b0b15f..73e2a96cd9 100644 --- a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shared_memory_client.rs +++ b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_client.rs @@ -16,23 +16,21 @@ use std::sync::Arc; use zenoh_result::ZResult; -use super::posix_shared_memory_segment::PosixSharedMemorySegment; +use super::posix_shm_segment::PosixShmSegment; use crate::api::{ - client::{ - shared_memory_client::SharedMemoryClient, shared_memory_segment::SharedMemorySegment, - }, + client::{shm_client::ShmClient, shm_segment::ShmSegment}, common::types::SegmentID, }; /// Client factory implementation for particular shared memory protocol #[zenoh_macros::unstable_doc] #[derive(Debug)] -pub struct PosixSharedMemoryClient; +pub struct PosixShmClient; -impl SharedMemoryClient for PosixSharedMemoryClient { +impl ShmClient for PosixShmClient { /// Attach to particular shared memory segment #[zenoh_macros::unstable_doc] - fn attach(&self, segment: SegmentID) -> ZResult> { - Ok(Arc::new(PosixSharedMemorySegment::open(segment)?)) + fn attach(&self, segment: SegmentID) -> ZResult> { + Ok(Arc::new(PosixShmSegment::open(segment)?)) } } diff --git a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shared_memory_provider_backend.rs b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend.rs similarity index 79% rename from commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shared_memory_provider_backend.rs rename to commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend.rs index 60e2a10891..7de9e9f22d 100644 --- a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shared_memory_provider_backend.rs +++ b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend.rs @@ -25,12 +25,12 @@ use std::{ use zenoh_core::zlock; use zenoh_result::ZResult; -use super::posix_shared_memory_segment::PosixSharedMemorySegment; +use super::posix_shm_segment::PosixShmSegment; use crate::api::{ common::types::ChunkID, provider::{ chunk::{AllocatedChunk, ChunkDescriptor}, - shared_memory_provider_backend::SharedMemoryProviderBackend, + shm_provider_backend::ShmProviderBackend, types::{AllocAlignment, ChunkAllocResult, MemoryLayout, ZAllocError}, }, }; @@ -68,16 +68,16 @@ impl PartialEq for Chunk { /// Builder to create posix SHM provider #[zenoh_macros::unstable_doc] -pub struct PosixSharedMemoryProviderBackendBuilder; +pub struct PosixShmProviderBackendBuilder; -impl PosixSharedMemoryProviderBackendBuilder { +impl PosixShmProviderBackendBuilder { /// Use existing layout #[zenoh_macros::unstable_doc] pub fn with_layout>( self, layout: Layout, - ) -> LayoutedPosixSharedMemoryProviderBackendBuilder { - LayoutedPosixSharedMemoryProviderBackendBuilder { layout } + ) -> LayoutedPosixShmProviderBackendBuilder { + LayoutedPosixShmProviderBackendBuilder { layout } } /// Construct layout in-place using arguments @@ -86,9 +86,9 @@ impl PosixSharedMemoryProviderBackendBuilder { self, size: usize, alignment: AllocAlignment, - ) -> ZResult> { + ) -> ZResult> { let layout = MemoryLayout::new(size, alignment)?; - Ok(LayoutedPosixSharedMemoryProviderBackendBuilder { layout }) + Ok(LayoutedPosixShmProviderBackendBuilder { layout }) } /// Construct layout in-place from size (default alignment will be used) @@ -96,44 +96,44 @@ impl PosixSharedMemoryProviderBackendBuilder { pub fn with_size( self, size: usize, - ) -> ZResult> { + ) -> ZResult> { let layout = MemoryLayout::new(size, AllocAlignment::default())?; - Ok(LayoutedPosixSharedMemoryProviderBackendBuilder { layout }) + Ok(LayoutedPosixShmProviderBackendBuilder { layout }) } } #[zenoh_macros::unstable_doc] -pub struct LayoutedPosixSharedMemoryProviderBackendBuilder> { +pub struct LayoutedPosixShmProviderBackendBuilder> { layout: Layout, } -impl> LayoutedPosixSharedMemoryProviderBackendBuilder { - /// try to create PosixSharedMemoryProviderBackend +impl> LayoutedPosixShmProviderBackendBuilder { + /// try to create PosixShmProviderBackend #[zenoh_macros::unstable_doc] - pub fn res(self) -> ZResult { - PosixSharedMemoryProviderBackend::new(self.layout.borrow()) + pub fn res(self) -> ZResult { + PosixShmProviderBackend::new(self.layout.borrow()) } } -/// A backend for SharedMemoryProvider based on POSIX shared memory. +/// A backend for ShmProvider based on POSIX shared memory. /// This is the default general-purpose backed shipped with Zenoh. #[zenoh_macros::unstable_doc] -pub struct PosixSharedMemoryProviderBackend { +pub struct PosixShmProviderBackend { available: AtomicUsize, - segment: PosixSharedMemorySegment, + segment: PosixShmSegment, free_list: Mutex>, alignment: AllocAlignment, } -impl PosixSharedMemoryProviderBackend { +impl PosixShmProviderBackend { /// Get the builder to construct a new instance #[zenoh_macros::unstable_doc] - pub fn builder() -> PosixSharedMemoryProviderBackendBuilder { - PosixSharedMemoryProviderBackendBuilder + pub fn builder() -> PosixShmProviderBackendBuilder { + PosixShmProviderBackendBuilder } fn new(layout: &MemoryLayout) -> ZResult { - let segment = PosixSharedMemorySegment::create(layout.size())?; + let segment = PosixShmSegment::create(layout.size())?; let mut free_list = BinaryHeap::new(); let root_chunk = Chunk { @@ -143,7 +143,7 @@ impl PosixSharedMemoryProviderBackend { free_list.push(root_chunk); tracing::trace!( - "Created PosixSharedMemoryProviderBackend id {}, layout {:?}", + "Created PosixShmProviderBackend id {}, layout {:?}", segment.segment.id(), layout ); @@ -157,14 +157,14 @@ impl PosixSharedMemoryProviderBackend { } } -impl SharedMemoryProviderBackend for PosixSharedMemoryProviderBackend { +impl ShmProviderBackend for PosixShmProviderBackend { fn alloc(&self, layout: &MemoryLayout) -> ChunkAllocResult { - tracing::trace!("PosixSharedMemoryProviderBackend::alloc({:?})", layout); + tracing::trace!("PosixShmProviderBackend::alloc({:?})", layout); let required_len = layout.size(); if self.available.load(Ordering::Relaxed) < required_len { - tracing::trace!( "PosixSharedMemoryProviderBackend does not have sufficient free memory to allocate {:?}, try de-fragmenting!", layout); + tracing::trace!( "PosixShmProviderBackend does not have sufficient free memory to allocate {:?}, try de-fragmenting!", layout); return Err(ZAllocError::OutOfMemory); } @@ -196,13 +196,13 @@ impl SharedMemoryProviderBackend for PosixSharedMemoryProviderBackend { }) } Some(c) => { - tracing::trace!("PosixSharedMemoryProviderBackend::alloc({:?}) cannot find any big enough chunk\nSharedMemoryManager::free_list = {:?}", layout, self.free_list); + tracing::trace!("PosixShmProviderBackend::alloc({:?}) cannot find any big enough chunk\nShmManager::free_list = {:?}", layout, self.free_list); guard.push(c); Err(ZAllocError::NeedDefragment) } None => { // NOTE: that should never happen! If this happens - there is a critical bug somewhere around! - let err = format!("PosixSharedMemoryProviderBackend::alloc({:?}) cannot find any available chunk\nSharedMemoryManager::free_list = {:?}", layout, self.free_list); + let err = format!("PosixShmProviderBackend::alloc({:?}) cannot find any available chunk\nShmManager::free_list = {:?}", layout, self.free_list); #[cfg(feature = "test")] panic!("{err}"); #[cfg(not(feature = "test"))] diff --git a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shared_memory_segment.rs b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_segment.rs similarity index 86% rename from commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shared_memory_segment.rs rename to commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_segment.rs index 3f74594ad0..dd103462e4 100644 --- a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shared_memory_segment.rs +++ b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_segment.rs @@ -18,7 +18,7 @@ use zenoh_result::ZResult; use crate::{ api::{ - client::shared_memory_segment::SharedMemorySegment, + client::shm_segment::ShmSegment, common::types::{ChunkID, SegmentID}, }, posix_shm::array::ArrayInSHM, @@ -27,11 +27,11 @@ use crate::{ const POSIX_SHM_SEGMENT_PREFIX: &str = "posix_shm_provider_segment"; #[derive(Debug)] -pub(crate) struct PosixSharedMemorySegment { +pub(crate) struct PosixShmSegment { pub(crate) segment: ArrayInSHM, } -impl PosixSharedMemorySegment { +impl PosixShmSegment { pub(crate) fn create(alloc_size: usize) -> ZResult { let segment = ArrayInSHM::create(alloc_size, POSIX_SHM_SEGMENT_PREFIX)?; Ok(Self { segment }) @@ -43,7 +43,7 @@ impl PosixSharedMemorySegment { } } -impl SharedMemorySegment for PosixSharedMemorySegment { +impl ShmSegment for PosixShmSegment { fn map(&self, chunk: ChunkID) -> ZResult> { unsafe { Ok(AtomicPtr::new(self.segment.elem_mut(chunk))) } } diff --git a/commons/zenoh-shm/src/api/protocol_implementations/posix/protocol_id.rs b/commons/zenoh-shm/src/api/protocol_implementations/posix/protocol_id.rs index b2eec8d7a5..cff39f921a 100644 --- a/commons/zenoh-shm/src/api/protocol_implementations/posix/protocol_id.rs +++ b/commons/zenoh-shm/src/api/protocol_implementations/posix/protocol_id.rs @@ -14,6 +14,6 @@ use crate::api::common::types::ProtocolID; -/// Protocol identifier to use when creating SharedMemoryProvider +/// Protocol identifier to use when creating ShmProvider #[zenoh_macros::unstable_doc] pub const POSIX_PROTOCOL_ID: ProtocolID = 0; diff --git a/commons/zenoh-shm/src/api/provider/mod.rs b/commons/zenoh-shm/src/api/provider/mod.rs index a769baacb3..2d25e37c3d 100644 --- a/commons/zenoh-shm/src/api/provider/mod.rs +++ b/commons/zenoh-shm/src/api/provider/mod.rs @@ -13,6 +13,6 @@ // pub mod chunk; -pub mod shared_memory_provider; -pub mod shared_memory_provider_backend; +pub mod shm_provider; +pub mod shm_provider_backend; pub mod types; diff --git a/commons/zenoh-shm/src/api/provider/shared_memory_provider.rs b/commons/zenoh-shm/src/api/provider/shm_provider.rs similarity index 83% rename from commons/zenoh-shm/src/api/provider/shared_memory_provider.rs rename to commons/zenoh-shm/src/api/provider/shm_provider.rs index 9c0c497044..8773498b61 100644 --- a/commons/zenoh-shm/src/api/provider/shared_memory_provider.rs +++ b/commons/zenoh-shm/src/api/provider/shm_provider.rs @@ -27,7 +27,7 @@ use zenoh_result::ZResult; use super::{ chunk::{AllocatedChunk, ChunkDescriptor}, - shared_memory_provider_backend::SharedMemoryProviderBackend, + shm_provider_backend::ShmProviderBackend, types::{ AllocAlignment, BufAllocResult, BufLayoutAllocResult, ChunkAllocResult, MemoryLayout, ZAllocError, ZLayoutAllocError, ZLayoutError, @@ -46,7 +46,7 @@ use crate::{ storage::GLOBAL_STORAGE, validator::GLOBAL_VALIDATOR, }, - SharedMemoryBuf, SharedMemoryBufInfo, + ShmBufInfo, ShmBufInner, }; #[derive(Debug)] @@ -73,25 +73,25 @@ impl BusyChunk { struct AllocData<'a, IDSource, Backend> where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, { size: usize, alignment: AllocAlignment, - provider: &'a SharedMemoryProvider, + provider: &'a ShmProvider, } #[zenoh_macros::unstable_doc] pub struct AllocLayoutSizedBuilder<'a, IDSource, Backend>(AllocData<'a, IDSource, Backend>) where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend; + Backend: ShmProviderBackend; impl<'a, IDSource, Backend> AllocLayoutSizedBuilder<'a, IDSource, Backend> where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, { - fn new(provider: &'a SharedMemoryProvider, size: usize) -> Self { + fn new(provider: &'a ShmProvider, size: usize) -> Self { Self(AllocData { provider, size, @@ -129,7 +129,7 @@ where impl<'a, IDSource, Backend> Resolvable for AllocLayoutSizedBuilder<'a, IDSource, Backend> where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, { type To = BufLayoutAllocResult; } @@ -138,7 +138,7 @@ where impl<'a, IDSource, Backend> Wait for AllocLayoutSizedBuilder<'a, IDSource, Backend> where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, { fn wait(self) -> ::To { let builder = AllocBuilder2::<'a, IDSource, Backend, JustAlloc> { @@ -151,23 +151,23 @@ where /// A layout for allocations. /// This is a pre-calculated layout suitable for making series of similar allocations -/// adopted for particular SharedMemoryProvider +/// adopted for particular ShmProvider #[zenoh_macros::unstable_doc] #[derive(Debug)] pub struct AllocLayout<'a, IDSource, Backend> where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, { size: usize, provider_layout: MemoryLayout, - provider: &'a SharedMemoryProvider, + provider: &'a ShmProvider, } impl<'a, IDSource, Backend> AllocLayout<'a, IDSource, Backend> where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, { /// Allocate the new buffer with this layout #[zenoh_macros::unstable_doc] @@ -204,8 +204,8 @@ where /// Trait for deallocation policies. #[zenoh_macros::unstable_doc] pub trait ForceDeallocPolicy { - fn dealloc( - provider: &SharedMemoryProvider, + fn dealloc( + provider: &ShmProvider, ) -> bool; } @@ -213,8 +213,8 @@ pub trait ForceDeallocPolicy { #[zenoh_macros::unstable_doc] pub struct DeallocOptimal; impl ForceDeallocPolicy for DeallocOptimal { - fn dealloc( - provider: &SharedMemoryProvider, + fn dealloc( + provider: &ShmProvider, ) -> bool { let mut guard = provider.busy_list.lock().unwrap(); let chunk_to_dealloc = match guard.remove(1) { @@ -235,8 +235,8 @@ impl ForceDeallocPolicy for DeallocOptimal { #[zenoh_macros::unstable_doc] pub struct DeallocYoungest; impl ForceDeallocPolicy for DeallocYoungest { - fn dealloc( - provider: &SharedMemoryProvider, + fn dealloc( + provider: &ShmProvider, ) -> bool { match provider.busy_list.lock().unwrap().pop_back() { Some(val) => { @@ -252,8 +252,8 @@ impl ForceDeallocPolicy for DeallocYoungest { #[zenoh_macros::unstable_doc] pub struct DeallocEldest; impl ForceDeallocPolicy for DeallocEldest { - fn dealloc( - provider: &SharedMemoryProvider, + fn dealloc( + provider: &ShmProvider, ) -> bool { match provider.busy_list.lock().unwrap().pop_front() { Some(val) => { @@ -268,9 +268,9 @@ impl ForceDeallocPolicy for DeallocEldest { /// Trait for allocation policies #[zenoh_macros::unstable_doc] pub trait AllocPolicy { - fn alloc( + fn alloc( layout: &MemoryLayout, - provider: &SharedMemoryProvider, + provider: &ShmProvider, ) -> ChunkAllocResult; } @@ -278,9 +278,9 @@ pub trait AllocPolicy { #[zenoh_macros::unstable_doc] #[async_trait] pub trait AsyncAllocPolicy: Send { - async fn alloc_async( + async fn alloc_async( layout: &MemoryLayout, - provider: &SharedMemoryProvider, + provider: &ShmProvider, ) -> ChunkAllocResult; } @@ -288,9 +288,9 @@ pub trait AsyncAllocPolicy: Send { #[zenoh_macros::unstable_doc] pub struct JustAlloc; impl AllocPolicy for JustAlloc { - fn alloc( + fn alloc( layout: &MemoryLayout, - provider: &SharedMemoryProvider, + provider: &ShmProvider, ) -> ChunkAllocResult { provider.backend.alloc(layout) } @@ -313,9 +313,9 @@ where InnerPolicy: AllocPolicy, AltPolicy: AllocPolicy, { - fn alloc( + fn alloc( layout: &MemoryLayout, - provider: &SharedMemoryProvider, + provider: &ShmProvider, ) -> ChunkAllocResult { let result = InnerPolicy::alloc(layout, provider); if let Err(ZAllocError::OutOfMemory) = result { @@ -345,9 +345,9 @@ where InnerPolicy: AllocPolicy, AltPolicy: AllocPolicy, { - fn alloc( + fn alloc( layout: &MemoryLayout, - provider: &SharedMemoryProvider, + provider: &ShmProvider, ) -> ChunkAllocResult { let result = InnerPolicy::alloc(layout, provider); if let Err(ZAllocError::NeedDefragment) = result { @@ -384,9 +384,9 @@ where AltPolicy: AllocPolicy, DeallocatePolicy: ForceDeallocPolicy, { - fn alloc( + fn alloc( layout: &MemoryLayout, - provider: &SharedMemoryProvider, + provider: &ShmProvider, ) -> ChunkAllocResult { let mut result = InnerPolicy::alloc(layout, provider); for _ in 0..N { @@ -422,12 +422,9 @@ impl AsyncAllocPolicy for BlockOn where InnerPolicy: AllocPolicy + Send, { - async fn alloc_async< - IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend + Sync, - >( + async fn alloc_async( layout: &MemoryLayout, - provider: &SharedMemoryProvider, + provider: &ShmProvider, ) -> ChunkAllocResult { loop { match InnerPolicy::alloc(layout, provider) { @@ -446,9 +443,9 @@ impl AllocPolicy for BlockOn where InnerPolicy: AllocPolicy, { - fn alloc( + fn alloc( layout: &MemoryLayout, - provider: &SharedMemoryProvider, + provider: &ShmProvider, ) -> ChunkAllocResult { loop { match InnerPolicy::alloc(layout, provider) { @@ -469,14 +466,14 @@ where 'a, Policy: AllocPolicy, IDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, > { - provider: &'a SharedMemoryProvider, - allocations: lockfree::map::Map, SharedMemoryBuf>, + provider: &'a ShmProvider, + allocations: lockfree::map::Map, ShmBufInner>, _phantom: PhantomData, } -impl<'a, Policy: AllocPolicy, IDSource, Backend: SharedMemoryProviderBackend> +impl<'a, Policy: AllocPolicy, IDSource, Backend: ShmProviderBackend> ShmAllocator<'a, Policy, IDSource, Backend> { fn allocate(&self, layout: std::alloc::Layout) -> BufAllocResult { @@ -490,7 +487,7 @@ impl<'a, Policy: AllocPolicy, IDSource, Backend: SharedMemoryProviderBackend> } } -unsafe impl<'a, Policy: AllocPolicy, IDSource, Backend: SharedMemoryProviderBackend> +unsafe impl<'a, Policy: AllocPolicy, IDSource, Backend: ShmProviderBackend> allocator_api2::alloc::Allocator for ShmAllocator<'a, Policy, IDSource, Backend> { fn allocate( @@ -520,7 +517,7 @@ unsafe impl<'a, Policy: AllocPolicy, IDSource, Backend: SharedMemoryProviderBack pub struct AllocBuilder2< 'a, IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, Policy = JustAlloc, > { data: AllocData<'a, IDSource, Backend>, @@ -531,7 +528,7 @@ pub struct AllocBuilder2< impl<'a, IDSource, Backend, Policy> AllocBuilder2<'a, IDSource, Backend, Policy> where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, { /// Set the allocation policy #[zenoh_macros::unstable_doc] @@ -546,7 +543,7 @@ where impl<'a, IDSource, Backend, Policy> Resolvable for AllocBuilder2<'a, IDSource, Backend, Policy> where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, { type To = BufLayoutAllocResult; } @@ -555,7 +552,7 @@ where impl<'a, IDSource, Backend, Policy> Wait for AllocBuilder2<'a, IDSource, Backend, Policy> where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, Policy: AllocPolicy, { fn wait(self) -> ::To { @@ -573,7 +570,7 @@ where impl<'a, IDSource, Backend, Policy> IntoFuture for AllocBuilder2<'a, IDSource, Backend, Policy> where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend + Sync, + Backend: ShmProviderBackend + Sync, Policy: AsyncAllocPolicy, { type Output = ::To; @@ -599,7 +596,7 @@ where pub struct AllocBuilder< 'a, IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, Policy = JustAlloc, > { layout: &'a AllocLayout<'a, IDSource, Backend>, @@ -610,7 +607,7 @@ pub struct AllocBuilder< impl<'a, IDSource, Backend, Policy> AllocBuilder<'a, IDSource, Backend, Policy> where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, { /// Set the allocation policy #[zenoh_macros::unstable_doc] @@ -625,7 +622,7 @@ where impl<'a, IDSource, Backend, Policy> Resolvable for AllocBuilder<'a, IDSource, Backend, Policy> where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, { type To = BufAllocResult; } @@ -634,7 +631,7 @@ where impl<'a, IDSource, Backend, Policy> Wait for AllocBuilder<'a, IDSource, Backend, Policy> where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, Policy: AllocPolicy, { fn wait(self) -> ::To { @@ -648,7 +645,7 @@ where impl<'a, IDSource, Backend, Policy> IntoFuture for AllocBuilder<'a, IDSource, Backend, Policy> where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend + Sync, + Backend: ShmProviderBackend + Sync, Policy: AsyncAllocPolicy, { type Output = ::To; @@ -668,9 +665,9 @@ where } #[zenoh_macros::unstable_doc] -pub struct SharedMemoryProviderBuilder; -impl SharedMemoryProviderBuilder { - /// Get the builder to construct SharedMemoryProvider +pub struct ShmProviderBuilder; +impl ShmProviderBuilder { + /// Get the builder to construct ShmProvider #[zenoh_macros::unstable_doc] pub fn builder() -> Self { Self @@ -678,38 +675,33 @@ impl SharedMemoryProviderBuilder { /// Set compile-time-evaluated protocol ID (preferred) #[zenoh_macros::unstable_doc] - pub fn protocol_id( - self, - ) -> SharedMemoryProviderBuilderID> { - SharedMemoryProviderBuilderID::> { + pub fn protocol_id(self) -> ShmProviderBuilderID> { + ShmProviderBuilderID::> { id: StaticProtocolID, } } /// Set runtime-evaluated protocol ID #[zenoh_macros::unstable_doc] - pub fn dynamic_protocol_id( - self, - id: ProtocolID, - ) -> SharedMemoryProviderBuilderID { - SharedMemoryProviderBuilderID:: { + pub fn dynamic_protocol_id(self, id: ProtocolID) -> ShmProviderBuilderID { + ShmProviderBuilderID:: { id: DynamicProtocolID::new(id), } } } #[zenoh_macros::unstable_doc] -pub struct SharedMemoryProviderBuilderID { +pub struct ShmProviderBuilderID { id: IDSource, } -impl SharedMemoryProviderBuilderID { +impl ShmProviderBuilderID { /// Set the backend #[zenoh_macros::unstable_doc] - pub fn backend( + pub fn backend( self, backend: Backend, - ) -> SharedMemoryProviderBuilderBackendID { - SharedMemoryProviderBuilderBackendID { + ) -> ShmProviderBuilderBackendID { + ShmProviderBuilderBackendID { backend, id: self.id, } @@ -717,34 +709,34 @@ impl SharedMemoryProviderBuilderID { } #[zenoh_macros::unstable_doc] -pub struct SharedMemoryProviderBuilderBackendID +pub struct ShmProviderBuilderBackendID where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, { backend: Backend, id: IDSource, } -impl SharedMemoryProviderBuilderBackendID +impl ShmProviderBuilderBackendID where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, { - /// build SharedMemoryProvider + /// build ShmProvider #[zenoh_macros::unstable_doc] - pub fn res(self) -> SharedMemoryProvider { - SharedMemoryProvider::new(self.backend, self.id) + pub fn res(self) -> ShmProvider { + ShmProvider::new(self.backend, self.id) } } -/// Trait to create ProtocolID sources for SharedMemoryProvider +/// Trait to create ProtocolID sources for ShmProvider #[zenoh_macros::unstable_doc] pub trait ProtocolIDSource: Send + Sync { fn id(&self) -> ProtocolID; } /// Static ProtocolID source. This is a recommended API to set ProtocolID -/// when creating SharedMemoryProvider as the ID value is statically evaluated +/// when creating ShmProvider as the ID value is statically evaluated /// at compile-time and can be optimized. #[zenoh_macros::unstable_doc] #[derive(Default)] @@ -756,7 +748,7 @@ impl ProtocolIDSource for StaticProtocolID { } /// Dynamic ProtocolID source. This is an alternative API to set ProtocolID -/// when creating SharedMemoryProvider for cases where ProtocolID is unknown +/// when creating ShmProvider for cases where ProtocolID is unknown /// at compile-time. #[zenoh_macros::unstable_doc] pub struct DynamicProtocolID { @@ -779,20 +771,20 @@ unsafe impl Sync for DynamicProtocolID {} /// A generalized interface for shared memory data sources #[zenoh_macros::unstable_doc] #[derive(Debug)] -pub struct SharedMemoryProvider +pub struct ShmProvider where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, { backend: Backend, busy_list: Mutex>, id: IDSource, } -impl SharedMemoryProvider +impl ShmProvider where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, { /// Rich interface for making allocations #[zenoh_macros::unstable_doc] @@ -814,7 +806,7 @@ where // allocate resources for SHM buffer let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?; - // wrap everything to SharedMemoryBuf + // wrap everything to ShmBufInner let wrapped = self.wrap( chunk, len, @@ -863,10 +855,10 @@ where } // PRIVATE impls -impl SharedMemoryProvider +impl ShmProvider where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend, + Backend: ShmProviderBackend, { fn new(backend: Backend, id: IDSource) -> Self { Self { @@ -891,7 +883,7 @@ where // and it is necessary to handle that properly and pass this len to corresponding free(...) let chunk = Policy::alloc(layout, self)?; - // wrap allocated chunk to SharedMemoryBuf + // wrap allocated chunk to ShmBufInner let wrapped = self.wrap( chunk, size, @@ -926,7 +918,7 @@ where allocated_header: AllocatedHeaderDescriptor, allocated_watchdog: AllocatedWatchdog, confirmed_watchdog: ConfirmedDescriptor, - ) -> SharedMemoryBuf { + ) -> ShmBufInner { let header = allocated_header.descriptor.clone(); let descriptor = Descriptor::from(&allocated_watchdog.descriptor); @@ -943,7 +935,7 @@ where ); // Create buffer's info - let info = SharedMemoryBufInfo::new( + let info = ShmBufInfo::new( chunk.descriptor.clone(), self.id.id(), len, @@ -953,7 +945,7 @@ where ); // Create buffer - let shmb = SharedMemoryBuf { + let shmb = ShmBufInner { header, buf: chunk.data, info, @@ -972,10 +964,10 @@ where } // PRIVATE impls for Sync backend -impl SharedMemoryProvider +impl ShmProvider where IDSource: ProtocolIDSource, - Backend: SharedMemoryProviderBackend + Sync, + Backend: ShmProviderBackend + Sync, { async fn alloc_inner_async( &self, @@ -996,7 +988,7 @@ where // and it is necessary to handle that properly and pass this len to corresponding free(...) let chunk = Policy::alloc_async(backend_layout, self).await?; - // wrap allocated chunk to SharedMemoryBuf + // wrap allocated chunk to ShmBufInner let wrapped = self.wrap( chunk, size, diff --git a/commons/zenoh-shm/src/api/provider/shared_memory_provider_backend.rs b/commons/zenoh-shm/src/api/provider/shm_provider_backend.rs similarity index 97% rename from commons/zenoh-shm/src/api/provider/shared_memory_provider_backend.rs rename to commons/zenoh-shm/src/api/provider/shm_provider_backend.rs index cd15ce3720..0487981e5c 100644 --- a/commons/zenoh-shm/src/api/provider/shared_memory_provider_backend.rs +++ b/commons/zenoh-shm/src/api/provider/shm_provider_backend.rs @@ -22,7 +22,7 @@ use super::{ /// The provider backend trait /// Implemet this interface to create a Zenoh-compatible shared memory provider #[zenoh_macros::unstable_doc] -pub trait SharedMemoryProviderBackend { +pub trait ShmProviderBackend { /// Allocate the chunk of desired size. /// If successful, the result's chunk size will be >= len #[zenoh_macros::unstable_doc] diff --git a/commons/zenoh-shm/src/lib.rs b/commons/zenoh-shm/src/lib.rs index 316477d26e..eec962a7e4 100644 --- a/commons/zenoh-shm/src/lib.rs +++ b/commons/zenoh-shm/src/lib.rs @@ -50,14 +50,14 @@ pub mod posix_shm; pub mod reader; pub mod watchdog; -/// Informations about a [`SharedMemoryBuf`]. +/// Informations about a [`ShmBufInner`]. /// -/// This that can be serialized and can be used to retrieve the [`SharedMemoryBuf`] in a remote process. +/// This that can be serialized and can be used to retrieve the [`ShmBufInner`] in a remote process. #[derive(Clone, Debug, PartialEq, Eq)] -pub struct SharedMemoryBufInfo { +pub struct ShmBufInfo { /// The data chunk descriptor pub data_descriptor: ChunkDescriptor, - /// Protocol identifier for particular SharedMemory implementation + /// Protocol identifier for particular SHM implementation pub shm_protocol: ProtocolID, /// Actual data length /// NOTE: data_descriptor's len is >= of this len and describes the actual memory length @@ -72,7 +72,7 @@ pub struct SharedMemoryBufInfo { pub generation: u32, } -impl SharedMemoryBufInfo { +impl ShmBufInfo { pub fn new( data_descriptor: ChunkDescriptor, shm_protocol: ProtocolID, @@ -80,8 +80,8 @@ impl SharedMemoryBufInfo { watchdog_descriptor: Descriptor, header_descriptor: HeaderDescriptor, generation: u32, - ) -> SharedMemoryBufInfo { - SharedMemoryBufInfo { + ) -> ShmBufInfo { + ShmBufInfo { data_descriptor, shm_protocol, data_len, @@ -94,14 +94,14 @@ impl SharedMemoryBufInfo { /// A zenoh buffer in shared memory. #[non_exhaustive] -pub struct SharedMemoryBuf { +pub struct ShmBufInner { pub(crate) header: OwnedHeaderDescriptor, pub(crate) buf: AtomicPtr, - pub info: SharedMemoryBufInfo, + pub info: ShmBufInfo, pub(crate) watchdog: Arc, } -impl PartialEq for SharedMemoryBuf { +impl PartialEq for ShmBufInner { fn eq(&self, other: &Self) -> bool { // currently there is no API to resize an SHM buffer, but it is intended in the future, // so I add size comparsion here to avoid future bugs :) @@ -109,11 +109,11 @@ impl PartialEq for SharedMemoryBuf { && self.info.data_len == other.info.data_len } } -impl Eq for SharedMemoryBuf {} +impl Eq for ShmBufInner {} -impl std::fmt::Debug for SharedMemoryBuf { +impl std::fmt::Debug for ShmBufInner { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("SharedMemoryBuf") + f.debug_struct("ShmBufInner") .field("header", &self.header) .field("buf", &self.buf) .field("info", &self.info) @@ -121,7 +121,7 @@ impl std::fmt::Debug for SharedMemoryBuf { } } -impl SharedMemoryBuf { +impl ShmBufInner { pub fn len(&self) -> usize { self.info.data_len } @@ -154,10 +154,7 @@ impl SharedMemoryBuf { // PRIVATE: fn as_slice(&self) -> &[u8] { - tracing::trace!( - "SharedMemoryBuf::as_slice() == len = {:?}", - self.info.data_len - ); + tracing::trace!("ShmBufInner::as_slice() == len = {:?}", self.info.data_len); let bp = self.buf.load(Ordering::SeqCst); unsafe { std::slice::from_raw_parts(bp, self.info.data_len) } } @@ -183,21 +180,21 @@ impl SharedMemoryBuf { } } -impl Drop for SharedMemoryBuf { +impl Drop for ShmBufInner { fn drop(&mut self) { // # Safety - // obviouly, we need to decrement refcount when dropping SharedMemoryBuf instance + // obviouly, we need to decrement refcount when dropping ShmBufInner instance unsafe { self.dec_ref_count() }; } } -impl Clone for SharedMemoryBuf { +impl Clone for ShmBufInner { fn clone(&self) -> Self { // # Safety - // obviouly, we need to increment refcount when cloning SharedMemoryBuf instance + // obviouly, we need to increment refcount when cloning ShmBufInner instance unsafe { self.inc_ref_count() }; let bp = self.buf.load(Ordering::SeqCst); - SharedMemoryBuf { + ShmBufInner { header: self.header.clone(), buf: AtomicPtr::new(bp), info: self.info.clone(), @@ -207,20 +204,20 @@ impl Clone for SharedMemoryBuf { } // Buffer impls -// - SharedMemoryBuf -impl AsRef<[u8]> for SharedMemoryBuf { +// - ShmBufInner +impl AsRef<[u8]> for ShmBufInner { fn as_ref(&self) -> &[u8] { self.as_slice() } } -impl AsMut<[u8]> for SharedMemoryBuf { +impl AsMut<[u8]> for ShmBufInner { fn as_mut(&mut self) -> &mut [u8] { unsafe { self.as_mut_slice_inner() } } } -impl ZSliceBuffer for SharedMemoryBuf { +impl ZSliceBuffer for ShmBufInner { fn as_slice(&self) -> &[u8] { self.as_ref() } diff --git a/commons/zenoh-shm/src/reader.rs b/commons/zenoh-shm/src/reader.rs index c2ce2303a9..1298c38aff 100644 --- a/commons/zenoh-shm/src/reader.rs +++ b/commons/zenoh-shm/src/reader.rs @@ -19,34 +19,34 @@ use zenoh_result::ZResult; use crate::{ api::{ - client::shared_memory_segment::SharedMemorySegment, - client_storage::SharedMemoryClientStorage, + client::shm_segment::ShmSegment, + client_storage::ShmClientStorage, common::types::{ProtocolID, SegmentID}, }, header::subscription::GLOBAL_HEADER_SUBSCRIPTION, watchdog::confirmator::GLOBAL_CONFIRMATOR, - SharedMemoryBuf, SharedMemoryBufInfo, + ShmBufInfo, ShmBufInner, }; #[derive(Debug, Clone, Eq, PartialEq)] -pub struct SharedMemoryReader { - client_storage: Arc, +pub struct ShmReader { + client_storage: Arc, } -impl Deref for SharedMemoryReader { - type Target = SharedMemoryClientStorage; +impl Deref for ShmReader { + type Target = ShmClientStorage; fn deref(&self) -> &Self::Target { &self.client_storage } } -impl SharedMemoryReader { - pub fn new(client_storage: Arc) -> Self { +impl ShmReader { + pub fn new(client_storage: Arc) -> Self { Self { client_storage } } - pub fn read_shmbuf(&self, info: &SharedMemoryBufInfo) -> ZResult { + pub fn read_shmbuf(&self, info: &ShmBufInfo) -> ZResult { // Read does not increment the reference count as it is assumed // that the sender of this buffer has incremented it for us. @@ -54,7 +54,7 @@ impl SharedMemoryReader { let watchdog = Arc::new(GLOBAL_CONFIRMATOR.add(&info.watchdog_descriptor)?); let segment = self.ensure_segment(info)?; - let shmb = SharedMemoryBuf { + let shmb = ShmBufInner { header: GLOBAL_HEADER_SUBSCRIPTION.link(&info.header_descriptor)?, buf: segment.map(info.data_descriptor.chunk)?, info: info.clone(), @@ -68,7 +68,7 @@ impl SharedMemoryReader { } } - fn ensure_segment(&self, info: &SharedMemoryBufInfo) -> ZResult> { + fn ensure_segment(&self, info: &ShmBufInfo) -> ZResult> { let id = GlobalDataSegmentID::new(info.shm_protocol, info.data_descriptor.segment); // fastest path: try to get access to already mounted SHM segment diff --git a/commons/zenoh-shm/tests/posix_shm_provider.rs b/commons/zenoh-shm/tests/posix_shm_provider.rs index 4c27879623..60104be6cf 100644 --- a/commons/zenoh-shm/tests/posix_shm_provider.rs +++ b/commons/zenoh-shm/tests/posix_shm_provider.rs @@ -13,13 +13,12 @@ // use zenoh_shm::api::{ - client::shared_memory_client::SharedMemoryClient, + client::shm_client::ShmClient, protocol_implementations::posix::{ - posix_shared_memory_client::PosixSharedMemoryClient, - posix_shared_memory_provider_backend::PosixSharedMemoryProviderBackend, + posix_shm_client::PosixShmClient, posix_shm_provider_backend::PosixShmProviderBackend, }, provider::{ - shared_memory_provider_backend::SharedMemoryProviderBackend, + shm_provider_backend::ShmProviderBackend, types::{AllocAlignment, MemoryLayout}, }, }; @@ -29,43 +28,43 @@ static BUFFER_SIZE: usize = 1024; #[test] fn posix_shm_provider_create() { - let _backend = PosixSharedMemoryProviderBackend::builder() + let _backend = PosixShmProviderBackend::builder() .with_size(1024) .expect("Error creating Layout!") .res() - .expect("Error creating PosixSharedMemoryProviderBackend!"); + .expect("Error creating PosixShmProviderBackend!"); } #[test] fn posix_shm_provider_alloc() { - let backend = PosixSharedMemoryProviderBackend::builder() + let backend = PosixShmProviderBackend::builder() .with_size(1024) .expect("Error creating Layout!") .res() - .expect("Error creating PosixSharedMemoryProviderBackend!"); + .expect("Error creating PosixShmProviderBackend!"); let layout = MemoryLayout::new(100, AllocAlignment::default()).unwrap(); let _buf = backend .alloc(&layout) - .expect("PosixSharedMemoryProviderBackend: error allocating buffer"); + .expect("PosixShmProviderBackend: error allocating buffer"); } #[test] fn posix_shm_provider_open() { - let backend = PosixSharedMemoryProviderBackend::builder() + let backend = PosixShmProviderBackend::builder() .with_size(1024) .expect("Error creating Layout!") .res() - .expect("Error creating PosixSharedMemoryProviderBackend!"); + .expect("Error creating PosixShmProviderBackend!"); let layout = MemoryLayout::new(100, AllocAlignment::default()).unwrap(); let buf = backend .alloc(&layout) - .expect("PosixSharedMemoryProviderBackend: error allocating buffer"); + .expect("PosixShmProviderBackend: error allocating buffer"); - let client = PosixSharedMemoryClient {}; + let client = PosixShmClient {}; let _segment = client .attach(buf.descriptor.segment) @@ -74,11 +73,11 @@ fn posix_shm_provider_open() { #[test] fn posix_shm_provider_allocator() { - let backend = PosixSharedMemoryProviderBackend::builder() + let backend = PosixShmProviderBackend::builder() .with_size(BUFFER_SIZE * BUFFER_NUM) .expect("Error creating Layout!") .res() - .expect("Error creating PosixSharedMemoryProviderBackend!"); + .expect("Error creating PosixShmProviderBackend!"); let layout = MemoryLayout::new(BUFFER_SIZE, AllocAlignment::default()).unwrap(); @@ -87,7 +86,7 @@ fn posix_shm_provider_allocator() { for _ in 0..BUFFER_NUM { let buf = backend .alloc(&layout) - .expect("PosixSharedMemoryProviderBackend: error allocating buffer"); + .expect("PosixShmProviderBackend: error allocating buffer"); buffers.push(buf); } @@ -103,7 +102,7 @@ fn posix_shm_provider_allocator() { // allocate new one let buf = backend .alloc(&layout) - .expect("PosixSharedMemoryProviderBackend: error allocating buffer"); + .expect("PosixShmProviderBackend: error allocating buffer"); buffers.push(buf); } diff --git a/examples/examples/z_alloc_shm.rs b/examples/examples/z_alloc_shm.rs index 4423e0b07a..1beabaebd8 100644 --- a/examples/examples/z_alloc_shm.rs +++ b/examples/examples/z_alloc_shm.rs @@ -14,8 +14,8 @@ use zenoh::{ prelude::*, shm::{ - AllocAlignment, BlockOn, Deallocate, Defragment, GarbageCollect, - PosixSharedMemoryProviderBackend, SharedMemoryProviderBuilder, POSIX_PROTOCOL_ID, + AllocAlignment, BlockOn, Deallocate, Defragment, GarbageCollect, PosixShmProviderBackend, + ShmProviderBuilder, POSIX_PROTOCOL_ID, }, Config, }; @@ -29,14 +29,14 @@ async fn main() { async fn run() -> ZResult<()> { // create an SHM backend... - // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs - let backend = PosixSharedMemoryProviderBackend::builder() + // NOTE: For extended PosixShmProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixShmProviderBackend::builder() .with_size(65536) .unwrap() .res() .unwrap(); // ...and an SHM provider - let provider = SharedMemoryProviderBuilder::builder() + let provider = ShmProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); @@ -90,7 +90,7 @@ async fn run() -> ZResult<()> { simple_layout }; - // Allocate SharedMemoryBuf + // Allocate ShmBufInner // Policy is a generics-based API to describe necessary allocation behaviour // that will be higly optimized at compile-time. // Policy resolvable can be sync and async. diff --git a/examples/examples/z_bytes_shm.rs b/examples/examples/z_bytes_shm.rs index 66d47193ae..75bf01e3bf 100644 --- a/examples/examples/z_bytes_shm.rs +++ b/examples/examples/z_bytes_shm.rs @@ -15,21 +15,21 @@ use zenoh::{ bytes::ZBytes, prelude::*, shm::{ - zshm, zshmmut, PosixSharedMemoryProviderBackend, SharedMemoryProviderBuilder, ZShm, - ZShmMut, POSIX_PROTOCOL_ID, + zshm, zshmmut, PosixShmProviderBackend, ShmProviderBuilder, ZShm, ZShmMut, + POSIX_PROTOCOL_ID, }, }; fn main() { // create an SHM backend... - // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs - let backend = PosixSharedMemoryProviderBackend::builder() + // NOTE: For extended PosixShmProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixShmProviderBackend::builder() .with_size(4096) .unwrap() .res() .unwrap(); // ...and an SHM provider - let provider = SharedMemoryProviderBuilder::builder() + let provider = ShmProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); diff --git a/examples/examples/z_get_shm.rs b/examples/examples/z_get_shm.rs index 8766d54b95..fd902bfe65 100644 --- a/examples/examples/z_get_shm.rs +++ b/examples/examples/z_get_shm.rs @@ -19,8 +19,8 @@ use zenoh::{ query::QueryTarget, selector::KeyExpr, shm::{ - zshm, BlockOn, GarbageCollect, PosixSharedMemoryProviderBackend, - SharedMemoryProviderBuilder, POSIX_PROTOCOL_ID, + zshm, BlockOn, GarbageCollect, PosixShmProviderBackend, ShmProviderBuilder, + POSIX_PROTOCOL_ID, }, Config, }; @@ -45,14 +45,14 @@ async fn main() { println!("Creating POSIX SHM provider..."); // create an SHM backend... - // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs - let backend = PosixSharedMemoryProviderBackend::builder() + // NOTE: For extended PosixShmProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixShmProviderBackend::builder() .with_size(N * 1024) .unwrap() .res() .unwrap(); // ...and an SHM provider - let provider = SharedMemoryProviderBuilder::builder() + let provider = ShmProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); @@ -69,7 +69,7 @@ async fn main() { let content = value .take() - .unwrap_or_else(|| "Get from SharedMemory Rust!".to_string()); + .unwrap_or_else(|| "Get from SHM Rust!".to_string()); sbuf[0..content.len()].copy_from_slice(content.as_bytes()); println!("Sending Query '{selector}'..."); @@ -87,7 +87,7 @@ async fn main() { print!(">> Received ('{}': ", sample.key_expr().as_str()); match sample.payload().deserialize::<&zshm>() { Ok(payload) => println!("'{}')", String::from_utf8_lossy(payload),), - Err(e) => println!("'Not a SharedMemoryBuf: {:?}')", e), + Err(e) => println!("'Not a ShmBufInner: {:?}')", e), } } Err(err) => { diff --git a/examples/examples/z_ping_shm.rs b/examples/examples/z_ping_shm.rs index c0cc20127d..033fe2d844 100644 --- a/examples/examples/z_ping_shm.rs +++ b/examples/examples/z_ping_shm.rs @@ -19,7 +19,7 @@ use zenoh::{ key_expr::keyexpr, prelude::*, publisher::CongestionControl, - shm::{PosixSharedMemoryProviderBackend, SharedMemoryProviderBuilder, POSIX_PROTOCOL_ID}, + shm::{PosixShmProviderBackend, ShmProviderBuilder, POSIX_PROTOCOL_ID}, Config, }; use zenoh_examples::CommonArgs; @@ -53,14 +53,14 @@ fn main() { let mut samples = Vec::with_capacity(n); // create an SHM backend... - // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs - let backend = PosixSharedMemoryProviderBackend::builder() + // NOTE: For extended PosixShmProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixShmProviderBackend::builder() .with_size(size) .unwrap() .res() .unwrap(); // ...and an SHM provider - let provider = SharedMemoryProviderBuilder::builder() + let provider = ShmProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); diff --git a/examples/examples/z_posix_shm_provider.rs b/examples/examples/z_posix_shm_provider.rs index d89d419846..7c68d56bd3 100644 --- a/examples/examples/z_posix_shm_provider.rs +++ b/examples/examples/z_posix_shm_provider.rs @@ -12,14 +12,13 @@ // ZettaScale Zenoh Team, // use zenoh::shm::{ - AllocAlignment, MemoryLayout, PosixSharedMemoryProviderBackend, SharedMemoryProviderBuilder, - POSIX_PROTOCOL_ID, + AllocAlignment, MemoryLayout, PosixShmProviderBackend, ShmProviderBuilder, POSIX_PROTOCOL_ID, }; fn main() { // Construct an SHM backend let backend = { - // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. + // NOTE: code in this block is a specific PosixShmProviderBackend API. // Total amount of shared memory to allocate let size = 4096; @@ -33,14 +32,14 @@ fn main() { let provider_layout = MemoryLayout::new(size, provider_alignment).unwrap(); // Build a provider backend - PosixSharedMemoryProviderBackend::builder() + PosixShmProviderBackend::builder() .with_layout(provider_layout) .res() .unwrap() }; // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID - let _shared_memory_provider = SharedMemoryProviderBuilder::builder() + let _shm_provider = ShmProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); diff --git a/examples/examples/z_pub_shm.rs b/examples/examples/z_pub_shm.rs index 9c4e64c496..dfb6fb44a6 100644 --- a/examples/examples/z_pub_shm.rs +++ b/examples/examples/z_pub_shm.rs @@ -16,8 +16,7 @@ use zenoh::{ key_expr::KeyExpr, prelude::*, shm::{ - BlockOn, GarbageCollect, PosixSharedMemoryProviderBackend, SharedMemoryProviderBuilder, - POSIX_PROTOCOL_ID, + BlockOn, GarbageCollect, PosixShmProviderBackend, ShmProviderBuilder, POSIX_PROTOCOL_ID, }, Config, }; @@ -42,14 +41,14 @@ async fn main() -> Result<(), ZError> { println!("Creating POSIX SHM provider..."); // create an SHM backend... - // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs - let backend = PosixSharedMemoryProviderBackend::builder() + // NOTE: For extended PosixShmProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixShmProviderBackend::builder() .with_size(N * 1024) .unwrap() .res() .unwrap(); // ...and an SHM provider - let provider = SharedMemoryProviderBuilder::builder() + let provider = ShmProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); @@ -97,7 +96,7 @@ struct Args { #[arg(short, long, default_value = "demo/example/zenoh-rs-pub")] /// The key expression to publish onto. path: KeyExpr<'static>, - #[arg(short, long, default_value = "Pub from SharedMemory Rust!")] + #[arg(short, long, default_value = "Pub from SHM Rust!")] /// The value of to publish. value: String, #[command(flatten)] diff --git a/examples/examples/z_pub_shm_thr.rs b/examples/examples/z_pub_shm_thr.rs index fca2994d33..cff095024e 100644 --- a/examples/examples/z_pub_shm_thr.rs +++ b/examples/examples/z_pub_shm_thr.rs @@ -16,7 +16,7 @@ use zenoh::{ internal::buffers::ZSlice, prelude::*, publisher::CongestionControl, - shm::{PosixSharedMemoryProviderBackend, SharedMemoryProviderBuilder, POSIX_PROTOCOL_ID}, + shm::{PosixShmProviderBackend, ShmProviderBuilder, POSIX_PROTOCOL_ID}, Config, }; use zenoh_examples::CommonArgs; @@ -35,14 +35,14 @@ async fn main() { let z = zenoh::open(config).await.unwrap(); // create an SHM backend... - // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs - let backend = PosixSharedMemoryProviderBackend::builder() + // NOTE: For extended PosixShmProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixShmProviderBackend::builder() .with_size(sm_size) .unwrap() .res() .unwrap(); // ...and an SHM provider - let provider = SharedMemoryProviderBuilder::builder() + let provider = ShmProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); diff --git a/examples/examples/z_queryable_shm.rs b/examples/examples/z_queryable_shm.rs index ec2058c897..c76a031286 100644 --- a/examples/examples/z_queryable_shm.rs +++ b/examples/examples/z_queryable_shm.rs @@ -16,8 +16,8 @@ use zenoh::{ key_expr::KeyExpr, prelude::*, shm::{ - zshm, BlockOn, GarbageCollect, PosixSharedMemoryProviderBackend, - SharedMemoryProviderBuilder, POSIX_PROTOCOL_ID, + zshm, BlockOn, GarbageCollect, PosixShmProviderBackend, ShmProviderBuilder, + POSIX_PROTOCOL_ID, }, Config, }; @@ -42,14 +42,14 @@ async fn main() { println!("Creating POSIX SHM provider..."); // create an SHM backend... - // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs - let backend = PosixSharedMemoryProviderBackend::builder() + // NOTE: For extended PosixShmProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixShmProviderBackend::builder() .with_size(N * 1024) .unwrap() .res() .unwrap(); // ...and an SHM provider - let provider = SharedMemoryProviderBuilder::builder() + let provider = ShmProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); @@ -71,7 +71,7 @@ async fn main() { if let Some(payload) = query.payload() { match payload.deserialize::<&zshm>() { Ok(payload) => print!(": '{}'", String::from_utf8_lossy(payload)), - Err(e) => print!(": 'Not a SharedMemoryBuf: {:?}'", e), + Err(e) => print!(": 'Not a ShmBufInner: {:?}'", e), } } println!(")"); @@ -105,7 +105,7 @@ struct Args { #[arg(short, long, default_value = "demo/example/zenoh-rs-queryable")] /// The key expression matching queries to reply to. key: KeyExpr<'static>, - #[arg(short, long, default_value = "Queryable from SharedMemory Rust!")] + #[arg(short, long, default_value = "Queryable from SHM Rust!")] /// The value to reply to queries. value: String, #[arg(long)] diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs index 4cc797d8b4..e32c6140ac 100644 --- a/examples/examples/z_sub_shm.rs +++ b/examples/examples/z_sub_shm.rs @@ -42,7 +42,7 @@ async fn main() { ); match sample.payload().deserialize::<&zshm>() { Ok(payload) => print!("'{}'", String::from_utf8_lossy(payload)), - Err(e) => print!("'Not a SharedMemoryBuf: {:?}'", e), + Err(e) => print!("'Not a ShmBufInner: {:?}'", e), } println!(")"); } @@ -62,7 +62,7 @@ async fn main() { // kind, key_expr, payload // ), // Err(e) => { - // println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); + // println!(">> [Subscriber] Not a ShmBufInner: {:?}", e); // } // } // } diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index 7d5e8f0885..f578e4d4fa 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -27,7 +27,7 @@ use zenoh_result::{bail, ZResult}; #[cfg(feature = "shared-memory")] use zenoh_shm::api::client_storage::GLOBAL_CLIENT_STORAGE; #[cfg(feature = "shared-memory")] -use zenoh_shm::reader::SharedMemoryReader; +use zenoh_shm::reader::ShmReader; use zenoh_task::TaskController; use super::{ @@ -140,12 +140,12 @@ pub struct TransportManagerBuilder { tx_threads: usize, protocols: Option>, #[cfg(feature = "shared-memory")] - shm_reader: Option, + shm_reader: Option, } impl TransportManagerBuilder { #[cfg(feature = "shared-memory")] - pub fn shm_reader(mut self, shm_reader: Option) -> Self { + pub fn shm_reader(mut self, shm_reader: Option) -> Self { self.shm_reader = shm_reader; self } @@ -268,7 +268,7 @@ impl TransportManagerBuilder { #[cfg(feature = "shared-memory")] let shm_reader = self .shm_reader - .unwrap_or_else(|| SharedMemoryReader::new(GLOBAL_CLIENT_STORAGE.clone())); + .unwrap_or_else(|| ShmReader::new(GLOBAL_CLIENT_STORAGE.clone())); let unicast = self.unicast.build( &mut prng, @@ -364,7 +364,7 @@ pub struct TransportManager { pub(crate) locator_inspector: zenoh_link::LocatorInspector, pub(crate) new_unicast_link_sender: NewLinkChannelSender, #[cfg(feature = "shared-memory")] - pub(crate) shmr: SharedMemoryReader, + pub(crate) shmr: ShmReader, #[cfg(feature = "stats")] pub(crate) stats: Arc, pub(crate) task_controller: TaskController, @@ -374,7 +374,7 @@ impl TransportManager { pub fn new( params: TransportManagerParams, mut prng: PseudoRng, - #[cfg(feature = "shared-memory")] shmr: SharedMemoryReader, + #[cfg(feature = "shared-memory")] shmr: ShmReader, ) -> TransportManager { // Initialize the Cipher let mut key = [0_u8; BlockCipher::BLOCK_SIZE]; diff --git a/io/zenoh-transport/src/multicast/manager.rs b/io/zenoh-transport/src/multicast/manager.rs index 9e7ff1ea35..cab59dfb32 100644 --- a/io/zenoh-transport/src/multicast/manager.rs +++ b/io/zenoh-transport/src/multicast/manager.rs @@ -17,7 +17,7 @@ use tokio::sync::Mutex; #[cfg(feature = "transport_compression")] use zenoh_config::CompressionMulticastConf; #[cfg(feature = "shared-memory")] -use zenoh_config::SharedMemoryConf; +use zenoh_config::ShmConf; use zenoh_config::{Config, LinkTxConf}; use zenoh_core::zasynclock; use zenoh_link::*; @@ -152,7 +152,7 @@ impl Default for TransportManagerBuilderMulticast { fn default() -> TransportManagerBuilderMulticast { let link_tx = LinkTxConf::default(); #[cfg(feature = "shared-memory")] - let shm = SharedMemoryConf::default(); + let shm = ShmConf::default(); #[cfg(feature = "transport_compression")] let compression = CompressionMulticastConf::default(); diff --git a/io/zenoh-transport/src/shm.rs b/io/zenoh-transport/src/shm.rs index 7a50a68742..8450ad878e 100644 --- a/io/zenoh-transport/src/shm.rs +++ b/io/zenoh-transport/src/shm.rs @@ -26,10 +26,7 @@ use zenoh_protocol::{ }, }; use zenoh_result::ZResult; -use zenoh_shm::{ - api::common::types::ProtocolID, reader::SharedMemoryReader, SharedMemoryBuf, - SharedMemoryBufInfo, -}; +use zenoh_shm::{api::common::types::ProtocolID, reader::ShmReader, ShmBufInfo, ShmBufInner}; use crate::unicast::establishment::ext::shm::AuthSegment; @@ -84,7 +81,7 @@ pub fn map_zmsg_to_partner( } } -pub fn map_zmsg_to_shmbuf(msg: &mut NetworkMessage, shmr: &SharedMemoryReader) -> ZResult<()> { +pub fn map_zmsg_to_shmbuf(msg: &mut NetworkMessage, shmr: &ShmReader) -> ZResult<()> { match &mut msg.body { NetworkBody::Push(Push { payload, .. }) => match payload { PushBody::Put(b) => b.map_to_shmbuf(shmr), @@ -117,7 +114,7 @@ trait MapShm { // RX: // - shminfo -> shmbuf // - rawbuf -> rawbuf (no changes) - fn map_to_shmbuf(&mut self, shmr: &SharedMemoryReader) -> ZResult<()>; + fn map_to_shmbuf(&mut self, shmr: &ShmReader) -> ZResult<()>; // TX: // - shmbuf -> shminfo if partner supports shmbuf's SHM protocol @@ -170,7 +167,7 @@ impl MapShm for Put { map_to_partner!(payload, ext_shm, partner_shm_cfg) } - fn map_to_shmbuf(&mut self, shmr: &SharedMemoryReader) -> ZResult<()> { + fn map_to_shmbuf(&mut self, shmr: &ShmReader) -> ZResult<()> { let Self { payload, ext_shm, .. } = self; @@ -197,7 +194,7 @@ impl MapShm for Query { } } - fn map_to_shmbuf(&mut self, shmr: &SharedMemoryReader) -> ZResult<()> { + fn map_to_shmbuf(&mut self, shmr: &ShmReader) -> ZResult<()> { if let Self { ext_body: Some(QueryBodyType { payload, ext_shm, .. @@ -229,7 +226,7 @@ impl MapShm for Reply { } } - fn map_to_shmbuf(&mut self, shmr: &SharedMemoryReader) -> ZResult<()> { + fn map_to_shmbuf(&mut self, shmr: &ShmReader) -> ZResult<()> { match &mut self.payload { PushBody::Put(put) => { let Put { @@ -254,7 +251,7 @@ impl MapShm for Err { map_to_partner!(payload, ext_shm, partner_shm_cfg) } - fn map_to_shmbuf(&mut self, shmr: &SharedMemoryReader) -> ZResult<()> { + fn map_to_shmbuf(&mut self, shmr: &ShmReader) -> ZResult<()> { let Self { payload, ext_shm, .. } = self; @@ -264,7 +261,7 @@ impl MapShm for Err { #[cold] #[inline(never)] -pub fn shmbuf_to_rawbuf(shmb: &SharedMemoryBuf) -> ZSlice { +pub fn shmbuf_to_rawbuf(shmb: &ShmBufInner) -> ZSlice { // Convert shmb to raw buffer // TODO: optimize this! We should not make additional buffer copy here, // but we need to make serializer serialize SHM buffer as raw buffer. @@ -273,7 +270,7 @@ pub fn shmbuf_to_rawbuf(shmb: &SharedMemoryBuf) -> ZSlice { #[cold] #[inline(never)] -pub fn shmbuf_to_shminfo(shmb: &SharedMemoryBuf) -> ZResult { +pub fn shmbuf_to_shminfo(shmb: &ShmBufInner) -> ZResult { // Serialize the shmb info let codec = Zenoh080::new(); let mut info = vec![]; @@ -281,7 +278,7 @@ pub fn shmbuf_to_shminfo(shmb: &SharedMemoryBuf) -> ZResult { codec .write(&mut writer, &shmb.info) .map_err(|e| zerror!("{:?}", e))?; - // Increase the reference count so to keep the SharedMemoryBuf valid + // Increase the reference count so to keep the ShmBufInner valid unsafe { shmb.inc_ref_count() }; // Replace the content of the slice let mut zslice: ZSlice = info.into(); @@ -295,7 +292,7 @@ fn to_shm_partner( ) -> ZResult { let mut res = false; for zs in zbuf.zslices_mut() { - if let Some(shmb) = zs.downcast_ref::() { + if let Some(shmb) = zs.downcast_ref::() { if partner_shm_cfg.supports_protocol(shmb.info.shm_protocol) { *zs = shmbuf_to_shminfo(shmb)?; res = true; @@ -310,14 +307,14 @@ fn to_shm_partner( fn to_non_shm_partner(zbuf: &mut ZBuf) { for zs in zbuf.zslices_mut() { - if let Some(shmb) = zs.downcast_ref::() { + if let Some(shmb) = zs.downcast_ref::() { // Replace the content of the slice with rawbuf *zs = shmbuf_to_rawbuf(shmb) } } } -pub fn map_zbuf_to_shmbuf(zbuf: &mut ZBuf, shmr: &SharedMemoryReader) -> ZResult<()> { +pub fn map_zbuf_to_shmbuf(zbuf: &mut ZBuf, shmr: &ShmReader) -> ZResult<()> { for zs in zbuf.zslices_mut().filter(|x| x.kind == ZSliceKind::ShmPtr) { map_zslice_to_shmbuf(zs, shmr)?; } @@ -326,12 +323,12 @@ pub fn map_zbuf_to_shmbuf(zbuf: &mut ZBuf, shmr: &SharedMemoryReader) -> ZResult #[cold] #[inline(never)] -pub fn map_zslice_to_shmbuf(zslice: &mut ZSlice, shmr: &SharedMemoryReader) -> ZResult<()> { +pub fn map_zslice_to_shmbuf(zslice: &mut ZSlice, shmr: &ShmReader) -> ZResult<()> { let codec = Zenoh080::new(); let mut reader = zslice.reader(); // Deserialize the shminfo - let shmbinfo: SharedMemoryBufInfo = codec.read(&mut reader).map_err(|e| zerror!("{:?}", e))?; + let shmbinfo: ShmBufInfo = codec.read(&mut reader).map_err(|e| zerror!("{:?}", e))?; // Mount shmbuf let smb = shmr.read_shmbuf(&shmbinfo)?; diff --git a/io/zenoh-transport/src/unicast/manager.rs b/io/zenoh-transport/src/unicast/manager.rs index 89ecc1cb1c..a3bc1a56a8 100644 --- a/io/zenoh-transport/src/unicast/manager.rs +++ b/io/zenoh-transport/src/unicast/manager.rs @@ -24,7 +24,7 @@ use tokio::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard}; #[cfg(feature = "transport_compression")] use zenoh_config::CompressionUnicastConf; #[cfg(feature = "shared-memory")] -use zenoh_config::SharedMemoryConf; +use zenoh_config::ShmConf; use zenoh_config::{Config, LinkTxConf, QoSUnicastConf, TransportUnicastConf}; use zenoh_core::{zasynclock, zcondfeat}; use zenoh_crypto::PseudoRng; @@ -35,7 +35,7 @@ use zenoh_protocol::{ }; use zenoh_result::{bail, zerror, ZResult}; #[cfg(feature = "shared-memory")] -use zenoh_shm::reader::SharedMemoryReader; +use zenoh_shm::reader::ShmReader; #[cfg(feature = "shared-memory")] use super::establishment::ext::shm::AuthUnicast; @@ -216,7 +216,7 @@ impl TransportManagerBuilderUnicast { pub fn build( self, #[allow(unused)] prng: &mut PseudoRng, // Required for #[cfg(feature = "transport_multilink")] - #[cfg(feature = "shared-memory")] shm_reader: &SharedMemoryReader, + #[cfg(feature = "shared-memory")] shm_reader: &ShmReader, ) -> ZResult { if self.is_qos && self.is_lowlatency { bail!("'qos' and 'lowlatency' options are incompatible"); @@ -267,7 +267,7 @@ impl Default for TransportManagerBuilderUnicast { let link_tx = LinkTxConf::default(); let qos = QoSUnicastConf::default(); #[cfg(feature = "shared-memory")] - let shm = SharedMemoryConf::default(); + let shm = ShmConf::default(); #[cfg(feature = "transport_compression")] let compression = CompressionUnicastConf::default(); diff --git a/io/zenoh-transport/tests/unicast_shm.rs b/io/zenoh-transport/tests/unicast_shm.rs index 1b2369e620..5ec01f9290 100644 --- a/io/zenoh-transport/tests/unicast_shm.rs +++ b/io/zenoh-transport/tests/unicast_shm.rs @@ -38,14 +38,11 @@ mod tests { use zenoh_shm::{ api::{ protocol_implementations::posix::{ - posix_shared_memory_provider_backend::PosixSharedMemoryProviderBackend, - protocol_id::POSIX_PROTOCOL_ID, - }, - provider::shared_memory_provider::{ - BlockOn, GarbageCollect, SharedMemoryProviderBuilder, + posix_shm_provider_backend::PosixShmProviderBackend, protocol_id::POSIX_PROTOCOL_ID, }, + provider::shm_provider::{BlockOn, GarbageCollect, ShmProviderBuilder}, }, - SharedMemoryBuf, + ShmBufInner, }; use zenoh_transport::{ multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler, @@ -118,11 +115,10 @@ mod tests { NetworkBody::Push(m) => match m.payload { PushBody::Put(Put { payload, .. }) => { for zs in payload.zslices() { - if self.is_shm && zs.downcast_ref::().is_none() { - panic!("Expected SharedMemoryBuf: {:?}", zs); - } else if !self.is_shm && zs.downcast_ref::().is_some() - { - panic!("Not Expected SharedMemoryBuf: {:?}", zs); + if self.is_shm && zs.downcast_ref::().is_none() { + panic!("Expected ShmBufInner: {:?}", zs); + } else if !self.is_shm && zs.downcast_ref::().is_some() { + panic!("Not Expected ShmBufInner: {:?}", zs); } } payload.contiguous().into_owned() @@ -162,12 +158,12 @@ mod tests { let peer_net01 = ZenohId::try_from([3]).unwrap(); // create SHM provider - let backend = PosixSharedMemoryProviderBackend::builder() + let backend = PosixShmProviderBackend::builder() .with_size(2 * MSG_SIZE) .unwrap() .res() .unwrap(); - let shm01 = SharedMemoryProviderBuilder::builder() + let shm01 = ShmProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index a230aa8748..cfed5772f1 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -26,7 +26,7 @@ use http_types::Method; use serde::{Deserialize, Serialize}; use tide::{http::Mime, sse::Sender, Request, Response, Server, StatusCode}; use zenoh::{ - bytes::{StringOrBase64, ZBytes}, + bytes::ZBytes, encoding::Encoding, internal::{ plugins::{RunningPluginTrait, ZenohPlugin}, @@ -76,11 +76,11 @@ fn payload_to_json(payload: &ZBytes, encoding: &Encoding) -> serde_json::Value { payload .deserialize::() .unwrap_or_else(|_| { - serde_json::Value::String(StringOrBase64::from(payload).into_string()) + serde_json::Value::String(base64_encode(&Cow::from(payload))) }) } // otherwise convert to JSON string - _ => serde_json::Value::String(StringOrBase64::from(payload).into_string()), + _ => serde_json::Value::String(base64_encode(&Cow::from(payload))), } } } diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs b/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs index 30a40abe30..51c674cb1b 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // use std::{ + borrow::Cow, cmp::Ordering, collections::{BTreeSet, HashMap, HashSet}, str, @@ -20,8 +21,8 @@ use std::{ use async_std::sync::Arc; use zenoh::{ - bytes::StringOrBase64, key_expr::OwnedKeyExpr, prelude::*, sample::Sample, - selector::Parameters, time::Timestamp, value::Value, Session, + key_expr::OwnedKeyExpr, prelude::*, sample::Sample, selector::Parameters, time::Timestamp, + value::Value, Session, }; use super::{digest::*, Snapshotter}; @@ -233,8 +234,11 @@ impl AlignQueryable { tracing::trace!( "[ALIGN QUERYABLE] Received ('{}': '{}' @ {:?})", sample.key_expr().as_str(), - StringOrBase64::from(sample.payload()), - sample.timestamp() + sample + .payload() + .deserialize::>() + .unwrap_or(Cow::Borrowed("")), + sample.timestamp(), ); if let Some(timestamp) = sample.timestamp() { match timestamp.cmp(&logentry.timestamp) { diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs index c20f074e1b..7022885d2a 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs @@ -13,6 +13,7 @@ // use std::{ + borrow::Cow, collections::{HashMap, HashSet}, str, }; @@ -20,7 +21,6 @@ use std::{ use async_std::sync::{Arc, RwLock}; use flume::{Receiver, Sender}; use zenoh::{ - bytes::StringOrBase64, key_expr::{KeyExpr, OwnedKeyExpr}, prelude::*, sample::{Sample, SampleBuilder}, @@ -216,7 +216,7 @@ impl Aligner { let mut other_intervals: HashMap = HashMap::new(); // expecting sample.payload to be a vec of intervals with their checksum for each in reply_content { - match serde_json::from_str(&StringOrBase64::from(each.payload())) { + match serde_json::from_reader(each.payload().reader()) { Ok((i, c)) => { other_intervals.insert(i, c); } @@ -224,7 +224,7 @@ impl Aligner { tracing::error!("[ALIGNER] Error decoding reply: {}", e); no_err = false; } - }; + } } (other_intervals, no_err) } else { @@ -262,7 +262,7 @@ impl Aligner { let (reply_content, mut no_err) = self.perform_query(other_rep, properties).await; let mut other_subintervals: HashMap = HashMap::new(); for each in reply_content { - match serde_json::from_str(&StringOrBase64::from(each.payload())) { + match serde_json::from_reader(each.payload().reader()) { Ok((i, c)) => { other_subintervals.insert(i, c); } @@ -270,7 +270,7 @@ impl Aligner { tracing::error!("[ALIGNER] Error decoding reply: {}", e); no_err = false; } - }; + } } (other_subintervals, no_err) }; @@ -303,7 +303,7 @@ impl Aligner { let (reply_content, mut no_err) = self.perform_query(other_rep, properties).await; let mut other_content: HashMap> = HashMap::new(); for each in reply_content { - match serde_json::from_str(&StringOrBase64::from(each.payload())) { + match serde_json::from_reader(each.payload().reader()) { Ok((i, c)) => { other_content.insert(i, c); } @@ -311,7 +311,7 @@ impl Aligner { tracing::error!("[ALIGNER] Error decoding reply: {}", e); no_err = false; } - }; + } } // get subintervals diff let result = this.get_full_content_diff(other_content); @@ -343,7 +343,10 @@ impl Aligner { tracing::trace!( "[ALIGNER] Received ('{}': '{}')", sample.key_expr().as_str(), - StringOrBase64::from(sample.payload()) + sample + .payload() + .deserialize::>() + .unwrap_or(Cow::Borrowed("")) ); return_val.push(sample); } diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs b/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs index 0e4ffbd70a..114e5c206b 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs @@ -16,8 +16,7 @@ use std::{ collections::{HashMap, HashSet}, - str, - str::FromStr, + str::{self, FromStr}, time::{Duration, SystemTime}, }; @@ -44,9 +43,7 @@ pub use aligner::Aligner; pub use digest::{Digest, DigestConfig, EraType, LogEntry}; pub use snapshotter::Snapshotter; pub use storage::{ReplicationService, StorageService}; -use zenoh::{ - bytes::StringOrBase64, key_expr::OwnedKeyExpr, sample::Locality, time::Timestamp, Session, -}; +use zenoh::{key_expr::OwnedKeyExpr, sample::Locality, time::Timestamp, Session}; const ERA: &str = "era"; const INTERVALS: &str = "intervals"; @@ -227,21 +224,23 @@ impl Replica { }; let from = &sample.key_expr().as_str() [Replica::get_digest_key(&self.key_expr, ALIGN_PREFIX).len() + 1..]; - tracing::trace!( - "[DIGEST_SUB] From {} Received {} ('{}': '{}')", - from, - sample.kind(), - sample.key_expr().as_str(), - StringOrBase64::from(sample.payload()) - ); - let digest: Digest = match serde_json::from_str(&StringOrBase64::from(sample.payload())) - { + + let digest: Digest = match serde_json::from_reader(sample.payload().reader()) { Ok(digest) => digest, Err(e) => { tracing::error!("[DIGEST_SUB] Error in decoding the digest: {}", e); continue; } }; + + tracing::trace!( + "[DIGEST_SUB] From {} Received {} ('{}': '{:?}')", + from, + sample.kind(), + sample.key_expr().as_str(), + digest, + ); + let ts = digest.timestamp; let to_be_processed = self .processing_needed( @@ -260,7 +259,7 @@ impl Replica { tracing::error!("[DIGEST_SUB] Error sending digest to aligner: {}", e) } } - }; + } received.insert(from.to_string(), ts); } } diff --git a/plugins/zenoh-plugin-storage-manager/tests/operations.rs b/plugins/zenoh-plugin-storage-manager/tests/operations.rs index c6c473d77b..505634e6fb 100644 --- a/plugins/zenoh-plugin-storage-manager/tests/operations.rs +++ b/plugins/zenoh-plugin-storage-manager/tests/operations.rs @@ -16,12 +16,12 @@ // 1. normal case, just some wild card puts and deletes on existing keys and ensure it works // 2. check for dealing with out of order updates -use std::{str::FromStr, thread::sleep}; +use std::{borrow::Cow, str::FromStr, thread::sleep}; use async_std::task; use zenoh::{ - bytes::StringOrBase64, internal::zasync_executor_init, prelude::*, query::Reply, - sample::Sample, time::Timestamp, Config, Session, + internal::zasync_executor_init, prelude::*, query::Reply, sample::Sample, time::Timestamp, + Config, Session, }; use zenoh_plugin_trait::Plugin; @@ -96,7 +96,7 @@ async fn test_updates_in_order() { // expects exactly one sample let data = get_data(&session, "operation/test/a").await; assert_eq!(data.len(), 1); - assert_eq!(StringOrBase64::from(data[0].payload()).as_str(), "1"); + assert_eq!(data[0].payload().deserialize::>().unwrap(), "1"); put_data( &session, @@ -112,7 +112,7 @@ async fn test_updates_in_order() { // expects exactly one sample let data = get_data(&session, "operation/test/b").await; assert_eq!(data.len(), 1); - assert_eq!(StringOrBase64::from(data[0].payload()).as_str(), "2"); + assert_eq!(data[0].payload().deserialize::>().unwrap(), "2"); delete_data( &session, @@ -131,7 +131,7 @@ async fn test_updates_in_order() { // expects exactly one sample let data = get_data(&session, "operation/test/b").await; assert_eq!(data.len(), 1); - assert_eq!(StringOrBase64::from(data[0].payload()).as_str(), "2"); + assert_eq!(data[0].payload().deserialize::>().unwrap(), "2"); assert_eq!(data[0].key_expr().as_str(), "operation/test/b"); drop(storage); diff --git a/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs b/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs index 9b29dba77c..04e4549508 100644 --- a/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs +++ b/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs @@ -16,13 +16,13 @@ // 1. normal case, just some wild card puts and deletes on existing keys and ensure it works // 2. check for dealing with out of order updates -use std::{str::FromStr, thread::sleep}; +use std::{borrow::Cow, str::FromStr, thread::sleep}; // use std::collections::HashMap; use async_std::task; use zenoh::{ - bytes::StringOrBase64, internal::zasync_executor_init, prelude::*, query::Reply, - sample::Sample, time::Timestamp, Config, Session, + internal::zasync_executor_init, prelude::*, query::Reply, sample::Sample, time::Timestamp, + Config, Session, }; use zenoh_plugin_trait::Plugin; @@ -113,7 +113,7 @@ async fn test_wild_card_in_order() { let data = get_data(&session, "wild/test/*").await; assert_eq!(data.len(), 1); assert_eq!(data[0].key_expr().as_str(), "wild/test/a"); - assert_eq!(StringOrBase64::from(data[0].payload()).as_str(), "2"); + assert_eq!(data[0].payload().deserialize::>().unwrap(), "2"); put_data( &session, @@ -131,8 +131,20 @@ async fn test_wild_card_in_order() { assert_eq!(data.len(), 2); assert!(["wild/test/a", "wild/test/b"].contains(&data[0].key_expr().as_str())); assert!(["wild/test/a", "wild/test/b"].contains(&data[1].key_expr().as_str())); - assert!(["2", "3"].contains(&StringOrBase64::from(data[0].payload()).as_str())); - assert!(["2", "3"].contains(&StringOrBase64::from(data[1].payload()).as_str())); + assert!(["2", "3"].contains( + &data[0] + .payload() + .deserialize::>() + .unwrap() + .as_ref() + )); + assert!(["2", "3"].contains( + &data[1] + .payload() + .deserialize::>() + .unwrap() + .as_ref() + )); put_data( &session, @@ -150,8 +162,8 @@ async fn test_wild_card_in_order() { assert_eq!(data.len(), 2); assert!(["wild/test/a", "wild/test/b"].contains(&data[0].key_expr().as_str())); assert!(["wild/test/a", "wild/test/b"].contains(&data[1].key_expr().as_str())); - assert_eq!(StringOrBase64::from(data[0].payload()).as_str(), "4"); - assert_eq!(StringOrBase64::from(data[1].payload()).as_str(), "4"); + assert_eq!(data[0].payload().deserialize::>().unwrap(), "4"); + assert_eq!(data[1].payload().deserialize::>().unwrap(), "4"); delete_data( &session, diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 9ebf25cba6..279bcb072a 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -314,6 +314,8 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { priority: self.priority, is_express: self.is_express, destination: self.destination, + matching_listeners: Default::default(), + undeclare_on_drop: true, }) } } @@ -365,6 +367,8 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> { priority: self.priority, is_express: self.is_express, destination: self.destination, + matching_listeners: Default::default(), + undeclare_on_drop: true, }) } } diff --git a/zenoh/src/api/bytes.rs b/zenoh/src/api/bytes.rs index 920be0bbaa..f1168a826b 100644 --- a/zenoh/src/api/bytes.rs +++ b/zenoh/src/api/bytes.rs @@ -14,7 +14,7 @@ //! ZBytes primitives. use std::{ - borrow::Cow, convert::Infallible, fmt::Debug, marker::PhantomData, ops::Deref, str::Utf8Error, + borrow::Cow, convert::Infallible, fmt::Debug, marker::PhantomData, str::Utf8Error, string::FromUtf8Error, sync::Arc, }; @@ -34,7 +34,7 @@ use zenoh_shm::{ zshm::{zshm, ZShm}, zshmmut::{zshmmut, ZShmMut}, }, - SharedMemoryBuf, + ShmBufInner, }; /// Trait to encode a type `T` into a [`Value`]. @@ -1613,7 +1613,7 @@ impl<'a> Deserialize<'a, &'a zshm> for ZSerde { // A ZShm is expected to have only one slice let mut zslices = v.0.zslices(); if let Some(zs) = zslices.next() { - if let Some(shmb) = zs.downcast_ref::() { + if let Some(shmb) = zs.downcast_ref::() { return Ok(shmb.into()); } } @@ -1648,7 +1648,7 @@ impl<'a> Deserialize<'a, &'a mut zshm> for ZSerde { // A ZSliceShmBorrowMut is expected to have only one slice let mut zslices = v.0.zslices_mut(); if let Some(zs) = zslices.next() { - if let Some(shmb) = zs.downcast_mut::() { + if let Some(shmb) = zs.downcast_mut::() { return Ok(shmb.into()); } } @@ -1665,7 +1665,7 @@ impl<'a> Deserialize<'a, &'a mut zshmmut> for ZSerde { // A ZSliceShmBorrowMut is expected to have only one slice let mut zslices = v.0.zslices_mut(); if let Some(zs) = zslices.next() { - if let Some(shmb) = zs.downcast_mut::() { + if let Some(shmb) = zs.downcast_mut::() { return shmb.try_into().map_err(|_| ZDeserializeError); } } @@ -1806,53 +1806,6 @@ where } } -// For convenience to always convert a Value in the examples -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum StringOrBase64 { - String(String), - Base64(String), -} - -impl StringOrBase64 { - pub fn into_string(self) -> String { - match self { - StringOrBase64::String(s) | StringOrBase64::Base64(s) => s, - } - } -} - -impl Deref for StringOrBase64 { - type Target = String; - - fn deref(&self) -> &Self::Target { - match self { - Self::String(s) | Self::Base64(s) => s, - } - } -} - -impl std::fmt::Display for StringOrBase64 { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(self) - } -} - -impl From<&ZBytes> for StringOrBase64 { - fn from(v: &ZBytes) -> Self { - use base64::{engine::general_purpose::STANDARD as b64_std_engine, Engine}; - match v.deserialize::() { - Ok(s) => StringOrBase64::String(s), - Err(_) => StringOrBase64::Base64(b64_std_engine.encode(v.into::>())), - } - } -} - -impl From<&mut ZBytes> for StringOrBase64 { - fn from(v: &mut ZBytes) -> Self { - StringOrBase64::from(&*v) - } -} - // Protocol attachment extension impl From for AttachmentType { fn from(this: ZBytes) -> Self { @@ -1882,10 +1835,9 @@ mod tests { use zenoh_shm::api::{ buffer::zshm::{zshm, ZShm}, protocol_implementations::posix::{ - posix_shared_memory_provider_backend::PosixSharedMemoryProviderBackend, - protocol_id::POSIX_PROTOCOL_ID, + posix_shm_provider_backend::PosixShmProviderBackend, protocol_id::POSIX_PROTOCOL_ID, }, - provider::shared_memory_provider::SharedMemoryProviderBuilder, + provider::shm_provider::ShmProviderBuilder, }; use super::ZBytes; @@ -1995,13 +1947,13 @@ mod tests { #[cfg(feature = "shared-memory")] { // create an SHM backend... - let backend = PosixSharedMemoryProviderBackend::builder() + let backend = PosixShmProviderBackend::builder() .with_size(4096) .unwrap() .res() .unwrap(); // ...and an SHM provider - let provider = SharedMemoryProviderBuilder::builder() + let provider = ShmProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); diff --git a/zenoh/src/api/liveliness.rs b/zenoh/src/api/liveliness.rs index aa558237fe..1737540a42 100644 --- a/zenoh/src/api/liveliness.rs +++ b/zenoh/src/api/liveliness.rs @@ -238,7 +238,7 @@ impl Wait for LivelinessTokenBuilder<'_, '_> { .map(|tok_state| LivelinessToken { session, state: tok_state, - alive: true, + undeclare_on_drop: true, }) } } @@ -294,7 +294,7 @@ pub(crate) struct LivelinessTokenState { pub struct LivelinessToken<'a> { pub(crate) session: SessionRef<'a>, pub(crate) state: Arc, - pub(crate) alive: bool, + undeclare_on_drop: bool, } /// A [`Resolvable`] returned when undeclaring a [`LivelinessToken`](LivelinessToken). @@ -329,7 +329,8 @@ impl Resolvable for LivelinessTokenUndeclaration<'_> { #[zenoh_macros::unstable] impl Wait for LivelinessTokenUndeclaration<'_> { fn wait(mut self) -> ::To { - self.token.alive = false; + // set the flag first to avoid double panic if this function panic + self.token.undeclare_on_drop = false; self.token.session.undeclare_liveliness(self.token.state.id) } } @@ -372,6 +373,16 @@ impl<'a> LivelinessToken<'a> { pub fn undeclare(self) -> impl Resolve> + 'a { Undeclarable::undeclare_inner(self, ()) } + + /// Keep this liveliness token in background, until the session is closed. + #[inline] + #[zenoh_macros::unstable] + pub fn background(mut self) { + // It's not necessary to undeclare this resource when session close, as other sessions + // will clean all resources related to the closed one. + // So we can just never undeclare it. + self.undeclare_on_drop = false; + } } #[zenoh_macros::unstable] @@ -384,7 +395,7 @@ impl<'a> Undeclarable<(), LivelinessTokenUndeclaration<'a>> for LivelinessToken< #[zenoh_macros::unstable] impl Drop for LivelinessToken<'_> { fn drop(&mut self) { - if self.alive { + if self.undeclare_on_drop { let _ = self.session.undeclare_liveliness(self.state.id); } } @@ -556,7 +567,7 @@ where subscriber: SubscriberInner { session, state: sub_state, - alive: true, + undeclare_on_drop: true, }, handler, }) diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index c4cff83848..ec8a8aff35 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -13,16 +13,17 @@ // use std::{ + collections::HashSet, convert::TryFrom, fmt, future::{IntoFuture, Ready}, pin::Pin, + sync::{Arc, Mutex}, task::{Context, Poll}, }; use futures::Sink; use zenoh_core::{zread, Resolvable, Resolve, Wait}; -use zenoh_keyexpr::keyexpr; use zenoh_protocol::{ core::CongestionControl, network::{push::ext, Push}, @@ -134,6 +135,8 @@ pub struct Publisher<'a> { pub(crate) priority: Priority, pub(crate) is_express: bool, pub(crate) destination: Locality, + pub(crate) matching_listeners: Arc>>, + pub(crate) undeclare_on_drop: bool, } impl<'a> Publisher<'a> { @@ -160,28 +163,33 @@ impl<'a> Publisher<'a> { } } + #[inline] pub fn key_expr(&self) -> &KeyExpr<'a> { &self.key_expr } + #[inline] + /// Get the `congestion_control` applied when routing the data. + pub fn congestion_control(&self) -> CongestionControl { + self.congestion_control + } + /// Change the `congestion_control` to apply when routing the data. #[inline] pub fn set_congestion_control(&mut self, congestion_control: CongestionControl) { self.congestion_control = congestion_control; } - /// Change the priority of the written data. + /// Get the priority of the written data. #[inline] - pub fn set_priority(&mut self, priority: Priority) { - self.priority = priority; + pub fn priority(&self) -> Priority { + self.priority } - /// Restrict the matching subscribers that will receive the published data - /// to the ones that have the given [`Locality`](crate::prelude::Locality). - #[zenoh_macros::unstable] + /// Change the priority of the written data. #[inline] - pub fn set_allowed_destination(&mut self, destination: Locality) { - self.destination = destination; + pub fn set_priority(&mut self, priority: Priority) { + self.priority = priority; } /// Consumes the given `Publisher`, returning a thread-safe reference-counting @@ -350,6 +358,14 @@ impl<'a> Publisher<'a> { pub fn undeclare(self) -> impl Resolve> + 'a { Undeclarable::undeclare_inner(self, ()) } + + fn undeclare_matching_listeners(&self) -> ZResult<()> { + let ids: Vec = zlock!(self.matching_listeners).drain().collect(); + for id in ids { + self.session.undeclare_matches_listener_inner(id)? + } + Ok(()) + } } /// Functions to create zenoh entities with `'static` lifetime. @@ -470,12 +486,12 @@ impl Resolvable for PublisherUndeclaration<'_> { impl Wait for PublisherUndeclaration<'_> { fn wait(mut self) -> ::To { - let Publisher { - session, id: eid, .. - } = &self.publisher; - session.undeclare_publisher_inner(*eid)?; - self.publisher.key_expr = unsafe { keyexpr::from_str_unchecked("") }.into(); - Ok(()) + // set the flag first to avoid double panic if this function panic + self.publisher.undeclare_on_drop = false; + self.publisher.undeclare_matching_listeners()?; + self.publisher + .session + .undeclare_publisher_inner(self.publisher.id) } } @@ -490,7 +506,8 @@ impl IntoFuture for PublisherUndeclaration<'_> { impl Drop for Publisher<'_> { fn drop(&mut self) { - if !self.key_expr.is_empty() { + if self.undeclare_on_drop { + let _ = self.undeclare_matching_listeners(); let _ = self.session.undeclare_publisher_inner(self.id); } } @@ -887,17 +904,19 @@ where #[zenoh_macros::unstable] fn wait(self) -> ::To { let (callback, receiver) = self.handler.into_handler(); - self.publisher + let state = self + .publisher .session - .declare_matches_listener_inner(&self.publisher, callback) - .map(|listener_state| MatchingListener { - listener: MatchingListenerInner { - publisher: self.publisher, - state: listener_state, - alive: true, - }, - receiver, - }) + .declare_matches_listener_inner(&self.publisher, callback)?; + zlock!(self.publisher.matching_listeners).insert(state.id); + Ok(MatchingListener { + listener: MatchingListenerInner { + publisher: self.publisher, + state, + undeclare_on_drop: true, + }, + receiver, + }) } } @@ -939,7 +958,7 @@ impl std::fmt::Debug for MatchingListenerState { pub(crate) struct MatchingListenerInner<'a> { pub(crate) publisher: PublisherRef<'a>, pub(crate) state: std::sync::Arc, - pub(crate) alive: bool, + undeclare_on_drop: bool, } #[zenoh_macros::unstable] @@ -1007,6 +1026,14 @@ impl<'a, Receiver> MatchingListener<'a, Receiver> { pub fn undeclare(self) -> MatchingListenerUndeclaration<'a> { self.listener.undeclare() } + + /// Make the matching listener run in background, until the publisher is undeclared. + #[inline] + #[zenoh_macros::unstable] + pub fn background(mut self) { + // The matching listener will be undeclared as part of publisher undeclaration. + self.listener.undeclare_on_drop = false; + } } #[zenoh_macros::unstable] @@ -1044,7 +1071,9 @@ impl Resolvable for MatchingListenerUndeclaration<'_> { #[zenoh_macros::unstable] impl Wait for MatchingListenerUndeclaration<'_> { fn wait(mut self) -> ::To { - self.subscriber.alive = false; + // set the flag first to avoid double panic if this function panic + self.subscriber.undeclare_on_drop = false; + zlock!(self.subscriber.publisher.matching_listeners).remove(&self.subscriber.state.id); self.subscriber .publisher .session @@ -1065,7 +1094,8 @@ impl IntoFuture for MatchingListenerUndeclaration<'_> { #[zenoh_macros::unstable] impl Drop for MatchingListenerInner<'_> { fn drop(&mut self) { - if self.alive { + if self.undeclare_on_drop { + zlock!(self.publisher.matching_listeners).remove(&self.state.id); let _ = self .publisher .session diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index f4f16e8ecf..bb4e63d45d 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -605,7 +605,7 @@ impl fmt::Debug for QueryableState { pub(crate) struct CallbackQueryable<'a> { pub(crate) session: SessionRef<'a>, pub(crate) state: Arc, - pub(crate) alive: bool, + undeclare_on_drop: bool, } impl<'a> Undeclarable<(), QueryableUndeclaration<'a>> for CallbackQueryable<'a> { @@ -638,7 +638,8 @@ impl Resolvable for QueryableUndeclaration<'_> { impl Wait for QueryableUndeclaration<'_> { fn wait(mut self) -> ::To { - self.queryable.alive = false; + // set the flag first to avoid double panic if this function panic + self.queryable.undeclare_on_drop = false; self.queryable .session .close_queryable(self.queryable.state.id) @@ -656,7 +657,7 @@ impl<'a> IntoFuture for QueryableUndeclaration<'a> { impl Drop for CallbackQueryable<'_> { fn drop(&mut self) { - if self.alive { + if self.undeclare_on_drop { let _ = self.session.close_queryable(self.state.id); } } @@ -889,6 +890,16 @@ impl<'a, Handler> Queryable<'a, Handler> { pub fn undeclare(self) -> impl Resolve> + 'a { Undeclarable::undeclare_inner(self, ()) } + + /// Make the queryable run in background, until the session is closed. + #[inline] + #[zenoh_macros::unstable] + pub fn background(mut self) { + // It's not necessary to undeclare this resource when session close, as other sessions + // will clean all resources related to the closed one. + // So we can just never undeclare it. + self.queryable.undeclare_on_drop = false; + } } impl<'a, T> Undeclarable<(), QueryableUndeclaration<'a>> for Queryable<'a, T> { @@ -938,7 +949,7 @@ where queryable: CallbackQueryable { session, state: qable_state, - alive: true, + undeclare_on_drop: true, }, handler: receiver, }) diff --git a/zenoh/src/api/scouting.rs b/zenoh/src/api/scouting.rs index 566f18f061..e16f31da2e 100644 --- a/zenoh/src/api/scouting.rs +++ b/zenoh/src/api/scouting.rs @@ -21,7 +21,7 @@ use std::{ use tokio::net::UdpSocket; use zenoh_core::{Resolvable, Wait}; -use zenoh_protocol::{core::WhatAmIMatcher, scouting::Hello}; +use zenoh_protocol::core::WhatAmIMatcher; use zenoh_result::ZResult; use zenoh_task::TerminatableTask; @@ -30,6 +30,36 @@ use crate::{ net::runtime::{orchestrator::Loop, Runtime}, }; +/// A zenoh Hello message. +pub struct Hello(zenoh_protocol::scouting::Hello); + +impl Hello { + /// Get the locators of this Hello message. + pub fn locators(&self) -> &[zenoh_protocol::core::Locator] { + &self.0.locators + } + + /// Get the zenoh id of this Hello message. + pub fn zid(&self) -> zenoh_protocol::core::ZenohId { + self.0.zid + } + + /// Get the whatami of this Hello message. + pub fn whatami(&self) -> zenoh_protocol::core::WhatAmI { + self.0.whatami + } +} + +impl fmt::Display for Hello { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Hello") + .field("zid", &self.zid()) + .field("whatami", &self.whatami()) + .field("locators", &self.locators()) + .finish() + } +} + /// A builder for initializing a [`Scout`]. /// /// # Examples @@ -324,7 +354,7 @@ fn _scout( let scout = Runtime::scout(&sockets, what, &addr, move |hello| { let callback = callback.clone(); async move { - callback(hello); + callback(Hello(hello)); Loop::Continue } }); diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 667b44f994..21311a572e 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -57,7 +57,7 @@ use zenoh_protocol::{ }; use zenoh_result::ZResult; #[cfg(feature = "shared-memory")] -use zenoh_shm::api::client_storage::SharedMemoryClientStorage; +use zenoh_shm::api::client_storage::ShmClientStorage; use zenoh_task::TaskController; use super::{ @@ -408,7 +408,7 @@ pub struct Session { pub(crate) runtime: Runtime, pub(crate) state: Arc>, pub(crate) id: u16, - pub(crate) alive: bool, + close_on_drop: bool, owns_runtime: bool, task_controller: TaskController, } @@ -430,7 +430,7 @@ impl Session { runtime: runtime.clone(), state: state.clone(), id: SESSION_ID_COUNTER.fetch_add(1, Ordering::SeqCst), - alive: true, + close_on_drop: true, owns_runtime: false, task_controller: TaskController::default(), }; @@ -537,6 +537,8 @@ impl Session { pub fn close(mut self) -> impl Resolve> { ResolveFuture::new(async move { trace!("close()"); + // set the flag first to avoid double panic if this function panic + self.close_on_drop = false; self.task_controller.terminate_all(Duration::from_secs(10)); if self.owns_runtime { self.runtime.close().await?; @@ -547,7 +549,6 @@ impl Session { state.queryables.clear(); drop(state); primitives.as_ref().unwrap().send_close(); - self.alive = false; Ok(()) }) } @@ -830,11 +831,11 @@ impl Session { impl Session { pub(crate) fn clone(&self) -> Self { - Session { + Self { runtime: self.runtime.clone(), state: self.state.clone(), id: self.id, - alive: false, + close_on_drop: false, owns_runtime: self.owns_runtime, task_controller: self.task_controller.clone(), } @@ -843,7 +844,7 @@ impl Session { #[allow(clippy::new_ret_no_self)] pub(super) fn new( config: Config, - #[cfg(feature = "shared-memory")] shm_clients: Option>, + #[cfg(feature = "shared-memory")] shm_clients: Option>, ) -> impl Resolve> { ResolveFuture::new(async move { tracing::debug!("Config: {:?}", &config); @@ -2437,7 +2438,7 @@ impl Primitives for Session { impl Drop for Session { fn drop(&mut self) { - if self.alive { + if self.close_on_drop { let _ = self.clone().close().wait(); } } @@ -2698,7 +2699,7 @@ where { config: TryIntoConfig, #[cfg(feature = "shared-memory")] - shm_clients: Option>, + shm_clients: Option>, } #[cfg(feature = "shared-memory")] @@ -2707,7 +2708,7 @@ where TryIntoConfig: std::convert::TryInto + Send + 'static, >::Error: std::fmt::Debug, { - pub fn with_shm_clients(mut self, shm_clients: Arc) -> Self { + pub fn with_shm_clients(mut self, shm_clients: Arc) -> Self { self.shm_clients = Some(shm_clients); self } diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index e298e3c9c9..4628f9e95d 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -78,7 +78,7 @@ impl fmt::Debug for SubscriberState { pub(crate) struct SubscriberInner<'a> { pub(crate) session: SessionRef<'a>, pub(crate) state: Arc, - pub(crate) alive: bool, + pub(crate) undeclare_on_drop: bool, } impl<'a> SubscriberInner<'a> { @@ -142,7 +142,8 @@ impl Resolvable for SubscriberUndeclaration<'_> { impl Wait for SubscriberUndeclaration<'_> { fn wait(mut self) -> ::To { - self.subscriber.alive = false; + // set the flag first to avoid double panic if this function panic + self.subscriber.undeclare_on_drop = false; self.subscriber .session .undeclare_subscriber_inner(self.subscriber.state.id) @@ -160,7 +161,7 @@ impl IntoFuture for SubscriberUndeclaration<'_> { impl Drop for SubscriberInner<'_> { fn drop(&mut self) { - if self.alive { + if self.undeclare_on_drop { let _ = self.session.undeclare_subscriber_inner(self.state.id); } } @@ -387,7 +388,7 @@ where subscriber: SubscriberInner { session, state: sub_state, - alive: true, + undeclare_on_drop: true, }, handler: receiver, }) @@ -505,6 +506,16 @@ impl<'a, Handler> Subscriber<'a, Handler> { pub fn undeclare(self) -> SubscriberUndeclaration<'a> { self.subscriber.undeclare() } + + /// Make the subscriber run in background, until the session is closed. + #[inline] + #[zenoh_macros::unstable] + pub fn background(mut self) { + // It's not necessary to undeclare this resource when session close, as other sessions + // will clean all resources related to the closed one. + // So we can just never undeclare it. + self.subscriber.undeclare_on_drop = false; + } } impl<'a, T> Undeclarable<(), SubscriberUndeclaration<'a>> for Subscriber<'a, T> { diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 6b1c300f72..8490679827 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -240,8 +240,8 @@ pub mod encoding { /// Payload primitives pub mod bytes { pub use crate::api::bytes::{ - Deserialize, OptionZBytes, Serialize, StringOrBase64, ZBytes, ZBytesIterator, ZBytesReader, - ZBytesWriter, ZDeserializeError, ZSerde, + Deserialize, OptionZBytes, Serialize, ZBytes, ZBytesIterator, ZBytesReader, ZBytesWriter, + ZDeserializeError, ZSerde, }; } @@ -322,10 +322,7 @@ pub mod handlers { /// Scouting primitives pub mod scouting { - /// A zenoh Hello message. - pub use zenoh_protocol::scouting::Hello; - - pub use crate::api::scouting::{scout, Scout, ScoutBuilder}; + pub use crate::api::scouting::{scout, Hello, Scout, ScoutBuilder}; } /// Liveliness primitives @@ -412,30 +409,27 @@ pub mod shm { zshm::{zshm, ZShm}, zshmmut::{zshmmut, ZShmMut}, }, - client::{ - shared_memory_client::SharedMemoryClient, shared_memory_segment::SharedMemorySegment, - }, - client_storage::{SharedMemoryClientStorage, GLOBAL_CLIENT_STORAGE}, + client::{shm_client::ShmClient, shm_segment::ShmSegment}, + client_storage::{ShmClientStorage, GLOBAL_CLIENT_STORAGE}, common::types::{ChunkID, ProtocolID, SegmentID}, protocol_implementations::posix::{ - posix_shared_memory_client::PosixSharedMemoryClient, - posix_shared_memory_provider_backend::{ - LayoutedPosixSharedMemoryProviderBackendBuilder, PosixSharedMemoryProviderBackend, - PosixSharedMemoryProviderBackendBuilder, + posix_shm_client::PosixShmClient, + posix_shm_provider_backend::{ + LayoutedPosixShmProviderBackendBuilder, PosixShmProviderBackend, + PosixShmProviderBackendBuilder, }, protocol_id::POSIX_PROTOCOL_ID, }, provider::{ chunk::{AllocatedChunk, ChunkDescriptor}, - shared_memory_provider::{ + shm_provider::{ AllocBuilder, AllocBuilder2, AllocLayout, AllocLayoutSizedBuilder, AllocPolicy, AsyncAllocPolicy, BlockOn, DeallocEldest, DeallocOptimal, DeallocYoungest, Deallocate, Defragment, DynamicProtocolID, ForceDeallocPolicy, GarbageCollect, - JustAlloc, ProtocolIDSource, SharedMemoryProvider, SharedMemoryProviderBuilder, - SharedMemoryProviderBuilderBackendID, SharedMemoryProviderBuilderID, - StaticProtocolID, + JustAlloc, ProtocolIDSource, ShmProvider, ShmProviderBuilder, + ShmProviderBuilderBackendID, ShmProviderBuilderID, StaticProtocolID, }, - shared_memory_provider_backend::SharedMemoryProviderBackend, + shm_provider_backend::ShmProviderBackend, types::{ AllocAlignment, BufAllocResult, BufLayoutAllocResult, ChunkAllocResult, MemoryLayout, ZAllocError, ZLayoutAllocError, ZLayoutError, diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 6eb9ee5b90..b21253d55f 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -370,6 +370,8 @@ impl Primitives for Face { &self.state, &msg.wire_expr, msg.id, + msg.ext_qos, + msg.ext_tstamp, msg.ext_target, msg.ext_budget, msg.ext_timeout, @@ -385,6 +387,8 @@ impl Primitives for Face { &self.tables, &mut self.state.clone(), msg.rid, + msg.ext_qos, + msg.ext_tstamp, msg.ext_respid, msg.wire_expr, msg.payload, diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index c557e3da50..240ddb3a7d 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -473,6 +473,8 @@ impl Timed for QueryCleanup { &self.tables, &mut face, self.qid, + response::ext::QoSType::RESPONSE, + None, ext_respid, WireExpr::empty(), ResponseBody::Err(zenoh::Err { @@ -609,6 +611,8 @@ pub fn route_query( face: &Arc, expr: &WireExpr, qid: RequestId, + ext_qos: ext::QoSType, + ext_tstamp: Option, ext_target: TargetType, ext_budget: Option, ext_timeout: Option, @@ -733,8 +737,8 @@ pub fn route_query( Request { id: *qid, wire_expr: key_expr.into(), - ext_qos: ext::QoSType::REQUEST, - ext_tstamp: None, + ext_qos, + ext_tstamp, ext_nodeid: ext::NodeIdType { node_id: *context }, ext_target, ext_budget, @@ -781,10 +785,13 @@ pub fn route_query( } } +#[allow(clippy::too_many_arguments)] pub(crate) fn route_send_response( tables_ref: &Arc, face: &mut Arc, qid: RequestId, + ext_qos: ext::QoSType, + ext_tstamp: Option, ext_respid: Option, key_expr: WireExpr, body: ResponseBody, @@ -819,8 +826,8 @@ pub(crate) fn route_send_response( rid: query.src_qid, wire_expr: key_expr.to_owned(), payload: body, - ext_qos: response::ext::QoSType::RESPONSE, - ext_tstamp: None, + ext_qos, + ext_tstamp, ext_respid, }, "".to_string(), // @TODO provide the proper key expression of the response for interceptors diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index d5b42ecdd2..81a904a3da 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -45,9 +45,9 @@ use zenoh_protocol::{ }; use zenoh_result::{bail, ZResult}; #[cfg(feature = "shared-memory")] -use zenoh_shm::api::client_storage::SharedMemoryClientStorage; +use zenoh_shm::api::client_storage::ShmClientStorage; #[cfg(feature = "shared-memory")] -use zenoh_shm::reader::SharedMemoryReader; +use zenoh_shm::reader::ShmReader; use zenoh_sync::get_mut_unchecked; use zenoh_task::TaskController; use zenoh_transport::{ @@ -98,7 +98,7 @@ pub struct RuntimeBuilder { #[cfg(feature = "plugins")] plugins_manager: Option, #[cfg(feature = "shared-memory")] - shm_clients: Option>, + shm_clients: Option>, } impl RuntimeBuilder { @@ -119,7 +119,7 @@ impl RuntimeBuilder { } #[cfg(feature = "shared-memory")] - pub fn shm_clients(mut self, shm_clients: Option>) -> Self { + pub fn shm_clients(mut self, shm_clients: Option>) -> Self { self.shm_clients = shm_clients; self } @@ -157,7 +157,7 @@ impl RuntimeBuilder { #[cfg(feature = "unstable")] let transport_manager = zcondfeat!( "shared-memory", - transport_manager.shm_reader(shm_clients.map(SharedMemoryReader::new)), + transport_manager.shm_reader(shm_clients.map(ShmReader::new)), transport_manager ) .build(handler.clone())?; diff --git a/zenoh/tests/bytes.rs b/zenoh/tests/bytes.rs index e97475c237..34c9837d04 100644 --- a/zenoh/tests/bytes.rs +++ b/zenoh/tests/bytes.rs @@ -16,21 +16,21 @@ use zenoh::{ bytes::ZBytes, prelude::*, shm::{ - zshm, zshmmut, PosixSharedMemoryProviderBackend, SharedMemoryProviderBuilder, ZShm, - ZShmMut, POSIX_PROTOCOL_ID, + zshm, zshmmut, PosixShmProviderBackend, ShmProviderBuilder, ZShm, ZShmMut, + POSIX_PROTOCOL_ID, }, }; #[test] fn shm_bytes_single_buf() { // create an SHM backend... - let backend = PosixSharedMemoryProviderBackend::builder() + let backend = PosixShmProviderBackend::builder() .with_size(4096) .unwrap() .res() .unwrap(); // ...and an SHM provider - let provider = SharedMemoryProviderBuilder::builder() + let provider = ShmProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); diff --git a/zenoh/tests/shm.rs b/zenoh/tests/shm.rs index 81e5fdece1..c2cbc4e89a 100644 --- a/zenoh/tests/shm.rs +++ b/zenoh/tests/shm.rs @@ -26,8 +26,7 @@ use zenoh::{ prelude::*, publisher::CongestionControl, shm::{ - BlockOn, GarbageCollect, PosixSharedMemoryProviderBackend, SharedMemoryProviderBuilder, - POSIX_PROTOCOL_ID, + BlockOn, GarbageCollect, PosixShmProviderBackend, ShmProviderBuilder, POSIX_PROTOCOL_ID, }, subscriber::Reliability, Session, @@ -117,13 +116,13 @@ async fn test_session_pubsub(peer01: &Session, peer02: &Session, reliability: Re tokio::time::sleep(SLEEP).await; // create SHM backend... - let backend = PosixSharedMemoryProviderBackend::builder() + let backend = PosixShmProviderBackend::builder() .with_size(size * MSG_COUNT / 10) .unwrap() .res() .unwrap(); // ...and SHM provider - let shm01 = SharedMemoryProviderBuilder::builder() + let shm01 = ShmProviderBuilder::builder() .protocol_id::() .backend(backend) .res();