Skip to content

Commit

Permalink
Merge pull request #92 from mycognosist/live_msg_sending
Browse files Browse the repository at this point in the history
Introduce live message sending for EBT sessions
  • Loading branch information
mycognosist authored Feb 6, 2024
2 parents 5f9c961 + fa76b12 commit 5ce58fd
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 65 deletions.
2 changes: 1 addition & 1 deletion solar/src/actors/muxrpc/history_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ where
self.recv_error_response(api, *req_no, err).await
}
// Handle a broker message.
RpcInput::Message(BrokerMessage::StoreKv(StoreKvEvent(ssb_id))) => {
RpcInput::Message(BrokerMessage::StoreKv(StoreKvEvent((ssb_id, _seq)))) => {
// Notification from the key-value store indicating that
// a new message has just been appended to the feed
// identified by `ssb_id`.
Expand Down
104 changes: 46 additions & 58 deletions solar/src/actors/replication/ebt/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Display for SessionRole {
#[derive(Debug)]
pub struct EbtManager {
/// Active EBT peer sessions.
active_sessions: HashMap<ConnectionId, (SsbId, SessionRole)>,
active_sessions: HashMap<ConnectionId, (SsbId, SessionRole, ReqNo)>,
/// Duration to wait before switching feed request to a different peer.
_feed_wait_timeout: u64,
/// The state of the replication loop.
Expand Down Expand Up @@ -158,7 +158,7 @@ impl EbtManager {

/// Retrieve either the local vector clock or the stored vector clock
/// for the peer represented by the given SSB ID.
fn _get_clock(&self, ssb_id: Option<&SsbId>) -> Option<VectorClock> {
fn get_clock(&self, ssb_id: Option<&SsbId>) -> Option<VectorClock> {
match ssb_id {
Some(id) => self.peer_clocks.get(id).cloned(),
None => Some(self.local_clock.to_owned()),
Expand Down Expand Up @@ -201,6 +201,7 @@ impl EbtManager {
let mut clock_file = File::open(&clock_entry.path())?;
let mut clock_file_contents = String::new();
clock_file.read_to_string(&mut clock_file_contents)?;
// TODO: Match on error and delete file.
let clock: VectorClock = serde_json::from_str(&clock_file_contents)?;

// Set the vector clock in memory.
Expand Down Expand Up @@ -233,21 +234,21 @@ impl EbtManager {
}

/// Retrieve the stored vector clock for the first peer, check for the
/// second peer in the vector clock and return the value of the receive
/// flag.
fn _is_receiving(&self, peer_ssb_id: SsbId, ssb_id: SsbId) -> Result<bool> {
/// second peer in the vector clock. If the receive flag is set to true,
/// return the decoded sequence number.
fn is_receiving(&self, peer_ssb_id: &SsbId, ssb_id: &SsbId) -> Result<Option<u64>> {
// Retrieve the vector clock for the first peer.
if let Some(clock) = self._get_clock(Some(&peer_ssb_id)) {
if let Some(clock) = self.get_clock(Some(peer_ssb_id)) {
// Check if the second peer is represented in the vector clock.
if let Some(encoded_seq_no) = clock.get(&ssb_id) {
if let Some(encoded_seq_no) = clock.get(ssb_id) {
// Check if the receive flag is true.
if let (_replicate_flag, Some(true), _seq) = clock::decode(*encoded_seq_no)? {
return Ok(true);
if let (_replicate_flag, Some(true), seq) = clock::decode(*encoded_seq_no)? {
return Ok(seq);
}
}
}

Ok(false)
Ok(None)
}

/// Get the sequence number of the latest message sent to the given
Expand Down Expand Up @@ -282,30 +283,27 @@ impl EbtManager {
}

/// Register a new EBT session for the given peer.
//fn register_session(&mut self, peer_ssb_id: &SsbId, req_no: ReqNo, session_role: SessionRole) {
fn register_session(
&mut self,
connection_id: ConnectionId,
peer_ssb_id: &SsbId,
peer_ssb_id: SsbId,
session_role: SessionRole,
req_no: ReqNo,
) {
self.active_sessions
.insert(connection_id, (peer_ssb_id.to_owned(), session_role));

trace!(target: "ebt-session", "Registered new EBT session for connection {} with {}", connection_id, peer_ssb_id);
self.active_sessions
.insert(connection_id, (peer_ssb_id, session_role, req_no));
}

/// Remove the given peer from the list of active session.
fn remove_session(&mut self, connection_id: ConnectionId) {
// TODO: Clean-up the string story so we're not sprinkling additional
// allocations everywhere.
let _ = self.active_sessions.remove(&connection_id);
}

/// Return the role of the local peer for the active session (represented
/// by connection ID).
fn session_role(&self, connection_id: ConnectionId) -> Option<SessionRole> {
if let Some((_ssb_id, session_role)) = self.active_sessions.get(&connection_id) {
if let Some((_ssb_id, session_role, _req_no)) = self.active_sessions.get(&connection_id) {
Some(session_role.to_owned())
} else {
None
Expand Down Expand Up @@ -431,7 +429,7 @@ impl EbtManager {
) -> Result<()> {
trace!(target: "ebt-replication", "Initiated EBT session with {} as {}", peer_ssb_id, session_role);

self.register_session(connection_id, &peer_ssb_id, session_role.to_owned());
self.register_session(connection_id, peer_ssb_id, session_role.to_owned(), req_no);
let local_clock = self.local_clock.to_owned();

match session_role {
Expand Down Expand Up @@ -619,46 +617,41 @@ impl EbtManager {
Ok(())
}

/*
TODO: Reintroduce this when we figure out the connection ID / request ID
association.
/// Check if any active session peers are interested in the updated feed.
/// If so, send them the appended message.
async fn handle_local_store_updated(&self, ssb_id: SsbId, msg_seq: u64) -> Result<()> {
// TODO: This is all radically inefficient, but it's a start.

/// Look up the latest sequence number for the updated feed, encode it as
/// the single entry of a vector clock and send that to any active session
/// peers.
async fn handle_local_store_updated(&self, ssb_id: SsbId) -> Result<()> {
// Iterate over all active EBT sessions.
for (_peer_ssb_id, (req_no, _session_role)) in self.active_sessions.iter() {
// Look up the latest sequence for the given ID.
if let Some(seq) = KV_STORE.read().await.get_latest_seq(&ssb_id)? {
// Encode the replicate flag, receive flag and sequence.
let encoded_value: EncodedClockValue = clock::encode(true, Some(true), Some(seq))?;
// Update the entry for `ssb_id` in the local vector clock.
if let Some(mut local_clock) = self.get_clock(None) {
local_clock.insert(ssb_id.to_owned(), encoded_value);
for (connection_id, (peer_ssb_id, session_role, req_no)) in self.active_sessions.iter() {
// Check if `peer_ssb_id` wants to replicate `ssb_id`.
if let Some(seq) = self.is_receiving(peer_ssb_id, &ssb_id)? {
if msg_seq > seq {
// Retrieve the message from the key-value store.
if let Some(msg_kvt) = KV_STORE.read().await.get_msg_kvt(&ssb_id, msg_seq)? {
// Create channel to send messages to broker.
let mut ch_broker = BROKER.lock().await.create_sender();

// Send the single-entry vector clock to the active session.
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::SendMessage(
*connection_id,
*req_no,
peer_ssb_id.to_owned(),
msg_kvt.value,
session_role.to_owned(),
)),
))
.await?;
}
}
// Create a vector clock with a single entry.
let mut updated_clock = HashMap::new();
updated_clock.insert(ssb_id.to_owned(), encoded_value);
// Create channel to send messages to broker.
let mut ch_broker = BROKER.lock().await.create_sender();
// Send the single-entry vector clock to the active session.
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::SendClock(*req_no, updated_clock)),
))
.await?;
}
}

Ok(())
}
*/

async fn handle_session_concluded(&mut self, connection_id: ConnectionId, peer_ssb_id: SsbId) {
trace!(target: "ebt-replication", "Session concluded for connection {} with {}", connection_id, peer_ssb_id);
Expand Down Expand Up @@ -814,19 +807,14 @@ impl EbtManager {
}
}
}
} else if let Some(BrokerMessage::StoreKv(StoreKvEvent(_ssb_id))) = msg {
} else if let Some(BrokerMessage::StoreKv(StoreKvEvent((ssb_id, seq)))) = msg {
debug!("Received KV store event from broker");

/*
TODO: Reintroduce this later, once Manyverse restart
issue is solved.
// Respond to a key-value store state change for the given peer.
// This is triggered when a new message is appended to the local feed.
if let Err(err) = self.handle_local_store_updated(ssb_id).await {
if let Err(err) = self.handle_local_store_updated(ssb_id, seq).await {
error!("Error while handling 'local store updated' event: {}", err)
}
*/
}
}
}
Expand Down
9 changes: 3 additions & 6 deletions solar/src/storage/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@ const PREFIX_BLOB: u8 = 3u8;
/// Prefix for a key to a peer.
const PREFIX_PEER: u8 = 4u8;

/// The feed belonging to the given SSB ID has changed
/// (ie. a new message has been appended to the feed).
///
/// The JSON value of the appended message is included.
/// A new message has been appended to feed belonging to the given SSB ID.
#[derive(Debug, Clone)]
pub struct StoreKvEvent(pub String);
pub struct StoreKvEvent(pub (String, u64));

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobStatus {
Expand Down Expand Up @@ -287,7 +284,7 @@ impl KvStorage {
// key has been updated.
let broker_msg = BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::StoreKv(StoreKvEvent(author)),
BrokerMessage::StoreKv(StoreKvEvent((author, seq_num))),
);

// Matching on the error here (instead of unwrapping) allows us to
Expand Down

0 comments on commit 5ce58fd

Please sign in to comment.