diff --git a/solar/src/actors/muxrpc/ebt.rs b/solar/src/actors/muxrpc/ebt.rs index 13ab8fd..e2deec4 100644 --- a/solar/src/actors/muxrpc/ebt.rs +++ b/solar/src/actors/muxrpc/ebt.rs @@ -88,7 +88,8 @@ where } // Handle an incoming MUXRPC response. RpcInput::Network(req_no, rpc::RecvMsg::RpcResponse(_type, res)) => { - self.recv_rpc_response(ch_broker, *req_no, res).await + self.recv_rpc_response(ch_broker, *req_no, res, peer_ssb_id) + .await } // Handle an incoming MUXRPC 'cancel stream' response. RpcInput::Network(req_no, rpc::RecvMsg::CancelStreamResponse()) => { @@ -103,7 +104,8 @@ where BrokerMessage::Ebt(EbtEvent::SendClock(req_no, clock)) => { // Serialize the vector clock as a JSON string. let json_clock = serde_json::to_string(&clock)?; - api.ebt_clock_res_send(*req_no, &json_clock).await?; + // The request number must be negative (response). + api.ebt_clock_res_send(-(*req_no), &json_clock).await?; Ok(false) } @@ -111,7 +113,8 @@ where // Ensure the message is sent to the correct peer. if peer_ssb_id == *ssb_id { let json_msg = msg.to_string(); - api.ebt_feed_res_send(*req_no, &json_msg).await?; + // The request number must be negative (response). + api.ebt_feed_res_send(-(*req_no), &json_msg).await?; } Ok(false) @@ -188,12 +191,13 @@ where } /// Process an incoming MUXRPC response. - /// The response is expected to contain an SSB message. + /// The response is expected to contain a vector clock or an SSB message. async fn recv_rpc_response( &mut self, ch_broker: &mut ChBrokerSend, req_no: ReqNo, res: &[u8], + peer_ssb_id: String, ) -> Result { trace!(target: "ebt-handler", "Received RPC response: {}", req_no); @@ -201,25 +205,39 @@ where // to us, either because we sent or received the initiating replicate // request. if self.active_requests.contains_key(&req_no) { - // First try to deserialize the response into a message value. - // If that fails, try to deserialize into a message KVT and then - // convert that into a message value. Return an error if that fails. - // This approach allows us to handle the unlikely event that - // messages are sent as KVTs and not simply values. + // The response may be a vector clock (aka. notes) or an SSB message. // - // Validation of the message signature and fields is also performed - // as part of the call to `from_slice`. - let msg = match Message::from_slice(res) { - Ok(msg) => msg, - Err(_) => MessageKvt::from_slice(res)?.into_message()?, - }; - - ch_broker - .send(BrokerEvent::new( - Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::ReceivedMessage(msg)), - )) - .await?; + // Since there is no explicit way to determine which was received, + // we first attempt deserialization of a vector clock and move on + // to attempting message deserialization if that fails. + if let Ok(clock) = serde_json::from_slice(res) { + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock)), + )) + .await?; + } else { + // First try to deserialize the response into a message value. + // If that fails, try to deserialize into a message KVT and then + // convert that into a message value. Return an error if that fails. + // This approach allows us to handle the unlikely event that + // messages are sent as KVTs and not simply values. + // + // Validation of the message signature and fields is also performed + // as part of the call to `from_slice`. + let msg = match Message::from_slice(res) { + Ok(msg) => msg, + Err(_) => MessageKvt::from_slice(res)?.into_message()?, + }; + + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::ReceivedMessage(msg)), + )) + .await?; + } } Ok(false) diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index 37f8496..c2d1167 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -366,10 +366,9 @@ impl EbtManager { async fn handle_wait_for_session_request(&self, connection_data: ConnectionData) { trace!(target: "ebt", "Waiting for EBT session request"); - let session_role = SessionRole::Responder; task::spawn(replicator::run( connection_data, - session_role, + SessionRole::Responder, self.session_wait_timeout, )); } @@ -387,10 +386,9 @@ impl EbtManager { connection_data.peer_public_key.unwrap() ); - let session_role = SessionRole::Requester; task::spawn(replicator::run( connection_data, - session_role, + SessionRole::Requester, self.session_wait_timeout, )); }