diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index 64a94d8..5b0b8f4 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -510,7 +510,7 @@ impl EbtManager { // // This indicates that the local peer is acting as the session // requester. - if self.sent_clocks.get(&connection_id).is_none() { + if !self.sent_clocks.contains_key(&connection_id) { let local_clock = self.local_clock.to_owned(); ch_broker .send(BrokerEvent::new( diff --git a/solar/src/storage/kv.rs b/solar/src/storage/kv.rs index 538c9cc..824c473 100644 --- a/solar/src/storage/kv.rs +++ b/solar/src/storage/kv.rs @@ -136,8 +136,9 @@ impl KvStorage { let mut list = Vec::new(); let db = self.db.as_ref().ok_or(Error::OptionIsNone)?; - let scan_key: &[u8] = &[PREFIX_BLOB]; - for item in db.range(scan_key..) { + let scan_key_start: &[u8] = &[PREFIX_BLOB]; + let scan_key_end: &[u8] = &[PREFIX_BLOB + 1]; + for item in db.range(scan_key_start..scan_key_end) { let (k, v) = item?; let blob: BlobStatus = serde_cbor::from_slice(&v)?; if !blob.retrieved { @@ -226,8 +227,9 @@ impl KvStorage { let mut peers = Vec::new(); // Use the generic peer prefix to return an iterator over all peers. - let scan_peer_key: &[u8] = &[PREFIX_PEER]; - for peer in db.range(scan_peer_key..) { + let scan_peer_key_start: &[u8] = &[PREFIX_PEER]; + let scan_peer_key_end: &[u8] = &[PREFIX_PEER + 1]; + for peer in db.range(scan_peer_key_start..scan_peer_key_end) { let (peer_key, _) = peer?; // Drop the prefix byte and convert the remaining bytes to // a string. @@ -432,6 +434,68 @@ mod test { Ok(()) } + #[async_std::test] + async fn test_peer_range_query() -> Result<()> { + let (keypair, kv) = initialise_keypair_and_kv()?; + // Get a list of all replicated peers and their latest sequence + // numbers. This list is expected to be empty because we never + // added any data to the database. + let peers = kv.get_peers().await?; + assert_eq!(peers.len(), 0); + + // Create a post-type message. + let msg_content = TypedMessage::Post { + text: "A solar flare is an intense localized eruption of electromagnetic radiation." + .to_string(), + mentions: None, + }; + + // Lookup the value of the previous message. This will be `None`. + let last_msg = kv.get_latest_msg_val(&keypair.id)?; + + // Sign the message content using the temporary keypair and value of + // the previous message. + let msg = MessageValue::sign(last_msg.as_ref(), &keypair, json!(msg_content))?; + + // Append the signed message to the feed. Returns the sequence number + // of the appended message. + let _ = kv.append_feed(msg).await?; + + // now that we have added a message, we should have one peer, + // which is the keypair we used to sign the message. + let peers = kv.get_peers().await?; + assert_eq!(peers.len(), 1); + assert_eq!(&peers.get(0).unwrap().0, &keypair.id); + + let db = kv.db.as_ref().ok_or(Error::OptionIsNone)?; + + // insert one key with PREFIX_PEER+1 as the first byte. + db.insert( + &vec![PREFIX_PEER + 1u8], + "this should not show up in the peers list because it's after the peers range" + .as_bytes() + .to_vec(), + )?; + + // this should not have changed the peers list + let peers = kv.get_peers().await?; + assert_eq!(peers.len(), 1); + + // do the same for PREFIX_PEER-1 + db.insert( + &vec![PREFIX_PEER - 1u8], + "this should not show up in the peers list because it's before the peers range" + .as_bytes() + .to_vec(), + )?; + + // this should not have changed the peers list + let peers = kv.get_peers().await?; + assert_eq!(peers.len(), 1); + + Ok(()) + } + // In reality this test covers more than just the append method. // It tests multiple methods exposed by the kv database. // The main reason for combining the tests is the cost of setting up @@ -517,6 +581,53 @@ mod test { Ok(()) } + #[async_std::test] + async fn test_blobs_range_query_when_peers_exist() -> Result<()> { + let (keypair, kv) = initialise_keypair_and_kv()?; + kv.set_blob( + "b1", + &BlobStatus { + retrieved: false, + users: ["u2".to_string()].to_vec(), + }, + )?; + + assert_eq!(kv.get_peers().await?.len(), 0); + + assert_eq!(kv.get_pending_blobs().unwrap(), vec!["b1".to_string()]); + + println!("Inserting a new message and thus peer"); + let msg_content = TypedMessage::Post { + text: "A solar flare is an intense localized eruption of electromagnetic radiation." + .to_string(), + mentions: None, + }; + // Passing None as the last message since we start from an empty feed + let msg = MessageValue::sign(None, &keypair, json!(msg_content))?; + let _ = kv.append_feed(msg).await?; + + // now that we have added a message, we should have one peer, + // which is the keypair we used to sign the message. + let peers = kv.get_peers().await?; + assert_eq!(peers.len(), 1); + + println!("Inserting a second blob"); + kv.set_blob( + "b2", + &BlobStatus { + retrieved: false, + users: ["u7".to_string()].to_vec(), + }, + )?; + + assert_eq!( + kv.get_pending_blobs()?, + vec!["b1".to_string(), "b2".to_string()] + ); + + Ok(()) + } + #[test] fn test_blobs() -> Result<()> { let kv = open_temporary_kv()?;