From 7980bfb8f944c3a3e7a7d3ee4aa44abe9fb9231a Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 18 Mar 2024 11:11:25 +0100 Subject: [PATCH] Remove unsupported Put/Del in Request/Response --- commons/zenoh-codec/src/zenoh/mod.rs | 6 -- commons/zenoh-protocol/src/zenoh/mod.rs | 22 +---- io/zenoh-transport/src/shm.rs | 8 +- zenoh/src/net/routing/dispatcher/face.rs | 3 - zenoh/src/net/routing/dispatcher/queries.rs | 15 ---- zenoh/src/net/runtime/adminspace.rs | 96 +++++++++++---------- zenoh/src/session.rs | 7 -- 7 files changed, 52 insertions(+), 105 deletions(-) diff --git a/commons/zenoh-codec/src/zenoh/mod.rs b/commons/zenoh-codec/src/zenoh/mod.rs index 1bc006e464..dc38e5ee84 100644 --- a/commons/zenoh-codec/src/zenoh/mod.rs +++ b/commons/zenoh-codec/src/zenoh/mod.rs @@ -80,8 +80,6 @@ where fn write(self, writer: &mut W, x: &RequestBody) -> Self::Output { match x { RequestBody::Query(b) => self.write(&mut *writer, b), - RequestBody::Put(b) => self.write(&mut *writer, b), - RequestBody::Del(b) => self.write(&mut *writer, b), } } } @@ -98,8 +96,6 @@ where let codec = Zenoh080Header::new(header); let body = match imsg::mid(codec.header) { id::QUERY => RequestBody::Query(codec.read(&mut *reader)?), - id::PUT => RequestBody::Put(codec.read(&mut *reader)?), - id::DEL => RequestBody::Del(codec.read(&mut *reader)?), _ => return Err(DidntRead), }; @@ -118,7 +114,6 @@ where match x { ResponseBody::Reply(b) => self.write(&mut *writer, b), ResponseBody::Err(b) => self.write(&mut *writer, b), - ResponseBody::Put(b) => self.write(&mut *writer, b), } } } @@ -136,7 +131,6 @@ where let body = match imsg::mid(codec.header) { id::REPLY => ResponseBody::Reply(codec.read(&mut *reader)?), id::ERR => ResponseBody::Err(codec.read(&mut *reader)?), - id::PUT => ResponseBody::Put(codec.read(&mut *reader)?), _ => return Err(DidntRead), }; diff --git a/commons/zenoh-protocol/src/zenoh/mod.rs b/commons/zenoh-protocol/src/zenoh/mod.rs index dde83eb53c..7bca48f3ba 100644 --- a/commons/zenoh-protocol/src/zenoh/mod.rs +++ b/commons/zenoh-protocol/src/zenoh/mod.rs @@ -77,8 +77,6 @@ impl From for PushBody { #[derive(Debug, Clone, PartialEq, Eq)] pub enum RequestBody { Query(Query), - Put(Put), - Del(Del), } impl RequestBody { @@ -88,10 +86,8 @@ impl RequestBody { let mut rng = rand::thread_rng(); - match rng.gen_range(0..3) { + match rng.gen_range(0..1) { 0 => RequestBody::Query(Query::rand()), - 1 => RequestBody::Put(Put::rand()), - 2 => RequestBody::Del(Del::rand()), _ => unreachable!(), } } @@ -103,24 +99,11 @@ impl From for RequestBody { } } -impl From for RequestBody { - fn from(p: Put) -> RequestBody { - RequestBody::Put(p) - } -} - -impl From for RequestBody { - fn from(d: Del) -> RequestBody { - RequestBody::Del(d) - } -} - // Response #[derive(Debug, Clone, PartialEq, Eq)] pub enum ResponseBody { Reply(Reply), Err(Err), - Put(Put), } impl ResponseBody { @@ -129,10 +112,9 @@ impl ResponseBody { use rand::Rng; let mut rng = rand::thread_rng(); - match rng.gen_range(0..3) { + match rng.gen_range(0..2) { 0 => ResponseBody::Reply(Reply::rand()), 1 => ResponseBody::Err(Err::rand()), - 2 => ResponseBody::Put(Put::rand()), _ => unreachable!(), } } diff --git a/io/zenoh-transport/src/shm.rs b/io/zenoh-transport/src/shm.rs index 88b5ec4d3d..0dd6662286 100644 --- a/io/zenoh-transport/src/shm.rs +++ b/io/zenoh-transport/src/shm.rs @@ -140,12 +140,9 @@ pub fn map_zmsg_to_shminfo(msg: &mut NetworkMessage) -> ZResult { }, NetworkBody::Request(Request { payload, .. }) => match payload { RequestBody::Query(b) => b.map_to_shminfo(), - RequestBody::Put(b) => b.map_to_shminfo(), - RequestBody::Del(_) => Ok(false), }, NetworkBody::Response(Response { payload, .. }) => match payload { ResponseBody::Reply(b) => b.map_to_shminfo(), - ResponseBody::Put(b) => b.map_to_shminfo(), ResponseBody::Err(b) => b.map_to_shminfo(), }, NetworkBody::ResponseFinal(_) | NetworkBody::Declare(_) | NetworkBody::OAM(_) => Ok(false), @@ -194,13 +191,10 @@ pub fn map_zmsg_to_shmbuf( }, NetworkBody::Request(Request { payload, .. }) => match payload { RequestBody::Query(b) => b.map_to_shmbuf(shmr), - RequestBody::Put(b) => b.map_to_shmbuf(shmr), - RequestBody::Del(_) => Ok(false), }, NetworkBody::Response(Response { payload, .. }) => match payload { - ResponseBody::Put(b) => b.map_to_shmbuf(shmr), - ResponseBody::Err(b) => b.map_to_shmbuf(shmr), ResponseBody::Reply(b) => b.map_to_shmbuf(shmr), + ResponseBody::Err(b) => b.map_to_shmbuf(shmr), }, NetworkBody::ResponseFinal(_) | NetworkBody::Declare(_) | NetworkBody::OAM(_) => Ok(false), } diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 54e049bf28..cb565053c9 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -244,9 +244,6 @@ impl Primitives for Face { msg.ext_nodeid.node_id, ); } - _ => { - log::error!("{} Unsupported request!", self); - } } } diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index ac58ae900d..04262e555d 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -460,13 +460,6 @@ macro_rules! inc_req_stats { if let Some(stats) = $face.stats.as_ref() { use zenoh_buffers::buffer::Buffer; match &$body { - RequestBody::Put(p) => { - stats.[<$txrx _z_put_msgs>].[](1); - stats.[<$txrx _z_put_pl_bytes>].[](p.payload.len()); - } - RequestBody::Del(_) => { - stats.[<$txrx _z_del_msgs>].[](1); - } RequestBody::Query(q) => { stats.[<$txrx _z_query_msgs>].[](1); stats.[<$txrx _z_query_pl_bytes>].[]( @@ -491,14 +484,6 @@ macro_rules! inc_res_stats { if let Some(stats) = $face.stats.as_ref() { use zenoh_buffers::buffer::Buffer; match &$body { - ResponseBody::Put(p) => { - stats.[<$txrx _z_put_msgs>].[](1); - let mut n = p.payload.len(); - if let Some(a) = p.ext_attachment.as_ref() { - n += a.buffer.len(); - } - stats.[<$txrx _z_put_pl_bytes>].[](n); - } ResponseBody::Reply(r) => { stats.[<$txrx _z_reply_msgs>].[](1); let mut n = 0; diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index b67692e704..29106cb89d 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -388,58 +388,60 @@ impl Primitives for AdminSpace { fn send_request(&self, msg: Request) { trace!("recv Request {:?}", msg); - if let RequestBody::Query(query) = msg.payload { - let primitives = zlock!(self.primitives).as_ref().unwrap().clone(); - { - let conf = self.context.runtime.state.config.lock(); - if !conf.adminspace.permissions().read { - log::error!( + match msg.payload { + RequestBody::Query(query) => { + let primitives = zlock!(self.primitives).as_ref().unwrap().clone(); + { + let conf = self.context.runtime.state.config.lock(); + if !conf.adminspace.permissions().read { + log::error!( "Received GET on '{}' but adminspace.permissions.read=false in configuration", msg.wire_expr ); - primitives.send_response_final(ResponseFinal { - rid: msg.id, - ext_qos: ext::QoSType::RESPONSE_FINAL, - ext_tstamp: None, - }); - return; - } - } - - let key_expr = match self.key_expr_to_string(&msg.wire_expr) { - Ok(key_expr) => key_expr.into_owned(), - Err(e) => { - log::error!("Unknown KeyExpr: {}", e); - primitives.send_response_final(ResponseFinal { - rid: msg.id, - ext_qos: ext::QoSType::RESPONSE_FINAL, - ext_tstamp: None, - }); - return; + primitives.send_response_final(ResponseFinal { + rid: msg.id, + ext_qos: ext::QoSType::RESPONSE_FINAL, + ext_tstamp: None, + }); + return; + } } - }; - - let zid = self.zid; - let parameters = query.parameters.to_owned(); - let query = Query { - inner: Arc::new(QueryInner { - key_expr: key_expr.clone(), - parameters, - value: query - .ext_body - .map(|b| Value::from(b.payload).with_encoding(b.encoding)), - qid: msg.id, - zid, - primitives, - #[cfg(feature = "unstable")] - attachment: query.ext_attachment.map(Into::into), - }), - eid: self.queryable_id, - }; - for (key, handler) in &self.handlers { - if key_expr.intersects(key) { - handler(&self.context, query.clone()); + let key_expr = match self.key_expr_to_string(&msg.wire_expr) { + Ok(key_expr) => key_expr.into_owned(), + Err(e) => { + log::error!("Unknown KeyExpr: {}", e); + primitives.send_response_final(ResponseFinal { + rid: msg.id, + ext_qos: ext::QoSType::RESPONSE_FINAL, + ext_tstamp: None, + }); + return; + } + }; + + let zid = self.zid; + let parameters = query.parameters.to_owned(); + let query = Query { + inner: Arc::new(QueryInner { + key_expr: key_expr.clone(), + parameters, + value: query + .ext_body + .map(|b| Value::from(b.payload).with_encoding(b.encoding)), + qid: msg.id, + zid, + primitives, + #[cfg(feature = "unstable")] + attachment: query.ext_attachment.map(Into::into), + }), + eid: self.queryable_id, + }; + + for (key, handler) in &self.handlers { + if key_expr.intersects(key) { + handler(&self.context, query.clone()); + } } } } diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 9dca26391d..69e5a9cb7d 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -2087,19 +2087,12 @@ impl Primitives for Session { #[cfg(feature = "unstable")] m.ext_attachment.map(Into::into), ), - RequestBody::Put(_) => (), - RequestBody::Del(_) => (), } } fn send_response(&self, msg: Response) { trace!("recv Response {:?}", msg); match msg.payload { - ResponseBody::Put(_) => { - log::warn!( - "Received a ResponseBody::Put, but this isn't supported yet. Dropping message." - ) - } ResponseBody::Err(e) => { let mut state = zwrite!(self.state); match state.queries.get_mut(&msg.rid) {