Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance subscribers, queryables and liveliness tokens propagation to improve scalability #814

Merged
merged 43 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
1eb8629
Router implements interests protocol for clients
OlivierHecart Mar 11, 2024
16f7789
Send WireExpr in UndeclareSubscriber/UndeclareQueryable to clients fo…
OlivierHecart Mar 11, 2024
cf1f579
Fix WireExprExt M flag encoding/decoding
OlivierHecart Mar 12, 2024
6047d75
Fix decl_key
OlivierHecart Mar 15, 2024
5de298f
Clients send all samples and queries to routers and peers
OlivierHecart Mar 15, 2024
961bec7
Avoid self declaration loop on interest
OlivierHecart Mar 19, 2024
eb976c8
Fix query/replies copy/paste bugs
OlivierHecart Mar 21, 2024
b8f1a9c
Peers implement interests protocol for clients
OlivierHecart Mar 21, 2024
83a51e4
Merge branch 'protocol_changes' into interests
OlivierHecart Mar 21, 2024
26bbd8e
Don't send WireExpr in UndeclareSubscriber/UndeclareQueryable to clients
OlivierHecart Mar 22, 2024
cede672
Add client writer-side filtering (#863)
OlivierHecart Mar 27, 2024
76fb3ed
Merge branch 'protocol_changes' into interests
OlivierHecart Mar 27, 2024
df2ea58
Fix pubsub interest based routing after router failover
OlivierHecart Mar 27, 2024
41f59d3
Declare message can be Push/Request/RequestContinuous/Response
Mallets Apr 4, 2024
43a61c7
Address review comments
Mallets Apr 4, 2024
bce8855
Remove F: Future flag from DeclareInterest
Mallets Apr 5, 2024
3da2aed
cargo fmt --all
Mallets Apr 5, 2024
c753e82
Remove unused Interest flags field
OlivierHecart Apr 5, 2024
52ff7d0
Update doc
OlivierHecart Apr 5, 2024
8c9abc1
Remove unneeded interest_id field
OlivierHecart Apr 5, 2024
3a4161b
Merge branch 'protocol_changes' into protocol_declare
OlivierHecart Apr 5, 2024
9aa2079
Update commons/zenoh-protocol/src/network/declare.rs
Mallets Apr 5, 2024
dd2ef80
Remove unused UndeclareInterest
OlivierHecart Apr 5, 2024
b6dc311
Merge branch 'protocol_changes' into protocol_declare
OlivierHecart Apr 5, 2024
83d781f
Merge branch 'protocol_changes' into interests
OlivierHecart Apr 5, 2024
7f55917
Implement proper Declare Request/Response id correlation
OlivierHecart Apr 5, 2024
62192b9
Add new Interest network message
OlivierHecart Apr 8, 2024
3ebad65
Merge branch 'protocol_changes' into protocol_declare
OlivierHecart Apr 8, 2024
e3a8eb2
Update doc
OlivierHecart Apr 8, 2024
a80ce2b
Update codec
OlivierHecart Apr 8, 2024
bac3acb
Merge branch 'protocol_declare' into interests
OlivierHecart Apr 18, 2024
4e0ccae
Merge branch 'protocol_changes' into interests
OlivierHecart Apr 18, 2024
d8ba33c
Merge branch 'protocol_changes' into interests
OlivierHecart Apr 18, 2024
59ae98f
Fix stable build
OlivierHecart Apr 18, 2024
6a9c4f7
Fix test_acl
OlivierHecart Apr 18, 2024
9a2a539
Fix writer side filtering
OlivierHecart Apr 23, 2024
d4dcf14
Merge branch 'protocol_changes' into interests
OlivierHecart Apr 26, 2024
c6e8b53
Merge branch 'protocol_changes' into interests
OlivierHecart Apr 26, 2024
0d5df18
Add separate functions to compute matching status
OlivierHecart May 2, 2024
1141291
Merge branch 'protocol_changes' into interests
OlivierHecart May 3, 2024
b43b159
Fix unstable imports
OlivierHecart May 3, 2024
0eb4e98
Remove useless checks
OlivierHecart May 3, 2024
902c958
Merge branch 'protocol_changes' into interests
OlivierHecart May 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions commons/zenoh-codec/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,7 @@ where
if x.wire_expr.has_suffix() {
flags |= 1;
}
if let Mapping::Receiver = wire_expr.mapping {
if let Mapping::Sender = wire_expr.mapping {
flags |= 1 << 1;
}
codec.write(&mut zriter, flags)?;
Expand Down Expand Up @@ -1154,9 +1154,9 @@ where
String::new()
};
let mapping = if imsg::has_flag(flags, 1 << 1) {
Mapping::Receiver
} else {
Mapping::Sender
} else {
Mapping::Receiver
};

Ok((
Expand Down
13 changes: 13 additions & 0 deletions commons/zenoh-protocol/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,19 @@ pub mod common {
pub mod ext {
use super::*;

/// Flags:
/// - N: Named If N==1 then the key expr has name/suffix
/// - M: Mapping if M==1 then key expr mapping is the one declared by the sender, else it is the one declared by the receiver
///
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// |X|X|X|X|X|X|M|N|
/// +-+-+-+---------+
/// ~ key_scope:z16 ~
/// +---------------+
/// ~ key_suffix ~ if N==1 -- <u8;z16>
/// +---------------+
///
pub type WireExprExt = zextzbuf!(0x0f, true);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WireExprType {
Expand Down
84 changes: 79 additions & 5 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ use super::tables::TablesLock;
use super::{resource::*, tables};
use crate::net::primitives::{McastMux, Mux, Primitives};
use crate::net::routing::interceptor::{InterceptorTrait, InterceptorsChain};
use crate::net::routing::RoutingContext;
use crate::KeyExpr;
use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use zenoh_protocol::network::declare::{FinalInterest, Interest, InterestId};
use zenoh_protocol::network::{ext, Declare, DeclareBody};
use zenoh_protocol::zenoh::RequestBody;
use zenoh_protocol::{
core::{ExprId, WhatAmI, ZenohId},
Expand All @@ -38,6 +41,8 @@ pub struct FaceState {
#[cfg(feature = "stats")]
pub(crate) stats: Option<Arc<TransportStats>>,
pub(crate) primitives: Arc<dyn crate::net::primitives::EPrimitives + Send + Sync>,
pub(crate) local_interests: HashMap<InterestId, (Interest, Option<Arc<Resource>>, bool)>,
pub(crate) remote_key_interests: HashMap<InterestId, Option<Arc<Resource>>>,
pub(crate) local_mappings: HashMap<ExprId, Arc<Resource>>,
pub(crate) remote_mappings: HashMap<ExprId, Arc<Resource>>,
pub(crate) next_qid: RequestId,
Expand Down Expand Up @@ -66,6 +71,8 @@ impl FaceState {
#[cfg(feature = "stats")]
stats,
primitives,
local_interests: HashMap::new(),
remote_key_interests: HashMap::new(),
local_mappings: HashMap::new(),
remote_mappings: HashMap::new(),
next_qid: 0,
Expand Down Expand Up @@ -208,11 +215,78 @@ impl Primitives for Face {
msg.ext_nodeid.node_id,
);
}
zenoh_protocol::network::DeclareBody::DeclareToken(_m) => todo!(),
zenoh_protocol::network::DeclareBody::UndeclareToken(_m) => todo!(),
zenoh_protocol::network::DeclareBody::DeclareInterest(_m) => todo!(),
zenoh_protocol::network::DeclareBody::FinalInterest(_m) => todo!(),
zenoh_protocol::network::DeclareBody::UndeclareInterest(_m) => todo!(),
zenoh_protocol::network::DeclareBody::DeclareToken(m) => {
log::warn!("Received unsupported {m:?}")
}
zenoh_protocol::network::DeclareBody::UndeclareToken(m) => {
log::warn!("Received unsupported {m:?}")
}
zenoh_protocol::network::DeclareBody::DeclareInterest(m) => {
if m.interest.keyexprs() && m.interest.future() {
register_expr_interest(
&self.tables,
&mut self.state.clone(),
m.id,
m.wire_expr.as_ref(),
);
}
if m.interest.subscribers() {
declare_sub_interest(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
m.id,
m.wire_expr.as_ref(),
m.interest.current(),
m.interest.future(),
m.interest.aggregate(),
);
}
if m.interest.queryables() {
declare_qabl_interest(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
m.id,
m.wire_expr.as_ref(),
m.interest.current(),
m.interest.future(),
m.interest.aggregate(),
);
}
if m.interest.current() {
self.state.primitives.send_declare(RoutingContext::new_out(
Declare {
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::FinalInterest(FinalInterest { id: m.id }),
},
self.clone(),
));
}
}
zenoh_protocol::network::DeclareBody::FinalInterest(m) => {
get_mut_unchecked(&mut self.state.clone())
.local_interests
.entry(m.id)
.and_modify(|interest| interest.2 = true);
}
zenoh_protocol::network::DeclareBody::UndeclareInterest(m) => {
unregister_expr_interest(&self.tables, &mut self.state.clone(), m.id);
undeclare_sub_interest(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
m.id,
);
undeclare_qabl_interest(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
m.id,
);
}
}
drop(ctrl_lock);
}
Expand Down
85 changes: 84 additions & 1 deletion zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,97 @@ use std::sync::Arc;
use zenoh_core::zread;
use zenoh_protocol::core::key_expr::keyexpr;
use zenoh_protocol::network::declare::subscriber::ext::SubscriberInfo;
use zenoh_protocol::network::declare::SubscriberId;
use zenoh_protocol::network::declare::{InterestId, SubscriberId};
use zenoh_protocol::{
core::{WhatAmI, WireExpr},
network::{declare::ext, Push},
zenoh::PushBody,
};
use zenoh_sync::get_mut_unchecked;

#[allow(clippy::too_many_arguments)] // TODO refactor
pub(crate) fn declare_sub_interest(
hat_code: &(dyn HatTrait + Send + Sync),
tables: &TablesLock,
face: &mut Arc<FaceState>,
id: InterestId,
expr: Option<&WireExpr>,
current: bool,
future: bool,
aggregate: bool,
) {
if let Some(expr) = expr {
let rtables = zread!(tables.tables);
match rtables
.get_mapping(face, &expr.scope, expr.mapping)
.cloned()
{
Some(mut prefix) => {
log::debug!(
"{} Declare sub interest {} ({}{})",
face,
id,
prefix.expr(),
expr.suffix
);
let res = Resource::get_resource(&prefix, &expr.suffix);
let (mut res, mut wtables) = if res
.as_ref()
.map(|r| r.context.is_some())
.unwrap_or(false)
{
drop(rtables);
let wtables = zwrite!(tables.tables);
(res.unwrap(), wtables)
} else {
let mut fullexpr = prefix.expr();
fullexpr.push_str(expr.suffix.as_ref());
let mut matches = keyexpr::new(fullexpr.as_str())
.map(|ke| Resource::get_matches(&rtables, ke))
.unwrap_or_default();
drop(rtables);
let mut wtables = zwrite!(tables.tables);
let mut res =
Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref());
matches.push(Arc::downgrade(&res));
Resource::match_resource(&wtables, &mut res, matches);
(res, wtables)
};

hat_code.declare_sub_interest(
&mut wtables,
face,
id,
Some(&mut res),
current,
future,
aggregate,
);
}
None => log::error!(
"{} Declare sub interest {} for unknown scope {}!",
face,
id,
expr.scope
),
}
} else {
let mut wtables = zwrite!(tables.tables);
hat_code.declare_sub_interest(&mut wtables, face, id, None, current, future, aggregate);
}
}

pub(crate) fn undeclare_sub_interest(
hat_code: &(dyn HatTrait + Send + Sync),
tables: &TablesLock,
face: &mut Arc<FaceState>,
id: InterestId,
) {
log::debug!("{} Undeclare sub interest {}", face, id,);
let mut wtables = zwrite!(tables.tables);
hat_code.undeclare_sub_interest(&mut wtables, face, id);
}

pub(crate) fn declare_subscription(
hat_code: &(dyn HatTrait + Send + Sync),
tables: &TablesLock,
Expand Down
84 changes: 84 additions & 0 deletions zenoh/src/net/routing/dispatcher/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use zenoh_config::WhatAmI;
use zenoh_protocol::network::declare::InterestId;
use zenoh_protocol::{
core::{key_expr::keyexpr, Encoding, WireExpr},
network::{
Expand All @@ -33,6 +34,89 @@ use zenoh_protocol::{
use zenoh_sync::get_mut_unchecked;
use zenoh_util::Timed;

#[allow(clippy::too_many_arguments)] // TODO refactor
pub(crate) fn declare_qabl_interest(
hat_code: &(dyn HatTrait + Send + Sync),
tables: &TablesLock,
face: &mut Arc<FaceState>,
id: InterestId,
expr: Option<&WireExpr>,
current: bool,
future: bool,
aggregate: bool,
) {
if let Some(expr) = expr {
let rtables = zread!(tables.tables);
match rtables
.get_mapping(face, &expr.scope, expr.mapping)
.cloned()
{
Some(mut prefix) => {
log::debug!(
"{} Declare qabl interest {} ({}{})",
face,
id,
prefix.expr(),
expr.suffix
);
let res = Resource::get_resource(&prefix, &expr.suffix);
let (mut res, mut wtables) = if res
.as_ref()
.map(|r| r.context.is_some())
.unwrap_or(false)
{
drop(rtables);
let wtables = zwrite!(tables.tables);
(res.unwrap(), wtables)
} else {
let mut fullexpr = prefix.expr();
fullexpr.push_str(expr.suffix.as_ref());
let mut matches = keyexpr::new(fullexpr.as_str())
.map(|ke| Resource::get_matches(&rtables, ke))
.unwrap_or_default();
drop(rtables);
let mut wtables = zwrite!(tables.tables);
let mut res =
Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref());
matches.push(Arc::downgrade(&res));
Resource::match_resource(&wtables, &mut res, matches);
(res, wtables)
};

hat_code.declare_qabl_interest(
&mut wtables,
face,
id,
Some(&mut res),
current,
future,
aggregate,
);
}
None => log::error!(
"{} Declare qabl interest {} for unknown scope {}!",
face,
id,
expr.scope
),
}
} else {
let mut wtables = zwrite!(tables.tables);
hat_code.declare_qabl_interest(&mut wtables, face, id, None, current, future, aggregate);
}
}

pub(crate) fn undeclare_qabl_interest(
hat_code: &(dyn HatTrait + Send + Sync),
tables: &TablesLock,
face: &mut Arc<FaceState>,
id: InterestId,
) {
log::debug!("{} Undeclare qabl interest {}", face, id,);
let mut wtables = zwrite!(tables.tables);
hat_code.undeclare_qabl_interest(&mut wtables, face, id);
}

pub(crate) struct Query {
src_face: Arc<FaceState>,
src_qid: RequestId,
Expand Down
Loading
Loading