Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Make peer and blob range queries robust to existence of later key prefixes #99

Merged
merged 5 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion solar/src/actors/replication/ebt/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ impl EbtManager {
//
// This indicates that the local peer is acting as the session
// requester.
if self.sent_clocks.get(&connection_id).is_none() {
if !self.sent_clocks.contains_key(&connection_id) {
let local_clock = self.local_clock.to_owned();
ch_broker
.send(BrokerEvent::new(
Expand Down
119 changes: 115 additions & 4 deletions solar/src/storage/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ impl KvStorage {
let mut list = Vec::new();

let db = self.db.as_ref().ok_or(Error::OptionIsNone)?;
let scan_key: &[u8] = &[PREFIX_BLOB];
for item in db.range(scan_key..) {
let scan_key_start: &[u8] = &[PREFIX_BLOB];
let scan_key_end: &[u8] = &[PREFIX_BLOB + 1];
for item in db.range(scan_key_start..scan_key_end) {
let (k, v) = item?;
let blob: BlobStatus = serde_cbor::from_slice(&v)?;
if !blob.retrieved {
Expand Down Expand Up @@ -226,8 +227,9 @@ impl KvStorage {
let mut peers = Vec::new();

// Use the generic peer prefix to return an iterator over all peers.
let scan_peer_key: &[u8] = &[PREFIX_PEER];
for peer in db.range(scan_peer_key..) {
let scan_peer_key_start: &[u8] = &[PREFIX_PEER];
let scan_peer_key_end: &[u8] = &[PREFIX_PEER + 1];
for peer in db.range(scan_peer_key_start..scan_peer_key_end) {
let (peer_key, _) = peer?;
// Drop the prefix byte and convert the remaining bytes to
// a string.
Expand Down Expand Up @@ -432,6 +434,68 @@ mod test {
Ok(())
}

#[async_std::test]
async fn test_peer_range_query() -> Result<()> {
let (keypair, kv) = initialise_keypair_and_kv()?;
// Get a list of all replicated peers and their latest sequence
// numbers. This list is expected to be empty because we never
// added any data to the database.
let peers = kv.get_peers().await?;
assert_eq!(peers.len(), 0);

// Create a post-type message.
let msg_content = TypedMessage::Post {
text: "A solar flare is an intense localized eruption of electromagnetic radiation."
.to_string(),
mentions: None,
};

// Lookup the value of the previous message. This will be `None`.
let last_msg = kv.get_latest_msg_val(&keypair.id)?;

// Sign the message content using the temporary keypair and value of
// the previous message.
let msg = MessageValue::sign(last_msg.as_ref(), &keypair, json!(msg_content))?;

// Append the signed message to the feed. Returns the sequence number
// of the appended message.
let _ = kv.append_feed(msg).await?;

// now that we have added a message, we should have one peer,
// which is the keypair we used to sign the message.
let peers = kv.get_peers().await?;
assert_eq!(peers.len(), 1);
assert_eq!(&peers.get(0).unwrap().0, &keypair.id);

let db = kv.db.as_ref().ok_or(Error::OptionIsNone)?;

// insert one key with PREFIX_PEER+1 as the first byte.
db.insert(
&vec![PREFIX_PEER + 1u8],
"this should not show up in the peers list because it's after the peers range"
.as_bytes()
.to_vec(),
)?;

// this should not have changed the peers list
let peers = kv.get_peers().await?;
assert_eq!(peers.len(), 1);

// do the same for PREFIX_PEER-1
db.insert(
&vec![PREFIX_PEER - 1u8],
"this should not show up in the peers list because it's before the peers range"
.as_bytes()
.to_vec(),
)?;

// this should not have changed the peers list
let peers = kv.get_peers().await?;
assert_eq!(peers.len(), 1);

Ok(())
}

// In reality this test covers more than just the append method.
// It tests multiple methods exposed by the kv database.
// The main reason for combining the tests is the cost of setting up
Expand Down Expand Up @@ -517,6 +581,53 @@ mod test {
Ok(())
}

#[async_std::test]
async fn test_blobs_range_query_when_peers_exist() -> Result<()> {
let (keypair, kv) = initialise_keypair_and_kv()?;
kv.set_blob(
"b1",
&BlobStatus {
retrieved: false,
users: ["u2".to_string()].to_vec(),
},
)?;

assert_eq!(kv.get_peers().await?.len(), 0);

assert_eq!(kv.get_pending_blobs().unwrap(), vec!["b1".to_string()]);

println!("Inserting a new message and thus peer");
let msg_content = TypedMessage::Post {
text: "A solar flare is an intense localized eruption of electromagnetic radiation."
.to_string(),
mentions: None,
};
// Passing None as the last message since we start from an empty feed
let msg = MessageValue::sign(None, &keypair, json!(msg_content))?;
let _ = kv.append_feed(msg).await?;

// now that we have added a message, we should have one peer,
// which is the keypair we used to sign the message.
let peers = kv.get_peers().await?;
assert_eq!(peers.len(), 1);

println!("Inserting a second blob");
kv.set_blob(
"b2",
&BlobStatus {
retrieved: false,
users: ["u7".to_string()].to_vec(),
},
)?;

assert_eq!(
kv.get_pending_blobs()?,
vec!["b1".to_string(), "b2".to_string()]
);

Ok(())
}

#[test]
fn test_blobs() -> Result<()> {
let kv = open_temporary_kv()?;
Expand Down
Loading