Skip to content

Commit

Permalink
send wanted message to active session peers when local store is updated
Browse files Browse the repository at this point in the history
  • Loading branch information
mycognosist committed Feb 6, 2024
1 parent bbd77a2 commit fa76b12
Showing 1 changed file with 46 additions and 58 deletions.
104 changes: 46 additions & 58 deletions solar/src/actors/replication/ebt/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Display for SessionRole {
#[derive(Debug)]
pub struct EbtManager {
/// Active EBT peer sessions.
active_sessions: HashMap<ConnectionId, (SsbId, SessionRole)>,
active_sessions: HashMap<ConnectionId, (SsbId, SessionRole, ReqNo)>,
/// Duration to wait before switching feed request to a different peer.
_feed_wait_timeout: u64,
/// The state of the replication loop.
Expand Down Expand Up @@ -158,7 +158,7 @@ impl EbtManager {

/// Retrieve either the local vector clock or the stored vector clock
/// for the peer represented by the given SSB ID.
fn _get_clock(&self, ssb_id: Option<&SsbId>) -> Option<VectorClock> {
fn get_clock(&self, ssb_id: Option<&SsbId>) -> Option<VectorClock> {
match ssb_id {
Some(id) => self.peer_clocks.get(id).cloned(),
None => Some(self.local_clock.to_owned()),
Expand Down Expand Up @@ -201,6 +201,7 @@ impl EbtManager {
let mut clock_file = File::open(&clock_entry.path())?;
let mut clock_file_contents = String::new();
clock_file.read_to_string(&mut clock_file_contents)?;
// TODO: Match on error and delete file.
let clock: VectorClock = serde_json::from_str(&clock_file_contents)?;

// Set the vector clock in memory.
Expand Down Expand Up @@ -233,21 +234,21 @@ impl EbtManager {
}

/// Retrieve the stored vector clock for the first peer, check for the
/// second peer in the vector clock and return the value of the receive
/// flag.
fn _is_receiving(&self, peer_ssb_id: SsbId, ssb_id: SsbId) -> Result<bool> {
/// second peer in the vector clock. If the receive flag is set to true,
/// return the decoded sequence number.
fn is_receiving(&self, peer_ssb_id: &SsbId, ssb_id: &SsbId) -> Result<Option<u64>> {
// Retrieve the vector clock for the first peer.
if let Some(clock) = self._get_clock(Some(&peer_ssb_id)) {
if let Some(clock) = self.get_clock(Some(peer_ssb_id)) {
// Check if the second peer is represented in the vector clock.
if let Some(encoded_seq_no) = clock.get(&ssb_id) {
if let Some(encoded_seq_no) = clock.get(ssb_id) {
// Check if the receive flag is true.
if let (_replicate_flag, Some(true), _seq) = clock::decode(*encoded_seq_no)? {
return Ok(true);
if let (_replicate_flag, Some(true), seq) = clock::decode(*encoded_seq_no)? {
return Ok(seq);
}
}
}

Ok(false)
Ok(None)
}

/// Get the sequence number of the latest message sent to the given
Expand Down Expand Up @@ -282,30 +283,27 @@ impl EbtManager {
}

/// Register a new EBT session for the given peer.
//fn register_session(&mut self, peer_ssb_id: &SsbId, req_no: ReqNo, session_role: SessionRole) {
fn register_session(
&mut self,
connection_id: ConnectionId,
peer_ssb_id: &SsbId,
peer_ssb_id: SsbId,
session_role: SessionRole,
req_no: ReqNo,
) {
self.active_sessions
.insert(connection_id, (peer_ssb_id.to_owned(), session_role));

trace!(target: "ebt-session", "Registered new EBT session for connection {} with {}", connection_id, peer_ssb_id);
self.active_sessions
.insert(connection_id, (peer_ssb_id, session_role, req_no));
}

/// Remove the given peer from the list of active session.
fn remove_session(&mut self, connection_id: ConnectionId) {
// TODO: Clean-up the string story so we're not sprinkling additional
// allocations everywhere.
let _ = self.active_sessions.remove(&connection_id);
}

/// Return the role of the local peer for the active session (represented
/// by connection ID).
fn session_role(&self, connection_id: ConnectionId) -> Option<SessionRole> {
if let Some((_ssb_id, session_role)) = self.active_sessions.get(&connection_id) {
if let Some((_ssb_id, session_role, _req_no)) = self.active_sessions.get(&connection_id) {
Some(session_role.to_owned())
} else {
None
Expand Down Expand Up @@ -431,7 +429,7 @@ impl EbtManager {
) -> Result<()> {
trace!(target: "ebt-replication", "Initiated EBT session with {} as {}", peer_ssb_id, session_role);

self.register_session(connection_id, &peer_ssb_id, session_role.to_owned());
self.register_session(connection_id, peer_ssb_id, session_role.to_owned(), req_no);
let local_clock = self.local_clock.to_owned();

match session_role {
Expand Down Expand Up @@ -619,46 +617,41 @@ impl EbtManager {
Ok(())
}

/*
TODO: Reintroduce this when we figure out the connection ID / request ID
association.
/// Check if any active session peers are interested in the updated feed.
/// If so, send them the appended message.
async fn handle_local_store_updated(&self, ssb_id: SsbId, msg_seq: u64) -> Result<()> {
// TODO: This is all radically inefficient, but it's a start.

/// Look up the latest sequence number for the updated feed, encode it as
/// the single entry of a vector clock and send that to any active session
/// peers.
async fn handle_local_store_updated(&self, ssb_id: SsbId) -> Result<()> {
// Iterate over all active EBT sessions.
for (_peer_ssb_id, (req_no, _session_role)) in self.active_sessions.iter() {
// Look up the latest sequence for the given ID.
if let Some(seq) = KV_STORE.read().await.get_latest_seq(&ssb_id)? {
// Encode the replicate flag, receive flag and sequence.
let encoded_value: EncodedClockValue = clock::encode(true, Some(true), Some(seq))?;
// Update the entry for `ssb_id` in the local vector clock.
if let Some(mut local_clock) = self.get_clock(None) {
local_clock.insert(ssb_id.to_owned(), encoded_value);
for (connection_id, (peer_ssb_id, session_role, req_no)) in self.active_sessions.iter() {
// Check if `peer_ssb_id` wants to replicate `ssb_id`.
if let Some(seq) = self.is_receiving(peer_ssb_id, &ssb_id)? {
if msg_seq > seq {
// Retrieve the message from the key-value store.
if let Some(msg_kvt) = KV_STORE.read().await.get_msg_kvt(&ssb_id, msg_seq)? {
// Create channel to send messages to broker.
let mut ch_broker = BROKER.lock().await.create_sender();

// Send the single-entry vector clock to the active session.
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::SendMessage(
*connection_id,
*req_no,
peer_ssb_id.to_owned(),
msg_kvt.value,
session_role.to_owned(),
)),
))
.await?;
}
}
// Create a vector clock with a single entry.
let mut updated_clock = HashMap::new();
updated_clock.insert(ssb_id.to_owned(), encoded_value);
// Create channel to send messages to broker.
let mut ch_broker = BROKER.lock().await.create_sender();
// Send the single-entry vector clock to the active session.
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::Ebt(EbtEvent::SendClock(*req_no, updated_clock)),
))
.await?;
}
}

Ok(())
}
*/

async fn handle_session_concluded(&mut self, connection_id: ConnectionId, peer_ssb_id: SsbId) {
trace!(target: "ebt-replication", "Session concluded for connection {} with {}", connection_id, peer_ssb_id);
Expand Down Expand Up @@ -814,19 +807,14 @@ impl EbtManager {
}
}
}
} else if let Some(BrokerMessage::StoreKv(StoreKvEvent(_ssb_id))) = msg {
} else if let Some(BrokerMessage::StoreKv(StoreKvEvent((ssb_id, seq)))) = msg {
debug!("Received KV store event from broker");

/*
TODO: Reintroduce this later, once Manyverse restart
issue is solved.
// Respond to a key-value store state change for the given peer.
// This is triggered when a new message is appended to the local feed.
if let Err(err) = self.handle_local_store_updated(ssb_id).await {
if let Err(err) = self.handle_local_store_updated(ssb_id, seq).await {
error!("Error while handling 'local store updated' event: {}", err)
}
*/
}
}
}
Expand Down

0 comments on commit fa76b12

Please sign in to comment.