Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release tables locks before propagating subscribers and queryables declarations to void dead locks #1150

Merged
merged 3 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions zenoh/src/net/primitives/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,21 @@ impl TransportPeerEventHandler for DeMux {
NetworkBody::ResponseFinal(m) => self.face.send_response_final(m),
NetworkBody::OAM(m) => {
if let Some(transport) = self.transport.as_ref() {
let mut declares = vec![];
let ctrl_lock = zlock!(self.face.tables.ctrl_lock);
let mut tables = zwrite!(self.face.tables.tables);
ctrl_lock.handle_oam(&mut tables, &self.face.tables, m, transport)?
ctrl_lock.handle_oam(
&mut tables,
&self.face.tables,
m,
transport,
&mut |p, m| declares.push((p.clone(), m)),
)?;
drop(tables);
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
}
}
Expand All @@ -89,9 +101,17 @@ impl TransportPeerEventHandler for DeMux {
fn closing(&self) {
self.face.send_close();
if let Some(transport) = self.transport.as_ref() {
let mut declares = vec![];
let ctrl_lock = zlock!(self.face.tables.ctrl_lock);
let mut tables = zwrite!(self.face.tables.tables);
let _ = ctrl_lock.closing(&mut tables, &self.face.tables, transport);
let _ = ctrl_lock.closing(&mut tables, &self.face.tables, transport, &mut |p, m| {
declares.push((p.clone(), m))
});
drop(tables);
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
}

Expand Down
25 changes: 24 additions & 1 deletion zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,50 +195,73 @@ impl Primitives for Face {
unregister_expr(&self.tables, &mut self.state.clone(), m.id);
}
zenoh_protocol::network::DeclareBody::DeclareSubscriber(m) => {
let mut declares = vec![];
declare_subscription(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
&m.wire_expr,
&m.ext_info,
msg.ext_nodeid.node_id,
&mut |p, m| declares.push((p.clone(), m)),
);
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
zenoh_protocol::network::DeclareBody::UndeclareSubscriber(m) => {
let mut declares = vec![];
undeclare_subscription(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
&m.ext_wire_expr.wire_expr,
msg.ext_nodeid.node_id,
&mut |p, m| declares.push((p.clone(), m)),
);
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
zenoh_protocol::network::DeclareBody::DeclareQueryable(m) => {
let mut declares = vec![];
declare_queryable(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
&m.wire_expr,
&m.ext_info,
msg.ext_nodeid.node_id,
&mut |p, m| declares.push((p.clone(), m)),
);
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
zenoh_protocol::network::DeclareBody::UndeclareQueryable(m) => {
let mut declares = vec![];
undeclare_queryable(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
&m.ext_wire_expr.wire_expr,
msg.ext_nodeid.node_id,
&mut |p, m| declares.push((p.clone(), m)),
);
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
zenoh_protocol::network::DeclareBody::DeclareToken(_m) => todo!(),
zenoh_protocol::network::DeclareBody::UndeclareToken(_m) => todo!(),
zenoh_protocol::network::DeclareBody::DeclareInterest(_m) => todo!(),
zenoh_protocol::network::DeclareBody::FinalInterest(_m) => todo!(),
zenoh_protocol::network::DeclareBody::UndeclareInterest(_m) => todo!(),
}
drop(ctrl_lock);
}

#[inline]
Expand Down
21 changes: 18 additions & 3 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use super::face::FaceState;
use super::resource::{DataRoutes, Direction, PullCaches, Resource};
use super::tables::{NodeId, Route, RoutingExpr, Tables, TablesLock};
use crate::net::routing::hat::HatTrait;
use crate::net::routing::hat::{HatTrait, SendDeclare};
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
Expand All @@ -37,6 +37,7 @@ pub(crate) fn declare_subscription(
expr: &WireExpr,
sub_info: &SubscriberInfo,
node_id: NodeId,
send_declare: &mut SendDeclare,
) {
tracing::debug!("Declare subscription {}", face);
let rtables = zread!(tables.tables);
Expand Down Expand Up @@ -66,7 +67,14 @@ pub(crate) fn declare_subscription(
(res, wtables)
};

hat_code.declare_subscription(&mut wtables, face, &mut res, sub_info, node_id);
hat_code.declare_subscription(
&mut wtables,
face,
&mut res,
sub_info,
node_id,
send_declare,
);

disable_matches_data_routes(&mut wtables, &mut res);
drop(wtables);
Expand Down Expand Up @@ -96,6 +104,7 @@ pub(crate) fn undeclare_subscription(
face: &mut Arc<FaceState>,
expr: &WireExpr,
node_id: NodeId,
send_declare: &mut SendDeclare,
) {
tracing::debug!("Undeclare subscription {}", face);
let rtables = zread!(tables.tables);
Expand All @@ -105,7 +114,13 @@ pub(crate) fn undeclare_subscription(
drop(rtables);
let mut wtables = zwrite!(tables.tables);

hat_code.undeclare_subscription(&mut wtables, face, &mut res, node_id);
hat_code.undeclare_subscription(
&mut wtables,
face,
&mut res,
node_id,
send_declare,
);

disable_matches_data_routes(&mut wtables, &mut res);
drop(wtables);
Expand Down
15 changes: 12 additions & 3 deletions zenoh/src/net/routing/dispatcher/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::face::FaceState;
use super::resource::{QueryRoute, QueryRoutes, QueryTargetQablSet, Resource};
use super::tables::NodeId;
use super::tables::{RoutingExpr, Tables, TablesLock};
use crate::net::routing::hat::HatTrait;
use crate::net::routing::hat::{HatTrait, SendDeclare};
use crate::net::routing::RoutingContext;
use async_trait::async_trait;
use std::collections::HashMap;
Expand Down Expand Up @@ -56,6 +56,7 @@ pub(crate) fn declare_queryable(
expr: &WireExpr,
qabl_info: &QueryableInfo,
node_id: NodeId,
send_declare: &mut SendDeclare,
) {
tracing::debug!("Register queryable {}", face);
let rtables = zread!(tables.tables);
Expand Down Expand Up @@ -85,7 +86,14 @@ pub(crate) fn declare_queryable(
(res, wtables)
};

hat_code.declare_queryable(&mut wtables, face, &mut res, qabl_info, node_id);
hat_code.declare_queryable(
&mut wtables,
face,
&mut res,
qabl_info,
node_id,
send_declare,
);

disable_matches_query_routes(&mut wtables, &mut res);
drop(wtables);
Expand All @@ -112,6 +120,7 @@ pub(crate) fn undeclare_queryable(
face: &mut Arc<FaceState>,
expr: &WireExpr,
node_id: NodeId,
send_declare: &mut SendDeclare,
) {
let rtables = zread!(tables.tables);
match rtables.get_mapping(face, &expr.scope, expr.mapping) {
Expand All @@ -120,7 +129,7 @@ pub(crate) fn undeclare_queryable(
drop(rtables);
let mut wtables = zwrite!(tables.tables);

hat_code.undeclare_queryable(&mut wtables, face, &mut res, node_id);
hat_code.undeclare_queryable(&mut wtables, face, &mut res, node_id, send_declare);

disable_matches_query_routes(&mut wtables, &mut res);
drop(wtables);
Expand Down
8 changes: 7 additions & 1 deletion zenoh/src/net/routing/dispatcher/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,13 @@ pub fn close_face(tables: &TablesLock, face: &Weak<FaceState>) {
tracing::debug!("Close {}", face);
face.task_controller.terminate_all(Duration::from_secs(10));
finalize_pending_queries(tables, &mut face);
zlock!(tables.ctrl_lock).close_face(tables, &mut face);
let mut declares = vec![];
let ctrl_lock = zlock!(tables.ctrl_lock);
ctrl_lock.close_face(tables, &mut face, &mut |p, m| declares.push((p.clone(), m)));
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
None => tracing::error!("Face already closed!"),
}
Expand Down
25 changes: 17 additions & 8 deletions zenoh/src/net/routing/hat/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use super::{
face::FaceState,
tables::{NodeId, Resource, RoutingExpr, Tables, TablesLock},
},
HatBaseTrait, HatTrait,
HatBaseTrait, HatTrait, SendDeclare,
};
use std::{
any::Any,
Expand Down Expand Up @@ -97,9 +97,10 @@ impl HatBaseTrait for HatCode {
tables: &mut Tables,
_tables_ref: &Arc<TablesLock>,
face: &mut Face,
send_declare: &mut SendDeclare,
) -> ZResult<()> {
pubsub_new_face(tables, &mut face.state);
queries_new_face(tables, &mut face.state);
pubsub_new_face(tables, &mut face.state, send_declare);
queries_new_face(tables, &mut face.state, send_declare);
Ok(())
}

Expand All @@ -109,13 +110,19 @@ impl HatBaseTrait for HatCode {
_tables_ref: &Arc<TablesLock>,
face: &mut Face,
_transport: &TransportUnicast,
send_declare: &mut SendDeclare,
) -> ZResult<()> {
pubsub_new_face(tables, &mut face.state);
queries_new_face(tables, &mut face.state);
pubsub_new_face(tables, &mut face.state, send_declare);
queries_new_face(tables, &mut face.state, send_declare);
Ok(())
}

fn close_face(&self, tables: &TablesLock, face: &mut Arc<FaceState>) {
fn close_face(
&self,
tables: &TablesLock,
face: &mut Arc<FaceState>,
send_declare: &mut SendDeclare,
) {
let mut wtables = zwrite!(tables.tables);
let mut face_clone = face.clone();
let face = get_mut_unchecked(face);
Expand All @@ -139,7 +146,7 @@ impl HatBaseTrait for HatCode {
.drain()
{
get_mut_unchecked(&mut res).session_ctxs.remove(&face.id);
undeclare_client_subscription(&mut wtables, &mut face_clone, &mut res);
undeclare_client_subscription(&mut wtables, &mut face_clone, &mut res, send_declare);

if res.context.is_some() {
for match_ in &res.context().matches {
Expand Down Expand Up @@ -167,7 +174,7 @@ impl HatBaseTrait for HatCode {
.drain()
{
get_mut_unchecked(&mut res).session_ctxs.remove(&face.id);
undeclare_client_queryable(&mut wtables, &mut face_clone, &mut res);
undeclare_client_queryable(&mut wtables, &mut face_clone, &mut res, send_declare);

if res.context.is_some() {
for match_ in &res.context().matches {
Expand Down Expand Up @@ -229,6 +236,7 @@ impl HatBaseTrait for HatCode {
_tables_ref: &Arc<TablesLock>,
_oam: Oam,
_transport: &TransportUnicast,
_send_declare: &mut SendDeclare,
) -> ZResult<()> {
Ok(())
}
Expand All @@ -248,6 +256,7 @@ impl HatBaseTrait for HatCode {
_tables: &mut Tables,
_tables_ref: &Arc<TablesLock>,
_transport: &TransportUnicast,
_send_declare: &mut SendDeclare,
) -> ZResult<()> {
Ok(())
}
Expand Down
Loading