Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Match on vector clock response and use negative request numbers #88

Merged
merged 2 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 40 additions & 22 deletions solar/src/actors/muxrpc/ebt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ where
}
// Handle an incoming MUXRPC response.
RpcInput::Network(req_no, rpc::RecvMsg::RpcResponse(_type, res)) => {
self.recv_rpc_response(ch_broker, *req_no, res).await
self.recv_rpc_response(ch_broker, *req_no, res, peer_ssb_id)
.await
}
// Handle an incoming MUXRPC 'cancel stream' response.
RpcInput::Network(req_no, rpc::RecvMsg::CancelStreamResponse()) => {
Expand All @@ -103,15 +104,17 @@ where
BrokerMessage::Ebt(EbtEvent::SendClock(req_no, clock)) => {
// Serialize the vector clock as a JSON string.
let json_clock = serde_json::to_string(&clock)?;
api.ebt_clock_res_send(*req_no, &json_clock).await?;
// The request number must be negative (response).
api.ebt_clock_res_send(-(*req_no), &json_clock).await?;

Ok(false)
}
BrokerMessage::Ebt(EbtEvent::SendMessage(req_no, ssb_id, msg)) => {
// Ensure the message is sent to the correct peer.
if peer_ssb_id == *ssb_id {
let json_msg = msg.to_string();
api.ebt_feed_res_send(*req_no, &json_msg).await?;
// The request number must be negative (response).
api.ebt_feed_res_send(-(*req_no), &json_msg).await?;
}

Ok(false)
Expand Down Expand Up @@ -188,38 +191,53 @@ where
}

/// Process an incoming MUXRPC response.
/// The response is expected to contain an SSB message.
/// The response is expected to contain a vector clock or an SSB message.
async fn recv_rpc_response(
&mut self,
ch_broker: &mut ChBrokerSend,
req_no: ReqNo,
res: &[u8],
peer_ssb_id: String,
) -> Result<bool> {
trace!(target: "ebt-handler", "Received RPC response: {}", req_no);

// Only handle the response if the associated request number is known
// to us, either because we sent or received the initiating replicate
// request.
if self.active_requests.contains_key(&req_no) {
// First try to deserialize the response into a message value.
// If that fails, try to deserialize into a message KVT and then
// convert that into a message value. Return an error if that fails.
// This approach allows us to handle the unlikely event that
// messages are sent as KVTs and not simply values.
// The response may be a vector clock (aka. notes) or an SSB message.
//
// Validation of the message signature and fields is also performed
// as part of the call to `from_slice`.
let msg = match Message::from_slice(res) {
Ok(msg) => msg,
Err(_) => MessageKvt::from_slice(res)?.into_message()?,
};

ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::ReceivedMessage(msg)),
))
.await?;
// Since there is no explicit way to determine which was received,
// we first attempt deserialization of a vector clock and move on
// to attempting message deserialization if that fails.
if let Ok(clock) = serde_json::from_slice(res) {
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock)),
))
.await?;
} else {
// First try to deserialize the response into a message value.
// If that fails, try to deserialize into a message KVT and then
// convert that into a message value. Return an error if that fails.
// This approach allows us to handle the unlikely event that
// messages are sent as KVTs and not simply values.
//
// Validation of the message signature and fields is also performed
// as part of the call to `from_slice`.
let msg = match Message::from_slice(res) {
Ok(msg) => msg,
Err(_) => MessageKvt::from_slice(res)?.into_message()?,
};

ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::ReceivedMessage(msg)),
))
.await?;
}
}

Ok(false)
Expand Down
6 changes: 2 additions & 4 deletions solar/src/actors/replication/ebt/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,9 @@ impl EbtManager {
async fn handle_wait_for_session_request(&self, connection_data: ConnectionData) {
trace!(target: "ebt", "Waiting for EBT session request");

let session_role = SessionRole::Responder;
task::spawn(replicator::run(
connection_data,
session_role,
SessionRole::Responder,
self.session_wait_timeout,
));
}
Expand All @@ -387,10 +386,9 @@ impl EbtManager {
connection_data.peer_public_key.unwrap()
);

let session_role = SessionRole::Requester;
task::spawn(replicator::run(
connection_data,
session_role,
SessionRole::Requester,
self.session_wait_timeout,
));
}
Expand Down
Loading