diff --git a/Cargo.lock b/Cargo.lock index d9eae88f3e..04348c0dde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -942,7 +942,7 @@ dependencies = [ "clap", "criterion-plot", "is-terminal", - "itertools", + "itertools 0.10.5", "num-traits", "once_cell", "oorandom", @@ -963,7 +963,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" dependencies = [ "cast", - "itertools", + "itertools 0.10.5", ] [[package]] @@ -2015,6 +2015,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.9" @@ -2992,7 +3001,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" dependencies = [ "anyhow", - "itertools", + "itertools 0.10.5", "proc-macro2", "quote", "syn 1.0.109", @@ -3005,7 +3014,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.52", @@ -5388,6 +5397,7 @@ dependencies = [ "form_urlencoded", "futures", "git-version", + "itertools 0.13.0", "lazy_static", "once_cell", "ordered-float", diff --git a/Cargo.toml b/Cargo.toml index b1d5f4bf37..254cdc19b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,6 +107,7 @@ hmac = { version = "0.12.1", features = ["std"] } home = "0.5.4" http-types = "2.12.0" humantime = "2.1.0" +itertools = "0.13.0" json5 = "0.4.1" jsonschema = { version = "0.18.0", default-features = false } keyed-set = "1.0.0" diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index f6e10f77ca..33c9b3acdd 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -186,27 +186,52 @@ // }, // ], - // /// configure access control (ACL) rules + // /// Configure access control (ACL) rules // access_control: { - // ///[true/false] acl will be activated only if this is set to true + // /// [true/false] acl will be activated only if this is set to true // "enabled": false, - // ///[deny/allow] default permission is deny (even if this is left empty or not specified) + // /// [deny/allow] default permission is deny (even if this is left empty or not specified) // "default_permission": "deny", - // ///rule set for permissions allowing or denying access to key-expressions + // /// Rule set for permissions allowing or denying access to key-expressions // "rules": // [ // { - // "actions": [ - // "put", "get", "declare_subscriber", "declare_queryable" + // /// Id has to be unique within the rule set + // "id": "rule1", + // "messages": [ + // "put", "query", "declare_subscriber", "declare_queryable" // ], // "flows":["egress","ingress"], // "permission": "allow", // "key_exprs": [ // "test/demo" // ], + // }, + // { + // "id": "rule2", + // "messages": [ + // "put", "query", "declare_subscriber", "declare_queryable" + // ], + // "flows":["ingress"], + // "permission": "allow", + // "key_exprs": [ + // "**" + // ], + // }, + // ], + // /// List of combinations of subjects. + // /// + // /// If a subject property (i.e. username, certificate common name or interface) is empty + // /// it is interpreted as a wildcard. Moreover, a subject property cannot be an empty list. + // "subjects": + // [ + // { + // /// Id has to be unique within the subjects list + // "id": "subject1", // /// Subjects can be interfaces // "interfaces": [ - // "lo0" + // "lo0", + // "en0", // ], // /// Subjects can be cert_common_names when using TLS or Quic // "cert_common_names": [ @@ -215,9 +240,43 @@ // /// Subjects can be usernames when using user/password authentication // "usernames": [ // "zenoh-example" - // ] + // ], + // /// This instance translates internally to this filter: + // /// (interface="lo0" && cert_common_name="example.zenoh.io" && username="zenoh-example") || + // /// (interface="en0" && cert_common_name="example.zenoh.io" && username="zenoh-example") // }, - // ] + // { + // "id": "subject2", + // "interfaces": [ + // "lo0", + // "en0", + // ], + // "cert_common_names": [ + // "example2.zenoh.io" + // ], + // /// This instance translates internally to this filter: + // /// (interface="lo0" && cert_common_name="example2.zenoh.io") || + // /// (interface="en0" && cert_common_name="example2.zenoh.io") + // }, + // { + // "id": "subject3", + // /// An empty subject combination is a wildcard + // }, + // ], + // /// The policies list associates rules to subjects + // "policies": + // [ + // /// Each policy associates one or multiple rules to one or multiple subject combinations + // { + // /// Rules and Subjects are identified with their unique IDs declared above + // "rules": ["rule1"], + // "subjects": ["subject1", "subject2"], + // }, + // { + // "rules": ["rule2"], + // "subjects": ["subject3"], + // }, + // ] //}, /// Configure internal transport parameters diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index bbb03a7eff..810e0931e2 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -257,6 +257,8 @@ impl Default for AclConfig { enabled: false, default_permission: Permission::Deny, rules: None, + subjects: None, + policies: None, } } } diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index fec1a1cf8d..270cf950c3 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -104,40 +104,70 @@ pub struct DownsamplingItemConf { } #[derive(Serialize, Debug, Deserialize, Clone)] -pub struct AclConfigRules { - pub interfaces: Option>, - pub cert_common_names: Option>, - pub usernames: Option>, +pub struct AclConfigRule { + pub id: String, pub key_exprs: Vec, - pub actions: Vec, + pub messages: Vec, pub flows: Option>, pub permission: Permission, } +#[derive(Serialize, Debug, Deserialize, Clone)] +pub struct AclConfigSubjects { + pub id: String, + pub interfaces: Option>, + pub cert_common_names: Option>, + pub usernames: Option>, +} + +#[derive(Serialize, Debug, Deserialize, Clone, PartialEq, Eq, Hash)] +pub struct Interface(pub String); + +impl std::fmt::Display for Interface { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Interface({})", self.0) + } +} + +#[derive(Serialize, Debug, Deserialize, Clone, PartialEq, Eq, Hash)] +pub struct CertCommonName(pub String); + +impl std::fmt::Display for CertCommonName { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "CertCommonName({})", self.0) + } +} + +#[derive(Serialize, Debug, Deserialize, Clone, PartialEq, Eq, Hash)] +pub struct Username(pub String); + +impl std::fmt::Display for Username { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Username({})", self.0) + } +} + +#[derive(Serialize, Debug, Deserialize, Clone, PartialEq, Eq, Hash)] +pub struct AclConfigPolicyEntry { + pub rules: Vec, + pub subjects: Vec, +} + #[derive(Clone, Serialize, Debug, Deserialize)] pub struct PolicyRule { - pub subject: Subject, + pub subject_id: usize, pub key_expr: String, - pub action: Action, + pub message: AclMessage, pub permission: Permission, pub flow: InterceptorFlow, } -#[derive(Serialize, Debug, Deserialize, Eq, PartialEq, Hash, Clone)] -#[serde(untagged)] -#[serde(rename_all = "snake_case")] -pub enum Subject { - Interface(String), - CertCommonName(String), - Username(String), -} - #[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)] #[serde(rename_all = "snake_case")] -pub enum Action { +pub enum AclMessage { Put, DeclareSubscriber, - Get, + Query, DeclareQueryable, } @@ -505,7 +535,9 @@ validated_struct::validator! { pub access_control: AclConfig { pub enabled: bool, pub default_permission: Permission, - pub rules: Option> + pub rules: Option>, + pub subjects: Option>, + pub policies: Option>, }, /// A list of directories where plugins may be searched for if no `__path__` was specified for them. diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index d0ac151c01..605efd16a0 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -77,6 +77,7 @@ flume = { workspace = true } form_urlencoded = { workspace = true } futures = { workspace = true } git-version = { workspace = true } +itertools = { workspace = true } lazy_static = { workspace = true } tracing = { workspace = true } ordered-float = { workspace = true } diff --git a/zenoh/src/net/routing/interceptor/access_control.rs b/zenoh/src/net/routing/interceptor/access_control.rs index 1e95104967..9e749e1258 100644 --- a/zenoh/src/net/routing/interceptor/access_control.rs +++ b/zenoh/src/net/routing/interceptor/access_control.rs @@ -18,9 +18,12 @@ //! //! [Click here for Zenoh's documentation](../zenoh/index.html) -use std::{any::Any, sync::Arc}; +use std::{any::Any, collections::HashSet, iter, sync::Arc}; -use zenoh_config::{AclConfig, Action, InterceptorFlow, Permission, Subject}; +use itertools::Itertools; +use zenoh_config::{ + AclConfig, AclMessage, CertCommonName, InterceptorFlow, Interface, Permission, Username, +}; use zenoh_protocol::{ core::ZenohIdProto, network::{Declare, DeclareBody, NetworkBody, NetworkMessage, Push, Request}, @@ -36,11 +39,14 @@ use super::{ authorization::PolicyEnforcer, EgressInterceptor, IngressInterceptor, InterceptorFactory, InterceptorFactoryTrait, InterceptorTrait, }; -use crate::{api::key_expr::KeyExpr, net::routing::RoutingContext}; +use crate::{ + api::key_expr::KeyExpr, + net::routing::{interceptor::authorization::SubjectQuery, RoutingContext}, +}; pub struct AclEnforcer { enforcer: Arc, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct AuthSubject { id: usize, name: String, @@ -86,73 +92,112 @@ impl InterceptorFactoryTrait for AclEnforcer { &self, transport: &TransportUnicast, ) -> (Option, Option) { - let mut authn_ids = vec![]; - if let Ok(ids) = transport.get_auth_ids() { - for auth_id in ids { - match auth_id { - AuthId::CertCommonName(name) => { - let subject = &Subject::CertCommonName(name.clone()); - if let Some(val) = self.enforcer.subject_map.get(subject) { - authn_ids.push(AuthSubject { id: *val, name }); - } - } - AuthId::Username(name) => { - let subject = &Subject::Username(name.clone()); - if let Some(val) = self.enforcer.subject_map.get(subject) { - authn_ids.push(AuthSubject { id: *val, name }); - } - } - AuthId::None => {} - } + let auth_ids = match transport.get_auth_ids() { + Ok(auth_ids) => auth_ids, + Err(err) => { + tracing::error!("Couldn't get Transport Auth IDs: {}", err); + return (None, None); } - } - match transport.get_zid() { - Ok(zid) => { - match transport.get_links() { - Ok(links) => { - for link in links { - for face in link.interfaces { - let subject = &Subject::Interface(face.clone()); - if let Some(val) = self.enforcer.subject_map.get(subject) { - authn_ids.push(AuthSubject { - id: *val, - name: face, - }); - } - } - } - } - Err(e) => { - tracing::error!("Couldn't get interface list with error: {}", e); + }; + + let mut cert_common_names = Vec::new(); + let mut username = None; + + for auth_id in auth_ids { + match auth_id { + AuthId::CertCommonName(value) => { + cert_common_names.push(Some(CertCommonName(value))); + } + AuthId::Username(value) => { + if username.is_some() { + tracing::error!("Transport should not report more than one username"); return (None, None); } + username = Some(Username(value)); } - let ingress_interceptor = Box::new(IngressAclEnforcer { - policy_enforcer: self.enforcer.clone(), - zid, - subject: authn_ids.clone(), - }); - let egress_interceptor = Box::new(EgressAclEnforcer { - policy_enforcer: self.enforcer.clone(), - zid, - subject: authn_ids, + AuthId::None => {} + } + } + if cert_common_names.is_empty() { + cert_common_names.push(None); + } + + let links = match transport.get_links() { + Ok(links) => links, + Err(err) => { + tracing::error!("Couldn't get Transport links: {}", err); + return (None, None); + } + }; + let mut interfaces = links + .into_iter() + .flat_map(|link| { + link.interfaces + .into_iter() + .map(|interface| Some(Interface(interface))) + }) + .collect::>(); + if interfaces.is_empty() { + interfaces.push(None); + } else if interfaces.len() > 1 { + tracing::warn!("Transport returned multiple network interfaces, current ACL logic might incorrectly apply filters in this case!"); + } + + let mut auth_subjects = HashSet::new(); + + for ((username, interface), cert_common_name) in iter::once(username) + .cartesian_product(interfaces.into_iter()) + .cartesian_product(cert_common_names.into_iter()) + { + let query = SubjectQuery { + interface, + cert_common_name, + username, + }; + + if let Some(entry) = self.enforcer.subject_store.query(&query) { + auth_subjects.insert(AuthSubject { + id: entry.id, + name: format!("{query}"), }); - ( - self.enforcer - .interface_enabled - .ingress - .then_some(ingress_interceptor), - self.enforcer - .interface_enabled - .egress - .then_some(egress_interceptor), - ) } - Err(e) => { - tracing::error!("Failed to get zid with error :{}", e); - (None, None) + } + + let zid = match transport.get_zid() { + Ok(zid) => zid, + Err(err) => { + tracing::error!("Couldn't get Transport zid: {}", err); + return (None, None); } + }; + // FIXME: Investigate if `AuthSubject` can have duplicates above and try to avoid this conversion + let auth_subjects = auth_subjects.into_iter().collect::>(); + if auth_subjects.is_empty() { + tracing::info!( + "{zid} did not match any configured ACL subject. Default permission `{:?}` will be applied on all messages", + self.enforcer.default_permission + ); } + let ingress_interceptor = Box::new(IngressAclEnforcer { + policy_enforcer: self.enforcer.clone(), + zid, + subject: auth_subjects.clone(), + }); + let egress_interceptor = Box::new(EgressAclEnforcer { + policy_enforcer: self.enforcer.clone(), + zid, + subject: auth_subjects, + }); + ( + self.enforcer + .interface_enabled + .ingress + .then_some(ingress_interceptor), + self.enforcer + .interface_enabled + .egress + .then_some(egress_interceptor), + ) } fn new_transport_multicast( @@ -194,7 +239,7 @@ impl InterceptorTrait for IngressAclEnforcer { payload: PushBody::Put(_), .. }) => { - if self.action(Action::Put, "Put (ingress)", key_expr?) == Permission::Deny { + if self.action(AclMessage::Put, "Put (ingress)", key_expr?) == Permission::Deny { return None; } } @@ -202,7 +247,8 @@ impl InterceptorTrait for IngressAclEnforcer { payload: RequestBody::Query(_), .. }) => { - if self.action(Action::Get, "Get (ingress)", key_expr?) == Permission::Deny { + if self.action(AclMessage::Query, "Query (ingress)", key_expr?) == Permission::Deny + { return None; } } @@ -211,7 +257,7 @@ impl InterceptorTrait for IngressAclEnforcer { .. }) => { if self.action( - Action::DeclareSubscriber, + AclMessage::DeclareSubscriber, "Declare Subscriber (ingress)", key_expr?, ) == Permission::Deny @@ -224,7 +270,7 @@ impl InterceptorTrait for IngressAclEnforcer { .. }) => { if self.action( - Action::DeclareQueryable, + AclMessage::DeclareQueryable, "Declare Queryable (ingress)", key_expr?, ) == Permission::Deny @@ -263,7 +309,7 @@ impl InterceptorTrait for EgressAclEnforcer { payload: PushBody::Put(_), .. }) => { - if self.action(Action::Put, "Put (egress)", key_expr?) == Permission::Deny { + if self.action(AclMessage::Put, "Put (egress)", key_expr?) == Permission::Deny { return None; } } @@ -271,7 +317,7 @@ impl InterceptorTrait for EgressAclEnforcer { payload: RequestBody::Query(_), .. }) => { - if self.action(Action::Get, "Get (egress)", key_expr?) == Permission::Deny { + if self.action(AclMessage::Query, "Query (egress)", key_expr?) == Permission::Deny { return None; } } @@ -280,7 +326,7 @@ impl InterceptorTrait for EgressAclEnforcer { .. }) => { if self.action( - Action::DeclareSubscriber, + AclMessage::DeclareSubscriber, "Declare Subscriber (egress)", key_expr?, ) == Permission::Deny @@ -293,7 +339,7 @@ impl InterceptorTrait for EgressAclEnforcer { .. }) => { if self.action( - Action::DeclareQueryable, + AclMessage::DeclareQueryable, "Declare Queryable (egress)", key_expr?, ) == Permission::Deny @@ -311,7 +357,7 @@ pub trait AclActionMethods { fn zid(&self) -> ZenohIdProto; fn flow(&self) -> InterceptorFlow; fn authn_ids(&self) -> Vec; - fn action(&self, action: Action, log_msg: &str, key_expr: &str) -> Permission { + fn action(&self, action: AclMessage, log_msg: &str, key_expr: &str) -> Permission { let policy_enforcer = self.policy_enforcer(); let authn_ids: Vec = self.authn_ids(); let zid = self.zid(); diff --git a/zenoh/src/net/routing/interceptor/authorization.rs b/zenoh/src/net/routing/interceptor/authorization.rs index 283a02248b..8b8789fc3b 100644 --- a/zenoh/src/net/routing/interceptor/authorization.rs +++ b/zenoh/src/net/routing/interceptor/authorization.rs @@ -17,22 +17,146 @@ //! This module is intended for Zenoh's internal use. //! //! [Click here for Zenoh's documentation](../zenoh/index.html) -use std::{collections::HashMap, net::Ipv4Addr}; +use std::collections::HashMap; use ahash::RandomState; +use itertools::Itertools; use zenoh_config::{ - AclConfig, AclConfigRules, Action, InterceptorFlow, Permission, PolicyRule, Subject, + AclConfig, AclConfigPolicyEntry, AclConfigRule, AclConfigSubjects, AclMessage, CertCommonName, + InterceptorFlow, Interface, Permission, PolicyRule, Username, }; use zenoh_keyexpr::{ keyexpr, keyexpr_tree::{IKeyExprTree, IKeyExprTreeMut, KeBoxTree}, }; use zenoh_result::ZResult; -use zenoh_util::net::get_interface_names_by_addr; type PolicyForSubject = FlowPolicy; type PolicyMap = HashMap; -type SubjectMap = HashMap; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(crate) struct Subject { + pub(crate) interface: SubjectProperty, + pub(crate) cert_common_name: SubjectProperty, + pub(crate) username: SubjectProperty, +} + +impl Subject { + fn matches(&self, query: &SubjectQuery) -> bool { + self.interface.matches(query.interface.as_ref()) + && self.username.matches(query.username.as_ref()) + && self + .cert_common_name + .matches(query.cert_common_name.as_ref()) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(crate) enum SubjectProperty { + Wildcard, + Exactly(T), +} + +impl SubjectProperty { + fn matches(&self, other: Option<&T>) -> bool { + match (self, other) { + (SubjectProperty::Wildcard, None) => true, + // NOTE: This match arm is the reason why `SubjectProperty` cannot simply be `Option` + (SubjectProperty::Wildcard, Some(_)) => true, + (SubjectProperty::Exactly(_), None) => false, + (SubjectProperty::Exactly(lhs), Some(rhs)) => lhs == rhs, + } + } +} + +#[derive(Debug)] +pub(crate) struct SubjectQuery { + pub(crate) interface: Option, + pub(crate) cert_common_name: Option, + pub(crate) username: Option, +} + +impl std::fmt::Display for SubjectQuery { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let subject_names = [ + self.interface.as_ref().map(|face| format!("{face}")), + self.cert_common_name.as_ref().map(|ccn| format!("{ccn}")), + self.username.as_ref().map(|username| format!("{username}")), + ]; + write!( + f, + "{}", + subject_names + .iter() + .filter_map(|v| v.as_ref()) + .cloned() + .collect::>() + .join("+") + ) + } +} + +#[derive(Debug, Clone)] +pub(crate) struct SubjectEntry { + pub(crate) subject: Subject, + pub(crate) id: usize, +} + +#[derive(Debug, Clone)] +pub(crate) struct SubjectStore { + inner: Vec, +} + +impl SubjectStore { + pub(crate) fn query(&self, query: &SubjectQuery) -> Option<&SubjectEntry> { + // FIXME: Can this search be better than linear? + self.inner.iter().find(|entry| entry.subject.matches(query)) + } +} + +impl Default for SubjectStore { + fn default() -> Self { + SubjectMapBuilder::new().build() + } +} + +pub(crate) struct SubjectMapBuilder { + builder: HashMap, + id_counter: usize, +} + +impl SubjectMapBuilder { + pub(crate) fn new() -> Self { + Self { + // FIXME: Capacity can be calculated from the length of subject properties in configuration + builder: HashMap::new(), + id_counter: 0, + } + } + + pub(crate) fn build(self) -> SubjectStore { + SubjectStore { + inner: self + .builder + .into_iter() + .map(|(subject, id)| SubjectEntry { subject, id }) + .collect(), + } + } + + /// Assumes subject contains at most one instance of each Subject variant + pub(crate) fn insert_or_get(&mut self, subject: Subject) -> usize { + match self.builder.get(&subject).copied() { + Some(id) => id, + None => { + self.id_counter += 1; + self.builder.insert(subject, self.id_counter); + self.id_counter + } + } + } +} + type KeTreeRule = KeBoxTree; #[derive(Default)] @@ -58,27 +182,27 @@ impl PermissionPolicy { } #[derive(Default)] struct ActionPolicy { - get: PermissionPolicy, + query: PermissionPolicy, put: PermissionPolicy, declare_subscriber: PermissionPolicy, declare_queryable: PermissionPolicy, } impl ActionPolicy { - fn action(&self, action: Action) -> &PermissionPolicy { + fn action(&self, action: AclMessage) -> &PermissionPolicy { match action { - Action::Get => &self.get, - Action::Put => &self.put, - Action::DeclareSubscriber => &self.declare_subscriber, - Action::DeclareQueryable => &self.declare_queryable, + AclMessage::Query => &self.query, + AclMessage::Put => &self.put, + AclMessage::DeclareSubscriber => &self.declare_subscriber, + AclMessage::DeclareQueryable => &self.declare_queryable, } } - fn action_mut(&mut self, action: Action) -> &mut PermissionPolicy { + fn action_mut(&mut self, action: AclMessage) -> &mut PermissionPolicy { match action { - Action::Get => &mut self.get, - Action::Put => &mut self.put, - Action::DeclareSubscriber => &mut self.declare_subscriber, - Action::DeclareQueryable => &mut self.declare_queryable, + AclMessage::Query => &mut self.query, + AclMessage::Put => &mut self.put, + AclMessage::DeclareSubscriber => &mut self.declare_subscriber, + AclMessage::DeclareQueryable => &mut self.declare_queryable, } } } @@ -113,14 +237,14 @@ pub struct InterfaceEnabled { pub struct PolicyEnforcer { pub(crate) acl_enabled: bool, pub(crate) default_permission: Permission, - pub(crate) subject_map: SubjectMap, + pub(crate) subject_store: SubjectStore, pub(crate) policy_map: PolicyMap, pub(crate) interface_enabled: InterfaceEnabled, } #[derive(Debug, Clone)] pub struct PolicyInformation { - subject_map: SubjectMap, + subject_map: SubjectStore, policy_rules: Vec, } @@ -129,7 +253,7 @@ impl PolicyEnforcer { PolicyEnforcer { acl_enabled: true, default_permission: Permission::Deny, - subject_map: SubjectMap::default(), + subject_store: SubjectStore::default(), policy_map: PolicyMap::default(), interface_enabled: InterfaceEnabled::default(), } @@ -143,11 +267,23 @@ impl PolicyEnforcer { self.acl_enabled = mut_acl_config.enabled; self.default_permission = mut_acl_config.default_permission; if self.acl_enabled { - if let Some(mut rules) = mut_acl_config.rules { - if rules.is_empty() { - tracing::warn!("Access control rules are empty in config file"); + if let (Some(mut rules), Some(mut subjects), Some(policies)) = ( + mut_acl_config.rules, + mut_acl_config.subjects, + mut_acl_config.policies, + ) { + if rules.is_empty() || subjects.is_empty() || policies.is_empty() { + rules.is_empty().then(|| { + tracing::warn!("Access control rules list is empty in config file") + }); + subjects.is_empty().then(|| { + tracing::warn!("Access control subjects list is empty in config file") + }); + policies.is_empty().then(|| { + tracing::warn!("Access control policies list is empty in config file") + }); self.policy_map = PolicyMap::default(); - self.subject_map = SubjectMap::default(); + self.subject_store = SubjectStore::default(); if self.default_permission == Permission::Deny { self.interface_enabled = InterfaceEnabled { ingress: true, @@ -156,59 +292,71 @@ impl PolicyEnforcer { } } else { // check for undefined values in rules and initialize them to defaults - for (rule_offset, rule) in rules.iter_mut().enumerate() { - if rule.interfaces.is_none() { - tracing::warn!("ACL config interfaces list is empty. Applying rule #{} to all network interfaces", rule_offset); - rule.interfaces = - Some(get_interface_names_by_addr(Ipv4Addr::UNSPECIFIED.into())?); + for rule in rules.iter_mut() { + if rule.id.trim().is_empty() { + bail!("Found empty rule id in rules list"); } if rule.flows.is_none() { - tracing::warn!("ACL config flows list is empty. Applying rule #{} to both Ingress and Egress flows", rule_offset); + tracing::warn!("Rule '{}' flows list is not set. Setting it to both Ingress and Egress", rule.id); rule.flows = Some([InterceptorFlow::Ingress, InterceptorFlow::Egress].into()); } - if rule.usernames.is_none() { - rule.usernames = Some(Vec::new()); + } + // check for undefined values in subjects and initialize them to defaults + for subject in subjects.iter_mut() { + if subject.id.trim().is_empty() { + bail!("Found empty subject id in subjects list"); + } + + if subject + .cert_common_names + .as_ref() + .is_some_and(Vec::is_empty) + { + bail!("Subject property `cert_common_names` cannot be empty"); + } + + if subject.usernames.as_ref().is_some_and(Vec::is_empty) { + bail!("Subject property `usernames` cannot be empty"); } - if rule.cert_common_names.is_none() { - rule.cert_common_names = Some(Vec::new()); + + if subject.interfaces.as_ref().is_some_and(Vec::is_empty) { + bail!("Subject property `interfaces` cannot be empty"); } } - let policy_information = self.policy_information_point(&rules)?; - let subject_map = policy_information.subject_map; - let mut main_policy: PolicyMap = PolicyMap::default(); + let policy_information = + self.policy_information_point(subjects, rules, policies)?; + let mut main_policy: PolicyMap = PolicyMap::default(); for rule in policy_information.policy_rules { - if let Some(index) = subject_map.get(&rule.subject) { - let single_policy = main_policy.entry(*index).or_default(); - single_policy - .flow_mut(rule.flow) - .action_mut(rule.action) - .permission_mut(rule.permission) - .insert(keyexpr::new(&rule.key_expr)?, true); - - if self.default_permission == Permission::Deny { - self.interface_enabled = InterfaceEnabled { - ingress: true, - egress: true, - }; - } else { - match rule.flow { - InterceptorFlow::Ingress => { - self.interface_enabled.ingress = true; - } - InterceptorFlow::Egress => { - self.interface_enabled.egress = true; - } + let subject_policy = main_policy.entry(rule.subject_id).or_default(); + subject_policy + .flow_mut(rule.flow) + .action_mut(rule.message) + .permission_mut(rule.permission) + .insert(keyexpr::new(&rule.key_expr)?, true); + + if self.default_permission == Permission::Deny { + self.interface_enabled = InterfaceEnabled { + ingress: true, + egress: true, + }; + } else { + match rule.flow { + InterceptorFlow::Ingress => { + self.interface_enabled.ingress = true; + } + InterceptorFlow::Egress => { + self.interface_enabled.egress = true; } } - }; + } } self.policy_map = main_policy; - self.subject_map = subject_map; + self.subject_store = policy_information.subject_map; } } else { - tracing::warn!("Access control rules are empty in config file"); + bail!("All ACL rules/subjects/policies config lists must be provided"); } } Ok(()) @@ -219,14 +367,27 @@ impl PolicyEnforcer { */ pub fn policy_information_point( &self, - config_rule_set: &Vec, + subjects: Vec, + rules: Vec, + policies: Vec, ) -> ZResult { let mut policy_rules: Vec = Vec::new(); - for config_rule in config_rule_set { + let mut rule_map = HashMap::new(); + let mut subject_id_map = HashMap::>::new(); + let mut subject_map_builder = SubjectMapBuilder::new(); + + // validate rules config and insert them in hashmaps + for config_rule in rules { + if rule_map.contains_key(&config_rule.id) { + bail!( + "Rule id must be unique: id '{}' is repeated", + config_rule.id + ); + } // Config validation let mut validation_err = String::new(); - if config_rule.actions.is_empty() { - validation_err.push_str("ACL config actions list is empty. "); + if config_rule.messages.is_empty() { + validation_err.push_str("ACL config messages list is empty. "); } if config_rule.flows.as_ref().unwrap().is_empty() { validation_err.push_str("ACL config flows list is empty. "); @@ -235,105 +396,163 @@ impl PolicyEnforcer { validation_err.push_str("ACL config key_exprs list is empty. "); } if !validation_err.is_empty() { - bail!("{}", validation_err); + bail!("Rule '{}' is malformed: {}", config_rule.id, validation_err); } + for key_expr in config_rule.key_exprs.iter() { + if key_expr.trim().is_empty() { + bail!("Found empty key expression in rule '{}'", config_rule.id); + } + } + rule_map.insert(config_rule.id.clone(), config_rule); + } - // At least one must not be empty - let mut subject_validation_err: usize = 0; - validation_err = String::new(); - - if config_rule.interfaces.as_ref().unwrap().is_empty() { - subject_validation_err += 1; - validation_err.push_str("ACL config interfaces list is empty. "); + for config_subject in subjects.into_iter() { + if subject_id_map.contains_key(&config_subject.id) { + bail!( + "Subject id must be unique: id '{}' is repeated", + config_subject.id + ); } - if config_rule.cert_common_names.as_ref().unwrap().is_empty() { - subject_validation_err += 1; - validation_err.push_str("ACL config certificate common names list is empty. "); + // validate subject config fields + if config_subject + .interfaces + .as_ref() + .is_some_and(|interfaces| interfaces.iter().any(|face| face.0.trim().is_empty())) + { + bail!( + "Found empty interface value in subject '{}'", + config_subject.id + ); } - if config_rule.usernames.as_ref().unwrap().is_empty() { - subject_validation_err += 1; - validation_err.push_str("ACL config usernames list is empty. "); + if config_subject + .cert_common_names + .as_ref() + .is_some_and(|cert_common_names| { + cert_common_names.iter().any(|ccn| ccn.0.trim().is_empty()) + }) + { + bail!( + "Found empty cert_common_name value in subject '{}'", + config_subject.id + ); } - - if subject_validation_err == 3 { - bail!("{}", validation_err); + if config_subject.usernames.as_ref().is_some_and(|usernames| { + usernames + .iter() + .any(|username| username.0.trim().is_empty()) + }) { + bail!( + "Found empty username value in subject '{}'", + config_subject.id + ); } + // Map properties to SubjectProperty type + // FIXME: Unnecessary .collect() because of different iterator types + let interfaces = config_subject + .interfaces + .map(|interfaces| { + interfaces + .into_iter() + .map(SubjectProperty::Exactly) + .collect::>() + }) + .unwrap_or(vec![SubjectProperty::Wildcard]); + // FIXME: Unnecessary .collect() because of different iterator types + let cert_common_names = config_subject + .cert_common_names + .map(|cert_common_names| { + cert_common_names + .into_iter() + .map(SubjectProperty::Exactly) + .collect::>() + }) + .unwrap_or(vec![SubjectProperty::Wildcard]); + // FIXME: Unnecessary .collect() because of different iterator types + let usernames = config_subject + .usernames + .map(|usernames| { + usernames + .into_iter() + .map(SubjectProperty::Exactly) + .collect::>() + }) + .unwrap_or(vec![SubjectProperty::Wildcard]); - for subject in config_rule.interfaces.as_ref().unwrap() { - if subject.trim().is_empty() { - bail!("found an empty interface value in interfaces list"); - } - for flow in config_rule.flows.as_ref().unwrap() { - for action in &config_rule.actions { - for key_expr in &config_rule.key_exprs { - if key_expr.trim().is_empty() { - bail!("found an empty key-expression value in key_exprs list"); - } - policy_rules.push(PolicyRule { - subject: Subject::Interface(subject.clone()), - key_expr: key_expr.clone(), - action: *action, - permission: config_rule.permission, - flow: *flow, - }) - } - } - } + // create ACL subject combinations + let subject_combination_ids = interfaces + .into_iter() + .cartesian_product(cert_common_names) + .cartesian_product(usernames) + .map(|((interface, cert_common_name), username)| { + let subject = Subject { + interface, + cert_common_name, + username, + }; + subject_map_builder.insert_or_get(subject) + }) + .collect(); + subject_id_map.insert(config_subject.id.clone(), subject_combination_ids); + } + // finally, handle policy content + for (entry_id, entry) in policies.iter().enumerate() { + // validate policy config lists + if entry.rules.is_empty() || entry.subjects.is_empty() { + bail!( + "Policy #{} is malformed: empty subjects or rules list", + entry_id + ); } - for subject in config_rule.cert_common_names.as_ref().unwrap() { - if subject.trim().is_empty() { - bail!("found an empty value in certificate common names list"); + for subject_config_id in &entry.subjects { + if subject_config_id.trim().is_empty() { + bail!("Found empty subject id in policy #{}", entry_id) } - for flow in config_rule.flows.as_ref().unwrap() { - for action in &config_rule.actions { - for key_expr in &config_rule.key_exprs { - if key_expr.trim().is_empty() { - bail!("found an empty key-expression value in key_exprs list"); - } - policy_rules.push(PolicyRule { - subject: Subject::CertCommonName(subject.clone()), - key_expr: key_expr.clone(), - action: *action, - permission: config_rule.permission, - flow: *flow, - }) - } - } + if !subject_id_map.contains_key(subject_config_id) { + bail!( + "Subject '{}' in policy #{} does not exist in subjects list", + subject_config_id, + entry_id + ) } } - for subject in config_rule.usernames.as_ref().unwrap() { - if subject.trim().is_empty() { - bail!("found an empty value in usernames list"); + // Create PolicyRules + for rule_id in &entry.rules { + if rule_id.trim().is_empty() { + bail!("Found empty rule id in policy #{}", entry_id) } - for flow in config_rule.flows.as_ref().unwrap() { - for action in &config_rule.actions { - for key_expr in &config_rule.key_exprs { - if key_expr.trim().is_empty() { - bail!("found an empty key-expression value in key_exprs list"); + let rule = rule_map.get(rule_id).ok_or(zerror!( + "Rule '{}' in policy #{} does not exist in rules list", + rule_id, + entry_id + ))?; + for subject_config_id in &entry.subjects { + let subject_combination_ids = subject_id_map + .get(subject_config_id) + .expect("config subject id should exist in subject_id_map"); + for subject_id in subject_combination_ids { + for flow in rule + .flows + .as_ref() + .expect("flows list should be defined in rule") + { + for message in &rule.messages { + for key_expr in &rule.key_exprs { + policy_rules.push(PolicyRule { + subject_id: *subject_id, + key_expr: key_expr.clone(), + message: *message, + permission: rule.permission, + flow: *flow, + }); + } } - policy_rules.push(PolicyRule { - subject: Subject::Username(subject.clone()), - key_expr: key_expr.clone(), - action: *action, - permission: config_rule.permission, - flow: *flow, - }) } } } } } - let mut subject_map = SubjectMap::default(); - let mut counter = 1; - // Starting at 1 since 0 is the init value and should not match anything - for rule in policy_rules.iter() { - if !subject_map.contains_key(&rule.subject) { - subject_map.insert(rule.subject.clone(), counter); - counter += 1; - } - } Ok(PolicyInformation { - subject_map, + subject_map: subject_map_builder.build(), policy_rules, }) } @@ -345,7 +564,7 @@ impl PolicyEnforcer { &self, subject: usize, flow: InterceptorFlow, - action: Action, + message: AclMessage, key_expr: &str, ) -> ZResult { let policy_map = &self.policy_map; @@ -356,7 +575,7 @@ impl PolicyEnforcer { Some(single_policy) => { let deny_result = single_policy .flow(flow) - .action(action) + .action(message) .deny .nodes_including(keyexpr::new(&key_expr)?) .count(); @@ -368,7 +587,7 @@ impl PolicyEnforcer { } else { let allow_result = single_policy .flow(flow) - .action(action) + .action(message) .allow .nodes_including(keyexpr::new(&key_expr)?) .count(); diff --git a/zenoh/tests/acl.rs b/zenoh/tests/acl.rs index d1790dc009..0a08090569 100644 --- a/zenoh/tests/acl.rs +++ b/zenoh/tests/acl.rs @@ -33,24 +33,29 @@ mod test { const VALUE: &str = "zenoh"; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn test_acl() { + async fn test_acl_pub_sub() { zenoh::try_init_log_from_env(); - test_pub_sub_deny().await; - test_pub_sub_allow().await; - test_pub_sub_deny_then_allow().await; - test_pub_sub_allow_then_deny().await; - test_get_qbl_deny().await; - test_get_qbl_allow().await; - test_get_qbl_allow_then_deny().await; - test_get_qbl_deny_then_allow().await; + test_pub_sub_deny(27447).await; + test_pub_sub_allow(27447).await; + test_pub_sub_deny_then_allow(27447).await; + test_pub_sub_allow_then_deny(27447).await; } - async fn get_basic_router_config() -> Config { + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_acl_get_queryable() { + test_get_qbl_deny(27448).await; + test_get_qbl_allow(27448).await; + test_get_qbl_allow_then_deny(27448).await; + test_get_qbl_deny_then_allow(27448).await; + } + + async fn get_basic_router_config(port: u16) -> Config { let mut config = config::default(); config.set_mode(Some(WhatAmI::Router)).unwrap(); config .listen .endpoints - .set(vec!["tcp/127.0.0.1:27447".parse().unwrap()]) + .set(vec![format!("tcp/127.0.0.1:{port}").parse().unwrap()]) .unwrap(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); config @@ -61,11 +66,11 @@ mod test { ztimeout!(s.close()).unwrap(); } - async fn get_client_sessions() -> (Session, Session) { + async fn get_client_sessions(port: u16) -> (Session, Session) { println!("Opening client sessions"); - let config = config::client(["tcp/127.0.0.1:27447".parse::().unwrap()]); + let config = config::client([format!("tcp/127.0.0.1:{port}").parse::().unwrap()]); let s01 = ztimeout!(zenoh::open(config)).unwrap(); - let config = config::client(["tcp/127.0.0.1:27447".parse::().unwrap()]); + let config = config::client([format!("tcp/127.0.0.1:{port}").parse::().unwrap()]); let s02 = ztimeout!(zenoh::open(config)).unwrap(); (s01, s02) } @@ -76,27 +81,27 @@ mod test { ztimeout!(s02.close()).unwrap(); } - async fn test_pub_sub_deny() { + async fn test_pub_sub_deny(port: u16) { println!("test_pub_sub_deny"); - let mut config_router = get_basic_router_config().await; + let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ - "enabled": true, - "default_permission": "deny", - "rules": - [ - ] - }"#, + "enabled": true, + "default_permission": "deny", + "rules": [], + "subjects": [], + "policies": [], + }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (sub_session, pub_session) = get_client_sessions().await; + let (sub_session, pub_session) = get_client_sessions(port).await; { let publisher = pub_session.declare_publisher(KEY_EXPR).await.unwrap(); let received_value = Arc::new(Mutex::new(String::new())); @@ -120,27 +125,25 @@ mod test { close_router_session(session).await; } - async fn test_pub_sub_allow() { + async fn test_pub_sub_allow(port: u16) { println!("test_pub_sub_allow"); - let mut config_router = get_basic_router_config().await; + let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ - - "enabled": false, - "default_permission": "allow", - "rules": - [ - ] - - }"#, + "enabled": true, + "default_permission": "allow", + "rules": [], + "subjects": [], + "policies": [], + }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (sub_session, pub_session) = get_client_sessions().await; + let (sub_session, pub_session) = get_client_sessions(port).await; { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); @@ -167,41 +170,51 @@ mod test { close_router_session(session).await; } - async fn test_pub_sub_allow_then_deny() { + async fn test_pub_sub_allow_then_deny(port: u16) { println!("test_pub_sub_allow_then_deny"); - let mut config_router = get_basic_router_config().await; + let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", - r#" - {"enabled": true, - "default_permission": "allow", - "rules": - [ - { - "permission": "deny", - "flows": ["egress"], - "actions": [ - "put", - "declare_subscriber" - ], - "key_exprs": [ - "test/demo" - ], - "interfaces": [ - "lo","lo0" - ] - }, - ] - } - "#, + r#"{ + "enabled": true, + "default_permission": "allow", + "rules": [ + { + "id": "r1", + "permission": "deny", + "flows": ["egress"], + "messages": [ + "put", + "declare_subscriber" + ], + "key_exprs": [ + "test/demo" + ], + }, + ], + "subjects": [ + { + "id": "s1", + "interfaces": [ + "lo", "lo0" + ], + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } + ] + }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (sub_session, pub_session) = get_client_sessions().await; + let (sub_session, pub_session) = get_client_sessions(port).await; { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); @@ -227,41 +240,51 @@ mod test { close_router_session(session).await; } - async fn test_pub_sub_deny_then_allow() { + async fn test_pub_sub_deny_then_allow(port: u16) { println!("test_pub_sub_deny_then_allow"); - let mut config_router = get_basic_router_config().await; + let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", - r#" - {"enabled": true, - "default_permission": "deny", - "rules": - [ - { - "permission": "allow", - "flows": ["egress","ingress"], - "actions": [ - "put", - "declare_subscriber" - ], - "key_exprs": [ - "test/demo" - ], - "interfaces": [ - "lo","lo0" - ] - }, - ] - } - "#, + r#"{ + "enabled": true, + "default_permission": "deny", + "rules": [ + { + "id": "r1", + "permission": "allow", + "flows": ["egress", "ingress"], + "messages": [ + "put", + "declare_subscriber" + ], + "key_exprs": [ + "test/demo" + ], + }, + ], + "subjects": [ + { + "id": "s1", + "interfaces": [ + "lo", "lo0" + ], + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } + ] + }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (sub_session, pub_session) = get_client_sessions().await; + let (sub_session, pub_session) = get_client_sessions(port).await; { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); @@ -287,27 +310,27 @@ mod test { close_router_session(session).await; } - async fn test_get_qbl_deny() { + async fn test_get_qbl_deny(port: u16) { println!("test_get_qbl_deny"); - let mut config_router = get_basic_router_config().await; + let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ - "enabled": true, - "default_permission": "deny", - "rules": - [ - ] - }"#, + "enabled": true, + "default_permission": "deny", + "rules": [], + "subjects": [], + "policies": [], + }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (get_session, qbl_session) = get_client_sessions().await; + let (get_session, qbl_session) = get_client_sessions(port).await; { let mut received_value = String::new(); @@ -341,27 +364,27 @@ mod test { close_router_session(session).await; } - async fn test_get_qbl_allow() { + async fn test_get_qbl_allow(port: u16) { println!("test_get_qbl_allow"); - let mut config_router = get_basic_router_config().await; + let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ - "enabled": true, - "default_permission": "allow", - "rules": - [ - ] - }"#, + "enabled": true, + "default_permission": "allow", + "rules": [], + "subjects": [], + "policies": [], + }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (get_session, qbl_session) = get_client_sessions().await; + let (get_session, qbl_session) = get_client_sessions(port).await; { let mut received_value = String::new(); @@ -395,34 +418,45 @@ mod test { close_router_session(session).await; } - async fn test_get_qbl_deny_then_allow() { + async fn test_get_qbl_deny_then_allow(port: u16) { println!("test_get_qbl_deny_then_allow"); - let mut config_router = get_basic_router_config().await; + let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", - r#" - {"enabled": true, - "default_permission": "deny", - "rules": - [ - { - "permission": "allow", - "flows": ["egress","ingress"], - "actions": [ - "get", - "declare_queryable"], - "key_exprs": [ - "test/demo" - ], - "interfaces": [ - "lo","lo0" - ] - }, - ] - } - "#, + r#"{ + "enabled": true, + "default_permission": "deny", + "rules": [ + { + "id": "r1", + "permission": "allow", + "flows": ["egress", "ingress"], + "messages": [ + "query", + "declare_queryable" + ], + "key_exprs": [ + "test/demo" + ], + }, + ], + "subjects": [ + { + "id": "s1", + "interfaces": [ + "lo", "lo0" + ], + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } + ] + }"#, ) .unwrap(); @@ -430,7 +464,7 @@ mod test { let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (get_session, qbl_session) = get_client_sessions().await; + let (get_session, qbl_session) = get_client_sessions(port).await; { let mut received_value = String::new(); @@ -464,41 +498,52 @@ mod test { close_router_session(session).await; } - async fn test_get_qbl_allow_then_deny() { + async fn test_get_qbl_allow_then_deny(port: u16) { println!("test_get_qbl_allow_then_deny"); - let mut config_router = get_basic_router_config().await; + let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", - r#" - {"enabled": true, - "default_permission": "allow", - "rules": - [ - { - "permission": "deny", - "flows": ["egress"], - "actions": [ - "get", - "declare_queryable" ], - "key_exprs": [ - "test/demo" - ], - "interfaces": [ - "lo","lo0" - ] - }, - ] - } - "#, + r#"{ + "enabled": true, + "default_permission": "allow", + "rules": [ + { + "id": "r1", + "permission": "deny", + "flows": ["egress"], + "messages": [ + "query", + "declare_queryable" + ], + "key_exprs": [ + "test/demo" + ], + }, + ], + "subjects": [ + { + "id": "s1", + "interfaces": [ + "lo", "lo0" + ], + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } + ] + }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (get_session, qbl_session) = get_client_sessions().await; + let (get_session, qbl_session) = get_client_sessions(port).await; { let mut received_value = String::new(); diff --git a/zenoh/tests/authentication.rs b/zenoh/tests/authentication.rs index 39daff0199..09dd3b74eb 100644 --- a/zenoh/tests/authentication.rs +++ b/zenoh/tests/authentication.rs @@ -15,7 +15,7 @@ mod test { use std::{ fs, path::PathBuf, - sync::{Arc, Mutex}, + sync::{atomic::AtomicBool, Arc, Mutex}, time::Duration, }; @@ -34,36 +34,72 @@ mod test { const KEY_EXPR: &str = "test/demo"; const VALUE: &str = "zenoh"; static TESTFILES_PATH: Lazy = Lazy::new(std::env::temp_dir); + static TESTFILES_CREATED: Lazy = Lazy::new(|| AtomicBool::new(false)); #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn test_authentication() { + async fn test_authentication_usrpwd() { zenoh_util::try_init_log_from_env(); create_new_files(TESTFILES_PATH.to_path_buf()) .await .unwrap(); - println!("testfiles created successfully."); - - test_pub_sub_deny_then_allow_usrpswd().await; - test_pub_sub_allow_then_deny_usrpswd().await; - test_get_qbl_allow_then_deny_usrpswd().await; - test_get_qbl_deny_then_allow_usrpswd().await; + test_pub_sub_deny_then_allow_usrpswd(37447).await; + test_pub_sub_allow_then_deny_usrpswd(37447).await; + test_get_qbl_allow_then_deny_usrpswd(37447).await; + test_get_qbl_deny_then_allow_usrpswd(37447).await; + } - test_pub_sub_deny_then_allow_tls(3774).await; - test_pub_sub_allow_then_deny_tls(3775).await; - test_get_qbl_allow_then_deny_tls(3776).await; - test_get_qbl_deny_then_allow_tls(3777).await; + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_authentication_tls() { + zenoh_util::try_init_log_from_env(); + create_new_files(TESTFILES_PATH.to_path_buf()) + .await + .unwrap(); + test_pub_sub_deny_then_allow_tls(37448).await; + test_pub_sub_allow_then_deny_tls(37449).await; + test_get_qbl_allow_then_deny_tls(37450).await; + test_get_qbl_deny_then_allow_tls(37451).await; + } - test_pub_sub_deny_then_allow_quic(3774, false).await; - test_pub_sub_allow_then_deny_quic(3775).await; - test_get_qbl_deny_then_allow_quic(3776).await; - test_get_qbl_allow_then_deny_quic(3777).await; + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_authentication_quic() { + zenoh_util::try_init_log_from_env(); + create_new_files(TESTFILES_PATH.to_path_buf()) + .await + .unwrap(); + test_pub_sub_deny_then_allow_quic(37452, false).await; + test_pub_sub_allow_then_deny_quic(37453).await; + test_get_qbl_deny_then_allow_quic(37454).await; + test_get_qbl_allow_then_deny_quic(37455).await; + } + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_authentication_lowlatency() { // Test link AuthIds accessibility for lowlatency transport - test_pub_sub_deny_then_allow_quic(3778, true).await; + zenoh_util::try_init_log_from_env(); + create_new_files(TESTFILES_PATH.to_path_buf()) + .await + .unwrap(); + test_pub_sub_deny_then_allow_quic(37456, true).await; + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_authentication_subject_combinations() { + zenoh_util::try_init_log_from_env(); + create_new_files(TESTFILES_PATH.to_path_buf()) + .await + .unwrap(); + test_deny_allow_combination(37457).await; + test_allow_deny_combination(37458).await; } #[allow(clippy::all)] async fn create_new_files(certs_dir: std::path::PathBuf) -> std::io::Result<()> { + let created = TESTFILES_CREATED.fetch_or(true, std::sync::atomic::Ordering::SeqCst); + if created { + // only create files once per tests + println!("Skipping testfile creation: files already created by another test instance"); + return Ok(()); + } use std::io::prelude::*; let ca_pem = b"-----BEGIN CERTIFICATE----- MIIDiTCCAnGgAwIBAgIUO1x6LAlICgKs5+pYUTo4CughfKEwDQYJKoZIhvcNAQEL @@ -229,6 +265,7 @@ client2name:client2passwd"; file.write_all(test_file.value)?; } + println!("testfiles created successfully."); Ok(()) } @@ -332,13 +369,13 @@ client2name:client2passwd"; config } - async fn get_basic_router_config_usrpswd() -> Config { + async fn get_basic_router_config_usrpswd(port: u16) -> Config { let mut config = config::default(); config.set_mode(Some(WhatAmI::Router)).unwrap(); config .listen .endpoints - .set(vec!["tcp/127.0.0.1:37447".parse().unwrap()]) + .set(vec![format!("tcp/127.0.0.1:{port}").parse().unwrap()]) .unwrap(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); config @@ -370,6 +407,71 @@ client2name:client2passwd"; ztimeout!(s.close()).unwrap(); } + async fn get_basic_router_config_quic_usrpswd(port: u16) -> Config { + let cert_path = TESTFILES_PATH.to_string_lossy(); + let mut config = config::default(); + config.set_mode(Some(WhatAmI::Router)).unwrap(); + config + .listen + .endpoints + .set(vec![ + format!("quic/127.0.0.1:{port}").parse().unwrap(), + format!("tcp/127.0.0.1:{port}").parse().unwrap(), + ]) + .unwrap(); + config.scouting.multicast.set_enabled(Some(false)).unwrap(); + config + .insert_json5( + "transport", + r#"{ + "link": { + "protocols": [ + "quic", "tcp" + ], + "tls": { + "client_auth": true, + "server_name_verification": false + }, + }, + "auth": { + usrpwd: { + user: "routername", + password: "routerpasswd", + }, + }, + }"#, + ) + .unwrap(); + config + .transport + .auth + .usrpwd + .set_dictionary_file(Some(format!( + "{}/credentials.txt", + TESTFILES_PATH.to_string_lossy() + ))) + .unwrap(); + config + .transport + .link + .tls + .set_server_private_key(Some(format!("{}/serversidekey.pem", cert_path))) + .unwrap(); + config + .transport + .link + .tls + .set_server_certificate(Some(format!("{}/serverside.pem", cert_path))) + .unwrap(); + config + .transport + .link + .tls + .set_root_ca_certificate(Some(format!("{}/ca.pem", cert_path))) + .unwrap(); + config + } + async fn get_client_sessions_tls(port: u16) -> (Session, Session) { let cert_path = TESTFILES_PATH.to_string_lossy(); println!("Opening client sessions"); @@ -549,9 +651,10 @@ client2name:client2passwd"; (s01, s02) } - async fn get_client_sessions_usrpswd() -> (Session, Session) { + async fn get_client_sessions_usrpswd(port: u16) -> (Session, Session) { println!("Opening client sessions"); - let mut config = config::client(["tcp/127.0.0.1:37447".parse::().unwrap()]); + let mut config = + config::client([format!("tcp/127.0.0.1:{port}").parse::().unwrap()]); config .insert_json5( "transport", @@ -566,7 +669,8 @@ client2name:client2passwd"; ) .unwrap(); let s01 = ztimeout!(zenoh::open(config)).unwrap(); - let mut config = config::client(["tcp/127.0.0.1:37447".parse::().unwrap()]); + let mut config = + config::client([format!("tcp/127.0.0.1:{port}").parse::().unwrap()]); config .insert_json5( "transport", @@ -584,6 +688,101 @@ client2name:client2passwd"; (s01, s02) } + async fn get_client_sessions_quic_usrpswd(port: u16) -> (Session, Session) { + let cert_path = TESTFILES_PATH.to_string_lossy(); + println!("Opening client sessions"); + let mut config = config::client([format!("quic/127.0.0.1:{port}") + .parse::() + .unwrap()]); + config + .insert_json5( + "transport", + r#"{ + "link": { + "protocols": [ + "quic" + ], + "tls": { + "client_auth": true, + "server_name_verification": false + } + }, + "auth": { + usrpwd: { + user: "client1name", + password: "client1passwd", + }, + } + }"#, + ) + .unwrap(); + config + .transport + .link + .tls + .set_client_private_key(Some(format!("{}/clientsidekey.pem", cert_path))) + .unwrap(); + config + .transport + .link + .tls + .set_client_certificate(Some(format!("{}/clientside.pem", cert_path))) + .unwrap(); + config + .transport + .link + .tls + .set_root_ca_certificate(Some(format!("{}/ca.pem", cert_path))) + .unwrap(); + let s01 = ztimeout!(zenoh::open(config)).unwrap(); + + let mut config = config::client([format!("quic/127.0.0.1:{}", port) + .parse::() + .unwrap()]); + config + .insert_json5( + "transport", + r#"{ + "link": { + "protocols": [ + "quic" + ], + "tls": { + "client_auth": true, + "server_name_verification": false + } + }, + "auth": { + usrpwd: { + user: "client2name", + password: "client2passwd", + }, + } + }"#, + ) + .unwrap(); + config + .transport + .link + .tls + .set_client_private_key(Some(format!("{}/clientsidekey.pem", cert_path))) + .unwrap(); + config + .transport + .link + .tls + .set_client_certificate(Some(format!("{}/clientside.pem", cert_path))) + .unwrap(); + config + .transport + .link + .tls + .set_root_ca_certificate(Some(format!("{}/ca.pem", cert_path))) + .unwrap(); + let s02 = ztimeout!(zenoh::open(config)).unwrap(); + (s01, s02) + } + async fn close_sessions(s01: Session, s02: Session) { println!("Closing client sessions"); ztimeout!(s01.close()).unwrap(); @@ -603,19 +802,31 @@ client2name:client2passwd"; "default_permission": "deny", "rules": [ { + "id": "r1", "permission": "allow", "flows": ["ingress","egress"], - "actions": [ + "messages": [ "put", "declare_subscriber" ], "key_exprs": [ "test/demo" ], + }, + ], + "subjects": [ + { + "id": "s1", "cert_common_names": [ "client_side" ] - }, + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } ] }"#, ) @@ -659,19 +870,31 @@ client2name:client2passwd"; "default_permission": "allow", "rules": [ { + "id": "r1", "permission": "deny", "flows": ["egress"], - "actions": [ + "messages": [ "put", "declare_subscriber" ], "key_exprs": [ "test/demo" ], + }, + ], + "subjects": [ + { + "id": "s1", "cert_common_names": [ "client_side" ] - }, + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } ] }"#, ) @@ -717,19 +940,31 @@ client2name:client2passwd"; "default_permission": "deny", "rules": [ { + "id": "r1", "permission": "allow", - "flows": ["egress","ingress"], - "actions": [ - "get", + "flows": ["egress", "ingress"], + "messages": [ + "query", "declare_queryable" ], "key_exprs": [ "test/demo" ], + }, + ], + "subjects": [ + { + "id": "s1", "cert_common_names": [ "client_side" ] - }, + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } ] }"#, ) @@ -790,19 +1025,31 @@ client2name:client2passwd"; "default_permission": "allow", "rules": [ { + "id": "r1", "permission": "deny", "flows": ["egress"], - "actions": [ - "get", + "messages": [ + "query", "declare_queryable" ], "key_exprs": [ "test/demo" ], + }, + ], + "subjects": [ + { + "id": "s1", "cert_common_names": [ "client_side" ] - }, + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } ] }"#, ) @@ -863,19 +1110,31 @@ client2name:client2passwd"; "default_permission": "deny", "rules": [ { + "id": "r1", "permission": "allow", - "flows": ["ingress","egress"], - "actions": [ + "flows": ["egress", "ingress"], + "messages": [ "put", "declare_subscriber" ], "key_exprs": [ "test/demo" ], + }, + ], + "subjects": [ + { + "id": "s1", "cert_common_names": [ "client_side" ] - }, + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } ] }"#, ) @@ -921,19 +1180,31 @@ client2name:client2passwd"; "default_permission": "allow", "rules": [ { + "id": "r1", "permission": "deny", "flows": ["egress"], - "actions": [ + "messages": [ "put", "declare_subscriber" ], "key_exprs": [ "test/demo" ], + }, + ], + "subjects": [ + { + "id": "s1", "cert_common_names": [ "client_side" ] - }, + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } ] }"#, ) @@ -980,18 +1251,31 @@ client2name:client2passwd"; "default_permission": "deny", "rules": [ { + "id": "r1", "permission": "allow", - "flows": ["egress","ingress"], - "actions": [ - "get", - "declare_queryable"], + "flows": ["egress", "ingress"], + "messages": [ + "query", + "declare_queryable" + ], "key_exprs": [ "test/demo" ], + }, + ], + "subjects": [ + { + "id": "s1", "cert_common_names": [ "client_side" ] - }, + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } ] }"#, ) @@ -1051,22 +1335,33 @@ client2name:client2passwd"; r#"{ "enabled": true, "default_permission": "allow", - "rules": - [ + "rules": [ { + "id": "r1", "permission": "deny", "flows": ["egress"], - "actions": [ - "get", + "messages": [ + "query", "declare_queryable" ], "key_exprs": [ "test/demo" ], + }, + ], + "subjects": [ + { + "id": "s1", "cert_common_names": [ "client_side" ] - }, + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } ] }"#, ) @@ -1114,10 +1409,10 @@ client2name:client2passwd"; close_router_session(session).await; } - async fn test_pub_sub_deny_then_allow_usrpswd() { + async fn test_pub_sub_deny_then_allow_usrpswd(port: u16) { println!("test_pub_sub_deny_then_allow_usrpswd"); - let mut config_router = get_basic_router_config_usrpswd().await; + let mut config_router = get_basic_router_config_usrpswd(port).await; config_router .insert_json5( @@ -1127,20 +1422,32 @@ client2name:client2passwd"; "default_permission": "deny", "rules": [ { + "id": "r1", "permission": "allow", - "flows": ["ingress","egress"], - "actions": [ + "flows": ["ingress", "egress"], + "messages": [ "put", "declare_subscriber" ], "key_exprs": [ "test/demo" ], + }, + ], + "subjects": [ + { + "id": "s1", "usernames": [ "client1name", "client2name" ] - }, + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } ] }"#, ) @@ -1149,7 +1456,7 @@ client2name:client2passwd"; let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (sub_session, pub_session) = get_client_sessions_usrpswd().await; + let (sub_session, pub_session) = get_client_sessions_usrpswd(port).await; { let publisher = pub_session.declare_publisher(KEY_EXPR).await.unwrap(); let received_value = Arc::new(Mutex::new(String::new())); @@ -1173,10 +1480,10 @@ client2name:client2passwd"; close_router_session(session).await; } - async fn test_pub_sub_allow_then_deny_usrpswd() { + async fn test_pub_sub_allow_then_deny_usrpswd(port: u16) { println!("test_pub_sub_allow_then_deny_usrpswd"); - let mut config_router = get_basic_router_config_usrpswd().await; + let mut config_router = get_basic_router_config_usrpswd(port).await; config_router .insert_json5( "access_control", @@ -1185,20 +1492,32 @@ client2name:client2passwd"; "default_permission": "allow", "rules": [ { + "id": "r1", "permission": "deny", "flows": ["egress"], - "actions": [ + "messages": [ "put", "declare_subscriber" ], "key_exprs": [ "test/demo" ], + }, + ], + "subjects": [ + { + "id": "s1", "usernames": [ "client1name", "client2name" ] - }, + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } ] }"#, ) @@ -1206,7 +1525,7 @@ client2name:client2passwd"; println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (sub_session, pub_session) = get_client_sessions_usrpswd().await; + let (sub_session, pub_session) = get_client_sessions_usrpswd(port).await; { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); @@ -1232,10 +1551,10 @@ client2name:client2passwd"; close_router_session(session).await; } - async fn test_get_qbl_deny_then_allow_usrpswd() { + async fn test_get_qbl_deny_then_allow_usrpswd(port: u16) { println!("test_get_qbl_deny_then_allow_usrpswd"); - let mut config_router = get_basic_router_config_usrpswd().await; + let mut config_router = get_basic_router_config_usrpswd(port).await; config_router .insert_json5( "access_control", @@ -1244,20 +1563,32 @@ client2name:client2passwd"; "default_permission": "deny", "rules": [ { + "id": "r1", "permission": "allow", - "flows": ["egress","ingress"], - "actions": [ - "get", + "flows": ["ingress", "egress"], + "messages": [ + "query", "declare_queryable" ], "key_exprs": [ "test/demo" ], + }, + ], + "subjects": [ + { + "id": "s1", "usernames": [ "client1name", "client2name" ] - }, + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } ] }"#, ) @@ -1267,7 +1598,7 @@ client2name:client2passwd"; let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (get_session, qbl_session) = get_client_sessions_usrpswd().await; + let (get_session, qbl_session) = get_client_sessions_usrpswd(port).await; { let mut received_value = String::new(); @@ -1306,10 +1637,10 @@ client2name:client2passwd"; close_router_session(session).await; } - async fn test_get_qbl_allow_then_deny_usrpswd() { + async fn test_get_qbl_allow_then_deny_usrpswd(port: u16) { println!("test_get_qbl_allow_then_deny_usrpswd"); - let mut config_router = get_basic_router_config_usrpswd().await; + let mut config_router = get_basic_router_config_usrpswd(port).await; config_router .insert_json5( "access_control", @@ -1318,20 +1649,32 @@ client2name:client2passwd"; "default_permission": "allow", "rules": [ { + "id": "r1", "permission": "deny", "flows": ["egress"], - "actions": [ - "get", + "messages": [ + "query", "declare_queryable" ], "key_exprs": [ "test/demo" ], + }, + ], + "subjects": [ + { + "id": "s1", "usernames": [ "client1name", "client2name" ] - }, + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } ] }"#, ) @@ -1340,7 +1683,7 @@ client2name:client2passwd"; let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (get_session, qbl_session) = get_client_sessions_usrpswd().await; + let (get_session, qbl_session) = get_client_sessions_usrpswd(port).await; { let mut received_value = String::new(); @@ -1378,4 +1721,196 @@ client2name:client2passwd"; close_sessions(get_session, qbl_session).await; close_router_session(session).await; } + + async fn test_deny_allow_combination(port: u16) { + println!("test_deny_allow_combination"); + + let mut config_router = get_basic_router_config_quic_usrpswd(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "deny", + "rules": [ + { + "id": "r1", + "permission": "allow", + "flows": ["ingress", "egress"], + "messages": [ + "put", + "declare_subscriber" + ], + "key_exprs": [ + "test/demo" + ], + }, + ], + "subjects": [ + { + "id": "s1", + "cert_common_names": [ + "client_side" + ], + "usernames": [ + "client1name", + "client2name" + ] + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } + ] + }"#, + ) + .unwrap(); + + println!("Opening router session"); + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (sub_session, pub_session) = get_client_sessions_usrpswd(port).await; + { + let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); + let received_value = Arc::new(Mutex::new(String::new())); + let temp_recv_value = received_value.clone(); + let subscriber = + ztimeout!(sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + let mut temp_value = zlock!(temp_recv_value); + *temp_value = sample.payload().deserialize::().unwrap(); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + + ztimeout!(publisher.put(VALUE)).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert_ne!(*zlock!(received_value), VALUE); + ztimeout!(subscriber.undeclare()).unwrap(); + } + close_sessions(sub_session, pub_session).await; + let (sub_session, pub_session) = get_client_sessions_quic_usrpswd(port).await; + { + let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); + let received_value = Arc::new(Mutex::new(String::new())); + let temp_recv_value = received_value.clone(); + let subscriber = + ztimeout!(sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + let mut temp_value = zlock!(temp_recv_value); + *temp_value = sample.payload().deserialize::().unwrap(); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + + ztimeout!(publisher.put(VALUE)).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert_eq!(*zlock!(received_value), VALUE); + ztimeout!(subscriber.undeclare()).unwrap(); + } + close_router_session(session).await; + } + + async fn test_allow_deny_combination(port: u16) { + println!("test_allow_deny_combination"); + + let mut config_router = get_basic_router_config_quic_usrpswd(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "allow", + "rules": [ + { + "id": "r1", + "permission": "deny", + "flows": ["egress"], + "messages": [ + "put", + "declare_subscriber" + ], + "key_exprs": [ + "test/demo" + ], + }, + ], + "subjects": [ + { + "id": "s1", + "cert_common_names": [ + "client_side" + ], + "usernames": [ + "client1name", + "client2name" + ] + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } + ] + }"#, + ) + .unwrap(); + + println!("Opening router session"); + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + let (sub_session, pub_session) = get_client_sessions_usrpswd(port).await; + { + let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); + let received_value = Arc::new(Mutex::new(String::new())); + let temp_recv_value = received_value.clone(); + let subscriber = + ztimeout!(sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + let mut temp_value = zlock!(temp_recv_value); + *temp_value = sample.payload().deserialize::().unwrap(); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + + ztimeout!(publisher.put(VALUE)).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert_eq!(*zlock!(received_value), VALUE); + ztimeout!(subscriber.undeclare()).unwrap(); + } + close_sessions(sub_session, pub_session).await; + let (sub_session, pub_session) = get_client_sessions_quic_usrpswd(port).await; + { + let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); + let received_value = Arc::new(Mutex::new(String::new())); + let temp_recv_value = received_value.clone(); + let subscriber = + ztimeout!(sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + let mut temp_value = zlock!(temp_recv_value); + *temp_value = sample.payload().deserialize::().unwrap(); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + + ztimeout!(publisher.put(VALUE)).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert_ne!(*zlock!(received_value), VALUE); + ztimeout!(subscriber.undeclare()).unwrap(); + } + close_router_session(session).await; + } }