Skip to content

Commit

Permalink
Interceptors caches (#744)
Browse files Browse the repository at this point in the history
* Interceptors caches

* Update LoggerInterceptor

* Add ComputeOnMiss Interceptor
  • Loading branch information
OlivierHecart authored Mar 4, 2024
1 parent 200dd28 commit a1cf2be
Show file tree
Hide file tree
Showing 20 changed files with 396 additions and 50 deletions.
16 changes: 12 additions & 4 deletions zenoh/src/net/primitives/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::net::routing::{
interceptor::{InterceptorTrait, InterceptorsChain},
RoutingContext,
};
use std::any::Any;
use std::{any::Any, sync::Arc};
use zenoh_link::Link;
use zenoh_protocol::network::{NetworkBody, NetworkMessage};
use zenoh_result::ZResult;
Expand All @@ -27,14 +27,14 @@ use zenoh_transport::TransportPeerEventHandler;
pub struct DeMux {
face: Face,
pub(crate) transport: Option<TransportUnicast>,
pub(crate) interceptor: InterceptorsChain,
pub(crate) interceptor: Arc<InterceptorsChain>,
}

impl DeMux {
pub(crate) fn new(
face: Face,
transport: Option<TransportUnicast>,
interceptor: InterceptorsChain,
interceptor: Arc<InterceptorsChain>,
) -> Self {
Self {
face,
Expand All @@ -49,7 +49,15 @@ impl TransportPeerEventHandler for DeMux {
fn handle_message(&self, mut msg: NetworkMessage) -> ZResult<()> {
if !self.interceptor.interceptors.is_empty() {
let ctx = RoutingContext::new_in(msg, self.face.clone());
let ctx = match self.interceptor.intercept(ctx) {
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix
.as_ref()
.and_then(|p| p.get_ingress_cache(&self.face));
let ctx = match self.interceptor.intercept(ctx, cache) {
Some(ctx) => ctx,
None => return Ok(()),
};
Expand Down
8 changes: 8 additions & 0 deletions zenoh/src/net/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
mod demux;
mod mux;

use std::any::Any;

pub use demux::*;
pub use mux::*;
use zenoh_protocol::network::{Declare, Push, Request, Response, ResponseFinal};
Expand All @@ -35,6 +37,8 @@ pub trait Primitives: Send + Sync {
}

pub(crate) trait EPrimitives: Send + Sync {
fn as_any(&self) -> &dyn Any;

fn send_declare(&self, ctx: RoutingContext<Declare>);

fn send_push(&self, msg: Push);
Expand Down Expand Up @@ -77,4 +81,8 @@ impl EPrimitives for DummyPrimitives {
fn send_response_final(&self, _ctx: RoutingContext<ResponseFinal>) {}

fn send_close(&self) {}

fn as_any(&self) -> &dyn Any {
self
}
}
Loading

0 comments on commit a1cf2be

Please sign in to comment.