Skip to content

Commit

Permalink
Merge pull request #93 from mycognosist/implement_ebt_stream_termination
Browse files Browse the repository at this point in the history
Send end-of-stream request to all active sessions before solar terminates
  • Loading branch information
mycognosist authored Feb 6, 2024
2 parents 5ce58fd + e1c48ba commit 7a346d6
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 7 deletions.
24 changes: 22 additions & 2 deletions solar/src/actors/muxrpc/ebt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ where
}
// Handle a broker message.
RpcInput::Message(msg) => match msg {
BrokerMessage::Ebt(EbtEvent::TerminateSession(conn_id, session_role)) => {
if conn_id == &connection_id {
let req_no = match session_role {
SessionRole::Requester => self.active_request,
SessionRole::Responder => -(self.active_request),
};

return self.send_cancelstream(api, req_no).await;
}

Ok(false)
}
BrokerMessage::Ebt(EbtEvent::SendClock(conn_id, req_no, clock, session_role)) => {
// This is, regrettably, rather unintuitive.
//
Expand Down Expand Up @@ -324,8 +336,7 @@ where
Ok(false)
}

/// Remove the associated request from the map of active requests and close
/// the stream.
/// Receive close-stream request.
async fn recv_cancelstream(&mut self, api: &mut ApiCaller<W>, req_no: ReqNo) -> Result<bool> {
trace!(target: "ebt-handler", "Received cancel stream RPC response: {}", req_no);

Expand All @@ -334,6 +345,15 @@ where
Ok(true)
}

/// Send close-stream request.
async fn send_cancelstream(&mut self, api: &mut ApiCaller<W>, req_no: ReqNo) -> Result<bool> {
trace!(target: "ebt-handler", "Send cancel stream RPC response: {}", req_no);

api.rpc().send_stream_eof(-req_no).await?;

Ok(true)
}

/// Report a MUXRPC error and remove the associated request from the map of
/// active requests.
async fn recv_error_response(&mut self, req_no: ReqNo, err_msg: &str) -> Result<bool> {
Expand Down
8 changes: 8 additions & 0 deletions solar/src/actors/replication/ebt/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub enum EbtEvent {
ReceivedMessage(Message),
SessionConcluded(ConnectionId, SsbId),
SessionTimeout(ConnectionData, SsbId),
TerminateSession(ConnectionId, SessionRole),
Error(ConnectionData, SsbId, ErrorMsg),
}

Expand Down Expand Up @@ -686,6 +687,10 @@ impl EbtManager {
Ok(())
}

async fn handle_terminate_session(&mut self, connection_id: ConnectionId) {
trace!(target: "ebt-replication", "Terminating session for connection {}", connection_id);
}

async fn handle_error(
&mut self,
connection_data: ConnectionData,
Expand Down Expand Up @@ -801,6 +806,9 @@ impl EbtManager {
error!("Error while handling 'session timeout' event: {}", err)
}
}
EbtEvent::TerminateSession(connection_data, _session_role) => {
self.handle_terminate_session(connection_data).await;
}
EbtEvent::Error(connection_data, peer_ssb_id, error_msg) => {
if let Err(err) = self.handle_error(connection_data, peer_ssb_id, error_msg).await {
error!("Error while handling 'error' event: {}", err)
Expand Down
25 changes: 20 additions & 5 deletions solar/src/actors/replication/ebt/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
network::connection::ConnectionData,
replication::ebt::{EbtEvent, SessionRole},
},
broker::{ActorEndpoint, BrokerEvent, BrokerMessage, Destination, BROKER},
broker::{ActorEndpoint, BrokerEvent, BrokerMessage, Destination, Void, BROKER},
Error, Result,
};

Expand All @@ -27,7 +27,9 @@ pub async fn run(
// Register the EBT replication loop actor with the broker.
let ActorEndpoint {
ch_terminate,
ch_terminated,
ch_msg,
mut ch_broker,
..
} = BROKER
.lock()
Expand Down Expand Up @@ -69,9 +71,6 @@ pub async fn run(
let rpc_recv_stream = rpc_reader.into_stream().fuse();
pin_mut!(rpc_recv_stream);

// Create channel to send messages to broker.
let mut ch_broker = BROKER.lock().await.create_sender();

trace!(target: "ebt-session", "Initiating EBT replication session with: {}", peer_ssb_id);

let mut session_initiated = false;
Expand All @@ -98,13 +97,26 @@ pub async fn run(
// ready, one will be selected in order of declaration.
let input = select_biased! {
_value = ch_terminate_fuse => {
break;
// Communicate stream termination to the session peer.
RpcInput::Message(
BrokerMessage::Ebt(
EbtEvent::TerminateSession(connection_id, session_role.to_owned())
)
)
},
packet = rpc_recv_stream.select_next_some() => {
let (req_no, packet) = packet;
RpcInput::Network(req_no, packet)
},
msg = ch_msg.next().fuse() => {
// Listen for a 'session concluded' event and terminate the
// replicator if the connection ID of the event matches the
// ID of this instance of the replicator.
if let Some(BrokerMessage::Ebt(EbtEvent::SessionConcluded(conn_id, _))) = msg {
if connection_id == conn_id {
break
}
}
// Listen for a 'session initiated' event.
if let Some(BrokerMessage::Ebt(EbtEvent::SessionInitiated(_connection_id, ref req_no, ref ssb_id, ref session_role))) = msg {
if peer_ssb_id == *ssb_id && *session_role == SessionRole::Responder {
Expand Down Expand Up @@ -188,5 +200,8 @@ pub async fn run(
))
.await?;

// Send 'terminated' signal to broker.
let _ = ch_terminated.send(Void {});

Ok(())
}

0 comments on commit 7a346d6

Please sign in to comment.