Skip to content

Commit

Permalink
Send WireExpr in UndeclareSubscriber/UndeclareQueryable to clients fo…
Browse files Browse the repository at this point in the history
…r pico
  • Loading branch information
OlivierHecart committed Mar 12, 2024
1 parent 1eb8629 commit 16f7789
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 33 deletions.
20 changes: 18 additions & 2 deletions zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,22 @@ fn send_forget_sourced_subscription_to_net_childs(
fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc<Resource>) {
for face in tables.faces.values_mut() {
if let Some(id) = face_hat_mut!(face).local_subs.remove(res) {
// Still send WireExpr in UndeclareSubscriber to clients for pico
let ext_wire_expr = if face.whatami == WhatAmI::Client {
WireExprType {
wire_expr: Resource::get_best_key(res, "", face.id),
}
} else {
WireExprType::null()
};
face.primitives.send_declare(RoutingContext::with_expr(
Declare {
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber {
id,
ext_wire_expr: WireExprType::null(),
ext_wire_expr,
}),
},
res.expr(),
Expand Down Expand Up @@ -412,14 +420,22 @@ pub(super) fn undeclare_client_subscription(
let face = &mut client_subs[0];
if !(face.whatami == WhatAmI::Client && res.expr().starts_with(PREFIX_LIVELINESS)) {
if let Some(id) = face_hat_mut!(face).local_subs.remove(res) {
// Still send WireExpr in UndeclareSubscriber to clients for pico
let ext_wire_expr = if face.whatami == WhatAmI::Client {
WireExprType {
wire_expr: Resource::get_best_key(res, "", face.id),
}
} else {
WireExprType::null()
};
face.primitives.send_declare(RoutingContext::with_expr(
Declare {
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber {
id,
ext_wire_expr: WireExprType::null(),
ext_wire_expr,
}),
},
res.expr(),
Expand Down
23 changes: 18 additions & 5 deletions zenoh/src/net/routing/hat/linkstate_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,15 +365,20 @@ fn send_forget_sourced_queryable_to_net_childs(
fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc<Resource>) {
for face in tables.faces.values_mut() {
if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) {
// Still send WireExpr in UndeclareQueryable to clients for pico
let ext_wire_expr = if face.whatami == WhatAmI::Client {
WireExprType {
wire_expr: Resource::get_best_key(res, "", face.id),
}
} else {
WireExprType::null()
};
face.primitives.send_declare(RoutingContext::with_expr(
Declare {
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareQueryable(UndeclareQueryable {
id,
ext_wire_expr: WireExprType::null(),
}),
body: DeclareBody::UndeclareQueryable(UndeclareQueryable { id, ext_wire_expr }),
},
res.expr(),
));
Expand Down Expand Up @@ -478,14 +483,22 @@ pub(super) fn undeclare_client_queryable(
if client_qabls.len() == 1 && !peer_qabls {
let face = &mut client_qabls[0];
if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) {
// Still send WireExpr in UndeclareQueryable to clients for pico
let ext_wire_expr = if face.whatami == WhatAmI::Client {
WireExprType {
wire_expr: Resource::get_best_key(res, "", face.id),
}
} else {
WireExprType::null()
};
face.primitives.send_declare(RoutingContext::with_expr(
Declare {
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareQueryable(UndeclareQueryable {
id,
ext_wire_expr: WireExprType::null(),
ext_wire_expr,
}),
},
res.expr(),
Expand Down
20 changes: 18 additions & 2 deletions zenoh/src/net/routing/hat/p2p_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,22 @@ fn client_subs(res: &Arc<Resource>) -> Vec<Arc<FaceState>> {
fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc<Resource>) {
for face in tables.faces.values_mut() {
if let Some(id) = face_hat_mut!(face).local_subs.remove(res) {
// Still send WireExpr in UndeclareSubscriber to clients for pico
let ext_wire_expr = if face.whatami == WhatAmI::Client {
WireExprType {
wire_expr: Resource::get_best_key(res, "", face.id),
}
} else {
WireExprType::null()
};
face.primitives.send_declare(RoutingContext::with_expr(
Declare {
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber {
id,
ext_wire_expr: WireExprType::null(),
ext_wire_expr,
}),
},
res.expr(),
Expand All @@ -201,14 +209,22 @@ pub(super) fn undeclare_client_subscription(
let face = &mut client_subs[0];
if !(face.whatami == WhatAmI::Client && res.expr().starts_with(PREFIX_LIVELINESS)) {
if let Some(id) = face_hat_mut!(face).local_subs.remove(res) {
// Still send WireExpr in UndeclareSubscriber to clients for pico
let ext_wire_expr = if face.whatami == WhatAmI::Client {
WireExprType {
wire_expr: Resource::get_best_key(res, "", face.id),
}
} else {
WireExprType::null()
};
face.primitives.send_declare(RoutingContext::with_expr(
Declare {
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber {
id,
ext_wire_expr: WireExprType::null(),
ext_wire_expr,
}),
},
res.expr(),
Expand Down
23 changes: 18 additions & 5 deletions zenoh/src/net/routing/hat/p2p_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,20 @@ fn client_qabls(res: &Arc<Resource>) -> Vec<Arc<FaceState>> {
fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc<Resource>) {
for face in tables.faces.values_mut() {
if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) {
// Still send WireExpr in UndeclareQueryable to clients for pico
let ext_wire_expr = if face.whatami == WhatAmI::Client {
WireExprType {
wire_expr: Resource::get_best_key(res, "", face.id),
}
} else {
WireExprType::null()
};
face.primitives.send_declare(RoutingContext::with_expr(
Declare {
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareQueryable(UndeclareQueryable {
id,
ext_wire_expr: WireExprType::null(),
}),
body: DeclareBody::UndeclareQueryable(UndeclareQueryable { id, ext_wire_expr }),
},
res.expr(),
));
Expand Down Expand Up @@ -203,14 +208,22 @@ pub(super) fn undeclare_client_queryable(
if client_qabls.len() == 1 {
let face = &mut client_qabls[0];
if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) {
// Still send WireExpr in UndeclareQueryable to clients for pico
let ext_wire_expr = if face.whatami == WhatAmI::Client {
WireExprType {
wire_expr: Resource::get_best_key(res, "", face.id),
}
} else {
WireExprType::null()
};
face.primitives.send_declare(RoutingContext::with_expr(
Declare {
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareQueryable(UndeclareQueryable {
id,
ext_wire_expr: WireExprType::null(),
ext_wire_expr,
}),
},
res.expr(),
Expand Down
53 changes: 45 additions & 8 deletions zenoh/src/net/routing/hat/router/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,14 +404,22 @@ fn send_forget_sourced_subscription_to_net_childs(
fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc<Resource>) {
for face in tables.faces.values_mut() {
if let Some(id) = face_hat_mut!(face).local_subs.remove(res) {
// Still send WireExpr in UndeclareSubscriber to clients for pico
let ext_wire_expr = if face.whatami == WhatAmI::Client {
WireExprType {
wire_expr: Resource::get_best_key(res, "", face.id),
}
} else {
WireExprType::null()
};
face.primitives.send_declare(RoutingContext::with_expr(
Declare {
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber {
id,
ext_wire_expr: WireExprType::null(),
ext_wire_expr,
}),
},
res.expr(),
Expand All @@ -429,14 +437,22 @@ fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc<Resource>
.contains(&tables.zid)
}) {
if let Some(id) = face_hat_mut!(face).local_subs.remove(&res) {
// Still send WireExpr in UndeclareSubscriber to clients for pico
let ext_wire_expr = if face.whatami == WhatAmI::Client {
WireExprType {
wire_expr: Resource::get_best_key(&res, "", face.id),
}
} else {
WireExprType::null()
};
face.primitives.send_declare(RoutingContext::with_expr(
Declare {
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber {
id,
ext_wire_expr: WireExprType::null(),
ext_wire_expr,
}),
},
res.expr(),
Expand Down Expand Up @@ -469,14 +485,22 @@ fn propagate_forget_simple_subscription_to_peers(tables: &mut Tables, res: &Arc<
})
{
if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(res) {
// Still send WireExpr in UndeclareSubscriber to clients for pico
let ext_wire_expr = if face.whatami == WhatAmI::Client {
WireExprType {
wire_expr: Resource::get_best_key(res, "", face.id),
}
} else {
WireExprType::null()
};
face.primitives.send_declare(RoutingContext::with_expr(
Declare {
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber {
id,
ext_wire_expr: WireExprType::null(),
ext_wire_expr,
}),
},
res.expr(),
Expand Down Expand Up @@ -620,14 +644,22 @@ pub(super) fn undeclare_client_subscription(
let face = &mut client_subs[0];
if !(face.whatami == WhatAmI::Client && res.expr().starts_with(PREFIX_LIVELINESS)) {
if let Some(id) = face_hat_mut!(face).local_subs.remove(res) {
// Still send WireExpr in UndeclareSubscriber to clients for pico
let ext_wire_expr = if face.whatami == WhatAmI::Client {
WireExprType {
wire_expr: Resource::get_best_key(res, "", face.id),
}
} else {
WireExprType::null()
};
face.primitives.send_declare(RoutingContext::with_expr(
Declare {
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber {
id,
ext_wire_expr: WireExprType::null(),
ext_wire_expr,
}),
},
res.expr(),
Expand Down Expand Up @@ -810,16 +842,21 @@ pub(super) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links:
})
};
if forget {
// Still send WireExpr in UndeclareSubscriber to clients for pico
let ext_wire_expr = if dst_face.whatami == WhatAmI::Client {
WireExprType {
wire_expr: Resource::get_best_key(res, "", dst_face.id),
}
} else {
WireExprType::null()
};
dst_face.primitives.send_declare(RoutingContext::with_expr(
Declare {
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareSubscriber(
UndeclareSubscriber {
id,
ext_wire_expr: WireExprType::null(),
},
UndeclareSubscriber { id, ext_wire_expr },
),
},
res.expr(),
Expand Down
Loading

0 comments on commit 16f7789

Please sign in to comment.