diff --git a/zenoh/src/net/primitives/demux.rs b/zenoh/src/net/primitives/demux.rs index 95b89268df..d62e410c81 100644 --- a/zenoh/src/net/primitives/demux.rs +++ b/zenoh/src/net/primitives/demux.rs @@ -17,7 +17,7 @@ use crate::net::routing::{ interceptor::{InterceptorTrait, InterceptorsChain}, RoutingContext, }; -use std::any::Any; +use std::{any::Any, sync::Arc}; use zenoh_link::Link; use zenoh_protocol::network::{NetworkBody, NetworkMessage}; use zenoh_result::ZResult; @@ -27,14 +27,14 @@ use zenoh_transport::TransportPeerEventHandler; pub struct DeMux { face: Face, pub(crate) transport: Option, - pub(crate) interceptor: InterceptorsChain, + pub(crate) interceptor: Arc, } impl DeMux { pub(crate) fn new( face: Face, transport: Option, - interceptor: InterceptorsChain, + interceptor: Arc, ) -> Self { Self { face, @@ -49,7 +49,15 @@ impl TransportPeerEventHandler for DeMux { fn handle_message(&self, mut msg: NetworkMessage) -> ZResult<()> { if !self.interceptor.interceptors.is_empty() { let ctx = RoutingContext::new_in(msg, self.face.clone()); - let ctx = match self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix + .as_ref() + .and_then(|p| p.get_ingress_cache(&self.face)); + let ctx = match self.interceptor.intercept(ctx, cache) { Some(ctx) => ctx, None => return Ok(()), }; diff --git a/zenoh/src/net/primitives/mod.rs b/zenoh/src/net/primitives/mod.rs index cbfa2e3716..fd85280be0 100644 --- a/zenoh/src/net/primitives/mod.rs +++ b/zenoh/src/net/primitives/mod.rs @@ -14,6 +14,8 @@ mod demux; mod mux; +use std::any::Any; + pub use demux::*; pub use mux::*; use zenoh_protocol::network::{Declare, Push, Request, Response, ResponseFinal}; @@ -35,6 +37,8 @@ pub trait Primitives: Send + Sync { } pub(crate) trait EPrimitives: Send + Sync { + fn as_any(&self) -> &dyn Any; + fn send_declare(&self, ctx: RoutingContext); fn send_push(&self, msg: Push); @@ -77,4 +81,8 @@ impl EPrimitives for DummyPrimitives { fn send_response_final(&self, _ctx: RoutingContext) {} fn send_close(&self) {} + + fn as_any(&self) -> &dyn Any { + self + } } diff --git a/zenoh/src/net/primitives/mux.rs b/zenoh/src/net/primitives/mux.rs index 17aad11311..442c040624 100644 --- a/zenoh/src/net/primitives/mux.rs +++ b/zenoh/src/net/primitives/mux.rs @@ -50,7 +50,13 @@ impl Primitives for Mux { let _ = self.handler.schedule(msg); } else if let Some(face) = self.face.get() { let ctx = RoutingContext::new_out(msg, face.clone()); - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face)); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } else { @@ -68,7 +74,13 @@ impl Primitives for Mux { let _ = self.handler.schedule(msg); } else if let Some(face) = self.face.get() { let ctx = RoutingContext::new_out(msg, face.clone()); - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face)); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } else { @@ -86,7 +98,13 @@ impl Primitives for Mux { let _ = self.handler.schedule(msg); } else if let Some(face) = self.face.get() { let ctx = RoutingContext::new_out(msg, face.clone()); - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face)); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } else { @@ -104,7 +122,13 @@ impl Primitives for Mux { let _ = self.handler.schedule(msg); } else if let Some(face) = self.face.get() { let ctx = RoutingContext::new_out(msg, face.clone()); - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face)); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } else { @@ -122,7 +146,13 @@ impl Primitives for Mux { let _ = self.handler.schedule(msg); } else if let Some(face) = self.face.get() { let ctx = RoutingContext::new_out(msg, face.clone()); - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face)); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } else { @@ -148,7 +178,15 @@ impl EPrimitives for Mux { prefix: ctx.prefix, full_expr: ctx.full_expr, }; - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix + .as_ref() + .and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap())); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } @@ -163,7 +201,13 @@ impl EPrimitives for Mux { let _ = self.handler.schedule(msg); } else if let Some(face) = self.face.get() { let ctx = RoutingContext::new_out(msg, face.clone()); - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face)); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } else { @@ -183,7 +227,15 @@ impl EPrimitives for Mux { prefix: ctx.prefix, full_expr: ctx.full_expr, }; - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix + .as_ref() + .and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap())); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } @@ -200,7 +252,15 @@ impl EPrimitives for Mux { prefix: ctx.prefix, full_expr: ctx.full_expr, }; - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix + .as_ref() + .and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap())); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } @@ -217,7 +277,15 @@ impl EPrimitives for Mux { prefix: ctx.prefix, full_expr: ctx.full_expr, }; - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix + .as_ref() + .and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap())); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } @@ -225,6 +293,10 @@ impl EPrimitives for Mux { fn send_close(&self) { // self.handler.closing().await; } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } pub struct McastMux { @@ -254,7 +326,13 @@ impl Primitives for McastMux { let _ = self.handler.schedule(msg); } else if let Some(face) = self.face.get() { let ctx = RoutingContext::new_out(msg, face.clone()); - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face)); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } else { @@ -272,7 +350,13 @@ impl Primitives for McastMux { let _ = self.handler.schedule(msg); } else if let Some(face) = self.face.get() { let ctx = RoutingContext::new_out(msg, face.clone()); - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face)); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } else { @@ -290,7 +374,13 @@ impl Primitives for McastMux { let _ = self.handler.schedule(msg); } else if let Some(face) = self.face.get() { let ctx = RoutingContext::new_out(msg, face.clone()); - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face)); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } else { @@ -308,7 +398,13 @@ impl Primitives for McastMux { let _ = self.handler.schedule(msg); } else if let Some(face) = self.face.get() { let ctx = RoutingContext::new_out(msg, face.clone()); - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face)); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } else { @@ -326,7 +422,13 @@ impl Primitives for McastMux { let _ = self.handler.schedule(msg); } else if let Some(face) = self.face.get() { let ctx = RoutingContext::new_out(msg, face.clone()); - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face)); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } else { @@ -352,7 +454,15 @@ impl EPrimitives for McastMux { prefix: ctx.prefix, full_expr: ctx.full_expr, }; - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix + .as_ref() + .and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap())); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } @@ -367,7 +477,13 @@ impl EPrimitives for McastMux { let _ = self.handler.schedule(msg); } else if let Some(face) = self.face.get() { let ctx = RoutingContext::new_out(msg, face.clone()); - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face)); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } else { @@ -387,7 +503,15 @@ impl EPrimitives for McastMux { prefix: ctx.prefix, full_expr: ctx.full_expr, }; - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix + .as_ref() + .and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap())); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } @@ -404,7 +528,15 @@ impl EPrimitives for McastMux { prefix: ctx.prefix, full_expr: ctx.full_expr, }; - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix + .as_ref() + .and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap())); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } @@ -421,7 +553,15 @@ impl EPrimitives for McastMux { prefix: ctx.prefix, full_expr: ctx.full_expr, }; - if let Some(ctx) = self.interceptor.intercept(ctx) { + let prefix = ctx + .wire_expr() + .and_then(|we| (!we.has_suffix()).then(|| ctx.prefix())) + .flatten() + .cloned(); + let cache = prefix + .as_ref() + .and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap())); + if let Some(ctx) = self.interceptor.intercept(ctx, cache) { let _ = self.handler.schedule(ctx.msg); } } @@ -429,4 +569,8 @@ impl EPrimitives for McastMux { fn send_close(&self) { // self.handler.closing().await; } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index fcf6d3c302..6ef5c063d0 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -14,7 +14,9 @@ use super::super::router::*; use super::tables::TablesLock; use super::{resource::*, tables}; -use crate::net::primitives::Primitives; +use crate::net::primitives::{McastMux, Mux, Primitives}; +use crate::net::routing::interceptor::{InterceptorTrait, InterceptorsChain}; +use crate::KeyExpr; use std::any::Any; use std::collections::HashMap; use std::fmt; @@ -24,6 +26,7 @@ use zenoh_protocol::{ core::{ExprId, WhatAmI, ZenohId}, network::{Mapping, Push, Request, RequestId, Response, ResponseFinal}, }; +use zenoh_sync::get_mut_unchecked; use zenoh_transport::multicast::TransportMulticast; #[cfg(feature = "stats")] use zenoh_transport::stats::TransportStats; @@ -40,10 +43,12 @@ pub struct FaceState { pub(crate) next_qid: RequestId, pub(crate) pending_queries: HashMap>, pub(crate) mcast_group: Option, + pub(crate) in_interceptors: Option>, pub(crate) hat: Box, } impl FaceState { + #[allow(clippy::too_many_arguments)] // @TODO fix warning pub(crate) fn new( id: usize, zid: ZenohId, @@ -51,6 +56,7 @@ impl FaceState { #[cfg(feature = "stats")] stats: Option>, primitives: Arc, mcast_group: Option, + in_interceptors: Option>, hat: Box, ) -> Arc { Arc::new(FaceState { @@ -65,6 +71,7 @@ impl FaceState { next_qid: 0, pending_queries: HashMap::new(), mcast_group, + in_interceptors, hat, }) } @@ -100,6 +107,41 @@ impl FaceState { } id } + + pub(crate) fn update_interceptors_caches(&self, res: &mut Arc) { + if let Ok(expr) = KeyExpr::try_from(res.expr()) { + if let Some(interceptor) = self.in_interceptors.as_ref() { + let cache = interceptor.compute_keyexpr_cache(&expr); + get_mut_unchecked( + get_mut_unchecked(res) + .session_ctxs + .get_mut(&self.id) + .unwrap(), + ) + .in_interceptor_cache = cache; + } + if let Some(mux) = self.primitives.as_any().downcast_ref::() { + let cache = mux.interceptor.compute_keyexpr_cache(&expr); + get_mut_unchecked( + get_mut_unchecked(res) + .session_ctxs + .get_mut(&self.id) + .unwrap(), + ) + .e_interceptor_cache = cache; + } + if let Some(mux) = self.primitives.as_any().downcast_ref::() { + let cache = mux.interceptor.compute_keyexpr_cache(&expr); + get_mut_unchecked( + get_mut_unchecked(res) + .session_ctxs + .get_mut(&self.id) + .unwrap(), + ) + .e_interceptor_cache = cache; + } + } + } } impl fmt::Display for FaceState { diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index 7fc71c623d..26a498461a 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -13,6 +13,7 @@ // use super::face::FaceState; use super::tables::{Tables, TablesLock}; +use crate::net::routing::dispatcher::face::Face; use crate::net::routing::RoutingContext; use std::any::Any; use std::collections::HashMap; @@ -59,6 +60,8 @@ pub(crate) struct SessionContext { pub(crate) subs: Option, pub(crate) qabl: Option, pub(crate) last_values: HashMap, + pub(crate) in_interceptor_cache: Option>, + pub(crate) e_interceptor_cache: Option>, } #[derive(Default)] @@ -443,6 +446,8 @@ impl Resource { subs: None, qabl: None, last_values: HashMap::new(), + in_interceptor_cache: None, + e_interceptor_cache: None, }) }); @@ -476,6 +481,7 @@ impl Resource { }, nonwild_prefix.expr(), )); + face.update_interceptors_caches(&mut nonwild_prefix); WireExpr { scope: expr_id, suffix: wildsuffix.into(), @@ -631,6 +637,18 @@ impl Resource { get_mut_unchecked(res).context = Some(ResourceContext::new(hat)); } } + + pub(crate) fn get_ingress_cache(&self, face: &Face) -> Option<&Box> { + self.session_ctxs + .get(&face.state.id) + .and_then(|ctx| ctx.in_interceptor_cache.as_ref()) + } + + pub(crate) fn get_egress_cache(&self, face: &Face) -> Option<&Box> { + self.session_ctxs + .get(&face.state.id) + .and_then(|ctx| ctx.e_interceptor_cache.as_ref()) + } } pub fn register_expr( @@ -687,6 +705,8 @@ pub fn register_expr( subs: None, qabl: None, last_values: HashMap::new(), + in_interceptor_cache: None, + e_interceptor_cache: None, }) }); @@ -694,6 +714,7 @@ pub fn register_expr( .remote_mappings .insert(expr_id, res.clone()); wtables.update_matches_routes(&mut res); + face.update_interceptors_caches(&mut res); drop(wtables); } }, diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index 7becff4b4d..acd504d3c2 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -110,6 +110,8 @@ fn register_client_subscription( subs: Some(*sub_info), qabl: None, last_values: HashMap::new(), + in_interceptor_cache: None, + e_interceptor_cache: None, }), ); } diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 35a10557dc..e89cfb174d 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -129,6 +129,8 @@ fn register_client_queryable( subs: None, qabl: None, last_values: HashMap::new(), + in_interceptor_cache: None, + e_interceptor_cache: None, }) })) .qabl = Some(*qabl_info); diff --git a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs index a83d72de20..0c05c39c7b 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs @@ -227,6 +227,8 @@ fn register_client_subscription( subs: Some(*sub_info), qabl: None, last_values: HashMap::new(), + in_interceptor_cache: None, + e_interceptor_cache: None, }), ); } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index 6281993c93..b965a6f58b 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -284,6 +284,8 @@ fn register_client_queryable( subs: None, qabl: None, last_values: HashMap::new(), + in_interceptor_cache: None, + e_interceptor_cache: None, }) })) .qabl = Some(*qabl_info); diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index 8f91335f0a..90eae923a6 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -110,6 +110,8 @@ fn register_client_subscription( subs: Some(*sub_info), qabl: None, last_values: HashMap::new(), + in_interceptor_cache: None, + e_interceptor_cache: None, }), ); } diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 662cf5b7bd..95d357fe11 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -129,6 +129,8 @@ fn register_client_queryable( subs: None, qabl: None, last_values: HashMap::new(), + in_interceptor_cache: None, + e_interceptor_cache: None, }) })) .qabl = Some(*qabl_info); diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index d8b3220116..8f349d9c78 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -284,6 +284,8 @@ fn register_client_subscription( subs: Some(*sub_info), qabl: None, last_values: HashMap::new(), + in_interceptor_cache: None, + e_interceptor_cache: None, }), ); } diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 90944a524f..12338eb339 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -417,6 +417,8 @@ fn register_client_queryable( subs: None, qabl: None, last_values: HashMap::new(), + in_interceptor_cache: None, + e_interceptor_cache: None, }) })) .qabl = Some(*qabl_info); diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs index 765dab8925..5435c30bed 100644 --- a/zenoh/src/net/routing/interceptor/downsampling.rs +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -114,9 +114,14 @@ pub(crate) struct DownsamplingInterceptor { } impl InterceptorTrait for DownsamplingInterceptor { + fn compute_keyexpr_cache(&self, _key_expr: &KeyExpr<'_>) -> Option> { + None + } + fn intercept( &self, ctx: RoutingContext, + _cache: Option<&Box>, ) -> Option> { if matches!(ctx.msg.body, NetworkBody::Push(_)) { if let Some(key_expr) = ctx.full_key_expr() { diff --git a/zenoh/src/net/routing/interceptor/mod.rs b/zenoh/src/net/routing/interceptor/mod.rs index 81ff6d15da..9dfc03ac7e 100644 --- a/zenoh/src/net/routing/interceptor/mod.rs +++ b/zenoh/src/net/routing/interceptor/mod.rs @@ -18,6 +18,8 @@ //! //! [Click here for Zenoh's documentation](../zenoh/index.html) use super::RoutingContext; +use crate::KeyExpr; +use std::any::Any; use zenoh_config::Config; use zenoh_protocol::network::NetworkMessage; use zenoh_result::ZResult; @@ -27,9 +29,12 @@ pub mod downsampling; use crate::net::routing::interceptor::downsampling::downsampling_interceptor_factories; pub(crate) trait InterceptorTrait { + fn compute_keyexpr_cache(&self, key_expr: &KeyExpr<'_>) -> Option>; + fn intercept( &self, ctx: RoutingContext, + cache: Option<&Box>, ) -> Option>; } @@ -79,12 +84,27 @@ impl From> for InterceptorsChain { } impl InterceptorTrait for InterceptorsChain { - fn intercept( + fn compute_keyexpr_cache(&self, key_expr: &KeyExpr<'_>) -> Option> { + Some(Box::new( + self.interceptors + .iter() + .map(|i| i.compute_keyexpr_cache(key_expr)) + .collect::>>>(), + )) + } + + fn intercept<'a>( &self, mut ctx: RoutingContext, + caches: Option<&Box>, ) -> Option> { - for interceptor in &self.interceptors { - match interceptor.intercept(ctx) { + let caches = + caches.and_then(|i| i.downcast_ref::>>>()); + for (idx, interceptor) in self.interceptors.iter().enumerate() { + let cache = caches + .and_then(|caches| caches.get(idx).map(|k| k.as_ref())) + .flatten(); + match interceptor.intercept(ctx, cache) { Some(newctx) => ctx = newctx, None => { log::trace!("Msg intercepted!"); @@ -96,20 +116,67 @@ impl InterceptorTrait for InterceptorsChain { } } +pub(crate) struct ComputeOnMiss { + interceptor: T, +} + +impl ComputeOnMiss { + #[allow(dead_code)] + pub(crate) fn new(interceptor: T) -> Self { + Self { interceptor } + } +} + +impl InterceptorTrait for ComputeOnMiss { + #[inline] + fn compute_keyexpr_cache(&self, key_expr: &KeyExpr<'_>) -> Option> { + self.interceptor.compute_keyexpr_cache(key_expr) + } + + #[inline] + fn intercept<'a>( + &self, + ctx: RoutingContext, + cache: Option<&Box>, + ) -> Option> { + if cache.is_some() { + self.interceptor.intercept(ctx, cache) + } else if let Some(key_expr) = ctx.full_key_expr() { + self.interceptor.intercept( + ctx, + self.interceptor + .compute_keyexpr_cache(&key_expr.into()) + .as_ref(), + ) + } else { + self.interceptor.intercept(ctx, cache) + } + } +} + pub(crate) struct IngressMsgLogger {} impl InterceptorTrait for IngressMsgLogger { + fn compute_keyexpr_cache(&self, key_expr: &KeyExpr<'_>) -> Option> { + Some(Box::new(key_expr.to_string())) + } + fn intercept( &self, ctx: RoutingContext, + cache: Option<&Box>, ) -> Option> { + let expr = cache + .and_then(|i| i.downcast_ref::().map(|e| e.as_str())) + .or_else(|| ctx.full_expr()); + log::debug!( - "Recv {} {} Expr:{:?}", + "{} Recv {} Expr:{:?}", ctx.inface() .map(|f| f.to_string()) .unwrap_or("None".to_string()), ctx.msg, - ctx.full_expr(), + expr, ); Some(ctx) } @@ -117,11 +184,26 @@ impl InterceptorTrait for IngressMsgLogger { pub(crate) struct EgressMsgLogger {} impl InterceptorTrait for EgressMsgLogger { + fn compute_keyexpr_cache(&self, key_expr: &KeyExpr<'_>) -> Option> { + Some(Box::new(key_expr.to_string())) + } + fn intercept( &self, ctx: RoutingContext, + cache: Option<&Box>, ) -> Option> { - log::debug!("Send {} Expr:{:?}", ctx.msg, ctx.full_expr()); + let expr = cache + .and_then(|i| i.downcast_ref::().map(|e| e.as_str())) + .or_else(|| ctx.full_expr()); + log::debug!( + "{} Send {} Expr:{:?}", + ctx.outface() + .map(|f| f.to_string()) + .unwrap_or("None".to_string()), + ctx.msg, + expr + ); Some(ctx) } } diff --git a/zenoh/src/net/routing/mod.rs b/zenoh/src/net/routing/mod.rs index 373f7d8273..afc49003f8 100644 --- a/zenoh/src/net/routing/mod.rs +++ b/zenoh/src/net/routing/mod.rs @@ -125,10 +125,7 @@ impl RoutingContext { } #[inline] - pub(crate) fn full_expr(&self) -> Option<&str> { - if self.full_expr.get().is_some() { - return Some(self.full_expr.get().as_ref().unwrap()); - } + pub(crate) fn prefix(&self) -> Option<&Arc> { if let Some(face) = self.outface.get() { if let Some(wire_expr) = self.wire_expr() { let wire_expr = wire_expr.to_owned(); @@ -140,12 +137,7 @@ impl RoutingContext { let _ = self.prefix.set(prefix); } } - if let Some(prefix) = self.prefix.get().cloned() { - let _ = self - .full_expr - .set(prefix.expr() + wire_expr.suffix.as_ref()); - return Some(self.full_expr.get().as_ref().unwrap()); - } + return self.prefix.get(); } } if let Some(face) = self.inface.get() { @@ -159,17 +151,27 @@ impl RoutingContext { let _ = self.prefix.set(prefix); } } - if let Some(prefix) = self.prefix.get().cloned() { - let _ = self - .full_expr - .set(prefix.expr() + wire_expr.suffix.as_ref()); - return Some(self.full_expr.get().as_ref().unwrap()); - } + return self.prefix.get(); } } None } + #[inline] + #[allow(dead_code)] + pub(crate) fn full_expr(&self) -> Option<&str> { + if self.full_expr.get().is_some() { + return Some(self.full_expr.get().as_ref().unwrap()); + } + if let Some(prefix) = self.prefix() { + let _ = self + .full_expr + .set(prefix.expr() + self.wire_expr().unwrap().suffix.as_ref()); + return Some(self.full_expr.get().as_ref().unwrap()); + } + None + } + #[inline] pub(crate) fn full_key_expr(&self) -> Option { let full_expr = self.full_expr()?; diff --git a/zenoh/src/net/routing/router.rs b/zenoh/src/net/routing/router.rs index ba0249af1b..d67a2baa9d 100644 --- a/zenoh/src/net/routing/router.rs +++ b/zenoh/src/net/routing/router.rs @@ -90,6 +90,7 @@ impl Router { None, primitives.clone(), None, + None, ctrl_lock.new_face(), ) }) @@ -124,7 +125,9 @@ impl Router { .map(|itor| itor.new_transport_unicast(&transport)) .unzip(); let (ingress, egress) = ( - InterceptorsChain::from(ingress.into_iter().flatten().collect::>()), + Arc::new(InterceptorsChain::from( + ingress.into_iter().flatten().collect::>(), + )), InterceptorsChain::from(egress.into_iter().flatten().collect::>()), ); let mux = Arc::new(Mux::new(transport.clone(), egress)); @@ -140,6 +143,7 @@ impl Router { Some(stats), mux.clone(), None, + Some(ingress.clone()), ctrl_lock.new_face(), ) }) @@ -179,6 +183,7 @@ impl Router { None, mux.clone(), Some(transport), + None, ctrl_lock.new_face(), ); let _ = mux.face.set(Face { @@ -202,13 +207,13 @@ impl Router { let mut tables = zwrite!(self.tables.tables); let fid = tables.face_counter; tables.face_counter += 1; - let interceptor = InterceptorsChain::from( + let interceptor = Arc::new(InterceptorsChain::from( tables .interceptors .iter() .filter_map(|itor| itor.new_peer_multicast(&transport)) .collect::>(), - ); + )); let face_state = FaceState::new( fid, peer.zid, @@ -217,6 +222,7 @@ impl Router { Some(transport.get_stats().unwrap()), Arc::new(DummyPrimitives), Some(transport), + Some(interceptor.clone()), ctrl_lock.new_face(), ); tables.mcast_faces.push(face_state.clone()); diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index f6fb13e76e..1283ee0fce 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -480,6 +480,10 @@ impl crate::net::primitives::EPrimitives for AdminSpace { fn send_close(&self) { (self as &dyn Primitives).send_close() } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } fn router_data(context: &AdminContext, query: Query) { diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index ddcdc0084e..e8b6f6ac9f 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -471,6 +471,10 @@ impl EPrimitives for ClientPrimitives { fn send_response_final(&self, _ctx: RoutingContext) {} fn send_close(&self) {} + + fn as_any(&self) -> &dyn std::any::Any { + self + } } #[test] diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 43ef74d58f..374320bde9 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -2679,4 +2679,8 @@ impl crate::net::primitives::EPrimitives for Session { fn send_close(&self) { (self as &dyn Primitives).send_close() } + + fn as_any(&self) -> &dyn std::any::Any { + self + } }