Skip to content

Commit

Permalink
Merge pull request #85 from mycognosist/respond_kv_event_ebt
Browse files Browse the repository at this point in the history
Send partial vector clock when local KV store is updated
  • Loading branch information
mycognosist authored Jan 11, 2024
2 parents 0897be2 + ed447ee commit fd4f76d
Show file tree
Hide file tree
Showing 4 changed files with 247 additions and 149 deletions.
122 changes: 25 additions & 97 deletions solar/src/actors/muxrpc/ebt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ where
peer_ssb_id: String,
active_request: Option<ReqNo>,
) -> Result<bool> {
trace!(target: "ebt-handler", "Received MUXRPC input: {:?}", op);
trace!(target: "muxrpc-ebt-handler", "Received MUXRPC input: {:?}", op);

// An outbound EBT replicate request was made before the handler was
// called; add it to the map of active requests.
Expand Down Expand Up @@ -88,8 +88,7 @@ where
}
// Handle an incoming MUXRPC response.
RpcInput::Network(req_no, rpc::RecvMsg::RpcResponse(_type, res)) => {
self.recv_rpc_response(api, ch_broker, *req_no, res, peer_ssb_id)
.await
self.recv_rpc_response(ch_broker, *req_no, res).await
}
// Handle an incoming MUXRPC 'cancel stream' response.
RpcInput::Network(req_no, rpc::RecvMsg::CancelStreamResponse()) => {
Expand Down Expand Up @@ -119,21 +118,6 @@ where
}
_ => Ok(false),
},
/*
RpcInput::Message(msg) => {
if let Some(kv_event) = msg.downcast_ref::<StoreKvEvent>() {
match kv_event {
// Notification from the key-value store indicating that
// a new message has just been appended to the feed
// identified by `id`.
StoreKvEvent::IdChanged(id) => {
return self.recv_storageevent_idchanged(api, id).await
}
}
}
Ok(false)
}
*/
_ => Ok(false),
}
}
Expand Down Expand Up @@ -203,56 +187,39 @@ where
Ok(false)
}

/// Process an incoming MUXRPC response. The response is expected to
/// contain a vector clock or an SSB message.
/// Process an incoming MUXRPC response.
/// The response is expected to contain an SSB message.
async fn recv_rpc_response(
&mut self,
_api: &mut ApiCaller<W>,
ch_broker: &mut ChBrokerSend,
req_no: ReqNo,
res: &[u8],
peer_ssb_id: String,
) -> Result<bool> {
trace!(target: "ebt-handler", "Received RPC response: {}", req_no);

// Only handle the response if the associated request number is known
// to us, either because we sent or received the initiating replicate
// request.
if self.active_requests.contains_key(&req_no) {
// The response may be a vector clock (aka. notes) or an SSB message.
//
// Since there is no explicit way to determine which was received,
// we first attempt deserialization of a vector clock and move on
// to attempting message deserialization if that fails.
// First try to deserialize the response into a message value.
// If that fails, try to deserialize into a message KVT and then
// convert that into a message value. Return an error if that fails.
// This approach allows us to handle the unlikely event that
// messages are sent as KVTs and not simply values.
//
// TODO: Is matching on clock here redundant?
// We are already matching on `OtherRequest` in the handler.
if let Ok(clock) = serde_json::from_slice(res) {
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock)),
))
.await?;
} else {
// First try to deserialize the response into a message value.
// If that fails, try to deserialize into a message KVT and then
// convert that into a message value. Return an error if that fails.
// This approach allows us to handle the unlikely event that
// messages are sent as KVTs and not simply values.
//
// Validation of the message signature and fields is also performed
// as part of the call to `from_slice`.
let msg = match Message::from_slice(res) {
Ok(msg) => msg,
Err(_) => MessageKvt::from_slice(res)?.into_message()?,
};

ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::ReceivedMessage(msg)),
))
.await?;
}
// Validation of the message signature and fields is also performed
// as part of the call to `from_slice`.
let msg = match Message::from_slice(res) {
Ok(msg) => msg,
Err(_) => MessageKvt::from_slice(res)?.into_message()?,
};

ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::ReceivedMessage(msg)),
))
.await?;
}

Ok(false)
Expand All @@ -263,6 +230,7 @@ where
async fn recv_cancelstream(&mut self, api: &mut ApiCaller<W>, req_no: ReqNo) -> Result<bool> {
api.rpc().send_stream_eof(-req_no).await?;
self.active_requests.remove(&req_no);

Ok(true)
}

Expand All @@ -277,22 +245,6 @@ where
}

/*
/// Extract blob references from post-type messages.
fn extract_blob_refs(&mut self, msg: &Message) -> Vec<String> {
let mut refs = Vec::new();
let msg = serde_json::from_value(msg.content().clone());
if let Ok(dto::content::TypedMessage::Post { text, .. }) = msg {
for cap in BLOB_REGEX.captures_iter(&text) {
let key = cap.get(0).unwrap().as_str().to_owned();
refs.push(key);
}
}
refs
}
/// Process an incoming MUXRPC response. The response is expected to
/// contain an SSB message.
async fn recv_rpc_response(
Expand Down Expand Up @@ -365,28 +317,4 @@ where
}
}
*/

/*
/// Respond to a key-value store state change for the given peer.
/// This is triggered when a new message is appended to the local feed.
/// Remove the peer from the list of active streams, send the requested
/// messages from the local feed to the peer and then reinsert the public
/// key of the peer to the list of active streams.
async fn recv_storageevent_idchanged(
&mut self,
api: &mut ApiCaller<W>,
id: &str,
) -> Result<bool> {
// Attempt to remove the peer from the list of active streams.
if let Some(mut req) = self.reqs.remove(id) {
// Send local messages to the peer.
self.send_history(api, &mut req).await?;
// Reinsert the peer into the list of active streams.
self.reqs.insert(id.to_string(), req);
Ok(true)
} else {
Ok(false)
}
}
*/
}
12 changes: 6 additions & 6 deletions solar/src/actors/muxrpc/history_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ where
self.recv_error_response(api, *req_no, err).await
}
// Handle a broker message.
RpcInput::Message(BrokerMessage::StoreKv(StoreKvEvent(id))) => {
RpcInput::Message(BrokerMessage::StoreKv(StoreKvEvent(ssb_id))) => {
// Notification from the key-value store indicating that
// a new message has just been appended to the feed
// identified by `id`.
return self.recv_storageevent_idchanged(api, id).await;
// identified by `ssb_id`.
return self.recv_storageevent_idchanged(api, ssb_id).await;
}
// Handle a timer event.
RpcInput::Timer => self.on_timer(api).await,
Expand Down Expand Up @@ -315,14 +315,14 @@ where
async fn recv_storageevent_idchanged(
&mut self,
api: &mut ApiCaller<W>,
id: &str,
ssb_id: &str,
) -> Result<bool> {
// Attempt to remove the peer from the list of active streams.
if let Some(mut req) = self.reqs.remove(id) {
if let Some(mut req) = self.reqs.remove(ssb_id) {
// Send local messages to the peer.
self.send_history(api, &mut req).await?;
// Reinsert the peer into the list of active streams.
self.reqs.insert(id.to_string(), req);
self.reqs.insert(ssb_id.to_string(), req);
Ok(true)
} else {
Ok(false)
Expand Down
Loading

0 comments on commit fd4f76d

Please sign in to comment.