Skip to content

Commit

Permalink
remove redundant clock receipt
Browse files Browse the repository at this point in the history
  • Loading branch information
mycognosist committed Jan 11, 2024
1 parent 07e74e0 commit ed447ee
Showing 1 changed file with 23 additions and 40 deletions.
63 changes: 23 additions & 40 deletions solar/src/actors/muxrpc/ebt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ where
}
// Handle an incoming MUXRPC response.
RpcInput::Network(req_no, rpc::RecvMsg::RpcResponse(_type, res)) => {
self.recv_rpc_response(ch_broker, *req_no, res, peer_ssb_id)
.await
self.recv_rpc_response(ch_broker, *req_no, res).await
}
// Handle an incoming MUXRPC 'cancel stream' response.
RpcInput::Network(req_no, rpc::RecvMsg::CancelStreamResponse()) => {
Expand Down Expand Up @@ -188,55 +187,39 @@ where
Ok(false)
}

/// Process an incoming MUXRPC response. The response is expected to
/// contain a vector clock or an SSB message.
/// Process an incoming MUXRPC response.
/// The response is expected to contain an SSB message.
async fn recv_rpc_response(
&mut self,
ch_broker: &mut ChBrokerSend,
req_no: ReqNo,
res: &[u8],
peer_ssb_id: String,
) -> Result<bool> {
trace!(target: "ebt-handler", "Received RPC response: {}", req_no);

// Only handle the response if the associated request number is known
// to us, either because we sent or received the initiating replicate
// request.
if self.active_requests.contains_key(&req_no) {
// The response may be a vector clock (aka. notes) or an SSB message.
//
// Since there is no explicit way to determine which was received,
// we first attempt deserialization of a vector clock and move on
// to attempting message deserialization if that fails.
// First try to deserialize the response into a message value.
// If that fails, try to deserialize into a message KVT and then
// convert that into a message value. Return an error if that fails.
// This approach allows us to handle the unlikely event that
// messages are sent as KVTs and not simply values.
//
// TODO: Is matching on clock here redundant?
// We are already matching on `OtherRequest` in the handler.
if let Ok(clock) = serde_json::from_slice(res) {
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock)),
))
.await?;
} else {
// First try to deserialize the response into a message value.
// If that fails, try to deserialize into a message KVT and then
// convert that into a message value. Return an error if that fails.
// This approach allows us to handle the unlikely event that
// messages are sent as KVTs and not simply values.
//
// Validation of the message signature and fields is also performed
// as part of the call to `from_slice`.
let msg = match Message::from_slice(res) {
Ok(msg) => msg,
Err(_) => MessageKvt::from_slice(res)?.into_message()?,
};

ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::ReceivedMessage(msg)),
))
.await?;
}
// Validation of the message signature and fields is also performed
// as part of the call to `from_slice`.
let msg = match Message::from_slice(res) {
Ok(msg) => msg,
Err(_) => MessageKvt::from_slice(res)?.into_message()?,
};

ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::ReceivedMessage(msg)),
))
.await?;
}

Ok(false)
Expand Down

0 comments on commit ed447ee

Please sign in to comment.