Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add start/stop extension markers for fragment chain #1597

Merged
merged 11 commits into from
Dec 5, 2024
38 changes: 36 additions & 2 deletions commons/zenoh-codec/src/transport/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ where
more,
sn,
ext_qos,
ext_start,
ext_stop,
} = x;

// Header
Expand All @@ -49,7 +51,10 @@ where
if *more {
header |= flag::M;
}
if ext_qos != &ext::QoSType::DEFAULT {
let mut n_exts = (ext_qos != &ext::QoSType::DEFAULT) as u8
+ ext_start.is_some() as u8
+ ext_stop.is_some() as u8;
if n_exts != 0 {
header |= flag::Z;
}
self.write(&mut *writer, header)?;
Expand All @@ -59,7 +64,16 @@ where

// Extensions
if ext_qos != &ext::QoSType::DEFAULT {
self.write(&mut *writer, (*ext_qos, false))?;
n_exts -= 1;
self.write(&mut *writer, (*ext_qos, n_exts != 0))?;
}
if let Some(start) = ext_start {
n_exts -= 1;
self.write(&mut *writer, (start, n_exts != 0))?
}
if let Some(stop) = ext_stop {
n_exts -= 1;
self.write(&mut *writer, (stop, n_exts != 0))?
}

Ok(())
Expand Down Expand Up @@ -99,6 +113,8 @@ where

// Extensions
let mut ext_qos = ext::QoSType::DEFAULT;
let mut ext_start = None;
let mut ext_stop = None;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand All @@ -110,6 +126,16 @@ where
ext_qos = q;
has_ext = ext;
}
ext::Start::ID => {
let (start, ext): (ext::Start, bool) = eodec.read(&mut *reader)?;
ext_start = Some(start);
has_ext = ext;
}
ext::Stop::ID => {
let (stop, ext): (ext::Stop, bool) = eodec.read(&mut *reader)?;
ext_stop = Some(stop);
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "Fragment", ext)?;
}
Expand All @@ -121,6 +147,8 @@ where
more,
sn,
ext_qos,
ext_start,
ext_stop,
})
}
}
Expand All @@ -139,6 +167,8 @@ where
sn,
payload,
ext_qos,
ext_start,
ext_stop,
} = x;

// Header
Expand All @@ -147,6 +177,8 @@ where
more: *more,
sn: *sn,
ext_qos: *ext_qos,
ext_start: *ext_start,
ext_stop: *ext_stop,
};
self.write(&mut *writer, &header)?;

Expand Down Expand Up @@ -185,6 +217,8 @@ where
more: header.more,
sn: header.sn,
ext_qos: header.ext_qos,
ext_start: header.ext_start,
ext_stop: header.ext_stop,
payload,
})
}
Expand Down
30 changes: 28 additions & 2 deletions commons/zenoh-codec/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
} = x;

// Header
Expand All @@ -64,7 +65,8 @@ where
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
+ (ext_compression.is_some() as u8);
+ (ext_compression.is_some() as u8)
+ (*ext_patch != ext::PatchType::DEFAULT) as u8;

#[cfg(feature = "shared-memory")]
{
Expand Down Expand Up @@ -125,6 +127,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (compression, n_exts != 0))?;
}
if *ext_patch != ext::PatchType::DEFAULT {
n_exts -= 1;
self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -186,6 +192,7 @@ where
let mut ext_mlink = None;
let mut ext_lowlatency = None;
let mut ext_compression = None;
let mut ext_patch = ext::PatchType::DEFAULT;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand Down Expand Up @@ -228,6 +235,11 @@ where
ext_compression = Some(q);
has_ext = ext;
}
ext::Patch::ID => {
let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
ext_patch = p;
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "InitSyn", ext)?;
}
Expand All @@ -248,6 +260,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
})
}
}
Expand Down Expand Up @@ -275,6 +288,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
} = x;

// Header
Expand All @@ -287,7 +301,8 @@ where
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
+ (ext_compression.is_some() as u8);
+ (ext_compression.is_some() as u8)
+ (*ext_patch != ext::PatchType::DEFAULT) as u8;

#[cfg(feature = "shared-memory")]
{
Expand Down Expand Up @@ -351,6 +366,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (compression, n_exts != 0))?;
}
if *ext_patch != ext::PatchType::DEFAULT {
n_exts -= 1;
self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -415,6 +434,7 @@ where
let mut ext_mlink = None;
let mut ext_lowlatency = None;
let mut ext_compression = None;
let mut ext_patch = ext::PatchType::DEFAULT;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand Down Expand Up @@ -457,6 +477,11 @@ where
ext_compression = Some(q);
has_ext = ext;
}
ext::Patch::ID => {
let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
ext_patch = p;
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "InitAck", ext)?;
}
Expand All @@ -478,6 +503,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
})
}
}
16 changes: 15 additions & 1 deletion commons/zenoh-codec/src/transport/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ where
next_sn,
ext_qos,
ext_shm,
ext_patch,
} = x;

// Header
Expand All @@ -160,7 +161,9 @@ where
if resolution != &Resolution::default() || batch_size != &batch_size::MULTICAST {
header |= flag::S;
}
let mut n_exts = (ext_qos.is_some() as u8) + (ext_shm.is_some() as u8);
let mut n_exts = (ext_qos.is_some() as u8)
+ (ext_shm.is_some() as u8)
+ (*ext_patch != ext::PatchType::DEFAULT) as u8;
if n_exts != 0 {
header |= flag::Z;
}
Expand Down Expand Up @@ -201,6 +204,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (shm, n_exts != 0))?;
}
if *ext_patch != ext::PatchType::DEFAULT {
n_exts -= 1;
self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -264,6 +271,7 @@ where
// Extensions
let mut ext_qos = None;
let mut ext_shm = None;
let mut ext_patch = ext::PatchType::DEFAULT;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand All @@ -280,6 +288,11 @@ where
ext_shm = Some(s);
has_ext = ext;
}
ext::Patch::ID => {
let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
ext_patch = p;
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "Join", ext)?;
}
Expand All @@ -296,6 +309,7 @@ where
next_sn,
ext_qos,
ext_shm,
ext_patch,
})
}
}
40 changes: 40 additions & 0 deletions commons/zenoh-codec/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,43 @@ where
Ok((ext.into(), more))
}
}

// Extensions: Patch
impl<W, const ID: u8> WCodec<(ext::PatchType<{ ID }>, bool), &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: (ext::PatchType<{ ID }>, bool)) -> Self::Output {
let (x, more) = x;
let ext: ZExtZ64<{ ID }> = x.into();

self.write(&mut *writer, (&ext, more))
}
}

impl<R, const ID: u8> RCodec<(ext::PatchType<{ ID }>, bool), &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<(ext::PatchType<{ ID }>, bool), Self::Error> {
let header: u8 = self.read(&mut *reader)?;
let codec = Zenoh080Header::new(header);
codec.read(reader)
}
}

impl<R, const ID: u8> RCodec<(ext::PatchType<{ ID }>, bool), &mut R> for Zenoh080Header
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<(ext::PatchType<{ ID }>, bool), Self::Error> {
let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
Ok((ext.into(), more))
}
}
Loading