Skip to content

Commit

Permalink
refactor(metadata): improve access patterns
Browse files Browse the repository at this point in the history
- Makes use of more of dashmap inbuilt functions.
- Also includes RegisterStorage in used space calcs.
  • Loading branch information
oetyng committed Jul 5, 2021
1 parent 8464d81 commit b0ff963
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 97 deletions.
1 change: 0 additions & 1 deletion src/dbs/event_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ where

#[cfg(test)]
mod test {

use super::EventStore;
use crate::node::Result;
use crate::types::Token;
Expand Down
86 changes: 52 additions & 34 deletions src/node/metadata/adult_liveness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,26 +70,38 @@ impl AdultLiveness {
}

pub fn retain_members_only(&self, current_members: BTreeSet<XorName>) {
let old_members = self
let all_keys: Vec<_> = self
.closest_adults
.iter()
.map(|entry| *entry.key())
.collect::<Vec<_>>();
for name in old_members {
if !current_members.contains(&name) {
let _ = self.pending_ops.remove(&name);
let _ = self.closest_adults.remove(&name);
let message_ids = self
.ops
.iter()
.map(|entry| *entry.key())
.collect::<Vec<_>>();
// TODO(after T4): For write operations perhaps we need to write it to a different Adult
for msg_id in message_ids {
self.remove_target(&msg_id, &name);
.collect();

let removed_members = all_keys
.iter()
.filter_map(|key| {
if !current_members.contains(key) {
let _ = self.pending_ops.remove(key);
let _ = self.closest_adults.remove(key);
Some(*key)
} else {
None
}
})
.collect::<Vec<_>>();

let message_ids = self
.ops
.iter()
.map(|entry| *entry.key())
.collect::<Vec<_>>();

for name in removed_members {
// TODO(after T4): For write operations perhaps we need to write it to a different Adult
for msg_id in &message_ids {
self.remove_target(msg_id, &name);
}
}

self.recompute_closest_adults();
}

Expand Down Expand Up @@ -139,35 +151,41 @@ impl AdultLiveness {
}

fn increment_pending_op(&self, targets: &BTreeSet<XorName>) {
let mut closest_changed = false;

for node in targets {
*self.pending_ops.entry(*node).or_insert(0) += 1;
if !self.pending_ops.contains_key(node) {
let _ = self.pending_ops.insert(*node, 1);
}
if let Some(mut pair) = self.pending_ops.get_mut(node) {
*pair.value_mut() += 1;
}
if !self.closest_adults.contains_key(node) {
closest_changed = true;
let _ = self.closest_adults.insert(*node, Vec::new());
self.recompute_closest_adults();
}
}

if closest_changed {
self.recompute_closest_adults();
}
}

// TODO: how severe is potential variance due to concurrency?
pub fn recompute_closest_adults(&self) {
self.closest_adults
let all_keys: Vec<_> = self
.closest_adults
.iter()
.map(|entry| {
let key = entry.key();
let closest_adults = self
.closest_adults
.iter()
.map(|entry| *entry.key())
.filter(|name| key != name)
.sorted_by(|lhs, rhs| key.cmp_distance(lhs, rhs))
.take(NEIGHBOUR_COUNT)
.collect::<Vec<_>>();

(key.to_owned(), closest_adults)
})
.for_each(|(a, b)| {
let _ = self.closest_adults.insert(a, b);
});
.map(|entry| *entry.key())
.collect();
self.closest_adults.alter_all(|name, _| {
all_keys
.iter()
.filter(|&key| key != name)
.sorted_by(|lhs, rhs| name.cmp_distance(lhs, rhs))
.take(NEIGHBOUR_COUNT)
.copied()
.collect::<Vec<_>>()
});
}

// this is not an exact definition, thus has tolerance for variance due to concurrency
Expand Down
142 changes: 81 additions & 61 deletions src/node/metadata/register_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,33 @@ use crate::{
types::DataAddress,
};
use dashmap::DashMap;
use std::sync::Arc;
use std::{
collections::BTreeMap,
fmt::{self, Display, Formatter},
path::{Path, PathBuf},
};
use tokio::sync::RwLock;
use tracing::info;
use xor_name::{Prefix, XorName};

/// Operations over the data type Register.
pub(super) struct RegisterStorage {
path: PathBuf,
_used_space: UsedSpace,
store: DashMap<XorName, TransientEntry>,
used_space: UsedSpace,
registers: DashMap<XorName, Option<StateEntry>>,
}

type TransientEntry = Arc<RwLock<Option<StateEntry>>>;

struct StateEntry {
state: Register,
history: EventStore<RegisterCmd>,
db: EventStore<RegisterCmd>,
}

impl RegisterStorage {
pub(super) fn new(path: &Path, _used_space: UsedSpace) -> Self {
_used_space.add_dir(path);
pub(super) fn new(path: &Path, used_space: UsedSpace) -> Self {
used_space.add_dir(path);
Self {
path: path.to_path_buf(),
_used_space,
store: DashMap::new(),
used_space,
registers: DashMap::new(),
}
}

Expand All @@ -63,12 +59,13 @@ impl RegisterStorage {
pub(super) async fn get_data_of(&self, prefix: Prefix) -> Result<RegisterDataExchange> {
let mut the_data = BTreeMap::default();

for entry in self.store.iter() {
let (key, val) = entry.pair();
if let Some(entry) = val.read().await.as_ref() {
for entry in self.registers.iter() {
let (key, cache) = entry.pair();
if let Some(entry) = cache {
if prefix.matches(entry.state.name()) {
let _ = the_data.insert(*key, entry.history.get_all());
let _ = the_data.insert(*key, entry.db.get_all());
}
} else {
}
}

Expand All @@ -87,16 +84,36 @@ impl RegisterStorage {
let _ = self.apply(op).await?;
}
}

// let instance = Arc::new(self);

// let handles = data.iter().map(|(_, history)| {
// let store = instance.clone();
// tokio::task::spawn(async {
// for op in history {
// let _ = store.apply(*op).await?;
// //let _ = self.apply(*op).await?;
// }
// Ok::<_, Error>(())
// })
// });

// join_all(handles)
// .await
// .iter()
// .flatten()
// .for_each(|e| error!("{:?}", e));

Ok(())
}

/// --- Writing ---
pub(super) async fn write(&self, op: RegisterCmd) -> Result<NodeDuty> {
// let required_space = std::mem::size_of::<RegisterCmd>() as u64;
// if !self.used_space.can_consume(required_space).await {
// return Err(Error::NotEnoughSpace);
// }
let required_space = std::mem::size_of::<RegisterCmd>() as u64;
if !self.used_space.can_consume(required_space).await {
return Err(Error::Database(crate::dbs::Error::NotEnoughSpace));
}
let msg_id = op.msg_id;
let origin = op.origin;
let write_result = self.apply(op).await;
Expand All @@ -117,72 +134,72 @@ impl RegisterStorage {
use RegisterWrite::*;
match write {
New(map) => {
if self.store.contains_key(&key) {
if self.registers.contains_key(&key) {
return Err(Error::DataExists);
}
let mut store = get_store(key, self.path.as_path())?;
let _ = store.append(op)?;
let _ = self.store.insert(
key,
Arc::new(RwLock::new(Some(StateEntry {
state: map,
history: store,
}))),
);
let mut db = load_db(key, self.path.as_path())?;
let _ = db.append(op)?;
let _ = self
.registers
.insert(key, Some(StateEntry { state: map, db }));
Ok(())
}
Delete(_) => {
let result = match self.store.get(&key) {
Some(entry) => {
let (key, val) = entry.pair();
// aqcuire write lock when deleting
if let Some(entry) = val.write().await.as_ref() {
let result = match self.registers.get_mut(&key) {
None => {
if let Ok(db) = load_db(key, self.path.as_path()) {
info!("Deleting Register");
db.as_deletable().delete().map_err(Error::from)
} else {
Ok(())
}
}
Some(mut entry) => {
let (_, cache) = entry.pair_mut();
if let Some(entry) = cache {
if entry.state.address().is_public() {
return Err(Error::InvalidMessage(
msg_id,
"Cannot delete public Register".to_string(),
));
}

// TODO - Register::check_permission() doesn't support Delete yet in safe-nd
// register.check_permission(action, Some(client_sig.public_key))?;

if client_sig.public_key != entry.state.owner() {
Err(Error::InvalidOwner(client_sig.public_key))
} else {
info!("Deleting Register");
entry.history.as_deletable().delete().map_err(Error::from)
entry.db.as_deletable().delete().map_err(Error::from)
}
} else if let Ok(store) = get_store(*key, self.path.as_path()) {
} else if let Ok(db) = load_db(key, self.path.as_path()) {
info!("Deleting Register");
store.as_deletable().delete().map_err(Error::from)
db.as_deletable().delete().map_err(Error::from)
} else {
Ok(())
}
}
None => Ok(()),
};

if result.is_ok() {
let _ = self.store.remove(&key);
let _ = self.registers.remove(&key);
}

result
}
Edit(reg_op) => {
let existing = match self.store.get(&key) {
Some(entry) => entry.value().clone(),
None => return Err(Error::NoSuchData(DataAddress::Register(address))),
};
let mut cache = existing.write().await;
let mut cache = self
.registers
.get_mut(&key)
.ok_or(Error::NoSuchData(DataAddress::Register(address)))?;
let entry = if let Some(cached_entry) = cache.as_mut() {
cached_entry
} else {
// read from disk
let history = get_store(key, self.path.as_path())?;
let db = load_db(key, self.path.as_path())?;
let mut reg = None;
// apply all ops
for op in history.get_all() {
for op in db.get_all() {
// first op shall be New
if let New(register) = op.write {
reg = Some(register);
} else if let Some(register) = &mut reg {
Expand All @@ -192,14 +209,18 @@ impl RegisterStorage {
}
}

let new_entry = match reg.take() {
Some(state) => StateEntry { state, history },
None => return Err(Error::NoSuchData(DataAddress::Register(address))),
};
let new_entry = reg
.take()
.ok_or(Error::NoSuchData(DataAddress::Register(address)))
.map(|state| StateEntry { state, db })?;

let _ = cache.replace(new_entry);

cache.as_mut().unwrap()
if let Some(entry) = cache.as_mut() {
entry
} else {
return Err(Error::NoSuchData(DataAddress::Register(address)));
}
};

info!("Editing Register");
Expand All @@ -209,7 +230,7 @@ impl RegisterStorage {
let result = entry.state.apply_op(reg_op).map_err(Error::NetworkData);

if result.is_ok() {
entry.history.append(op)?;
entry.db.append(op)?;
info!("Editing Register SUCCESSFUL!");
} else {
info!("Editing Register FAILED!");
Expand Down Expand Up @@ -273,11 +294,10 @@ impl RegisterStorage {
action: Action,
requester: PublicKey,
) -> Result<Register> {
let existing = match self.store.get(&to_id(address)?) {
Some(entry) => entry.value().clone(),
None => return Err(Error::NoSuchData(DataAddress::Register(*address))),
};
let cache = existing.read().await;
let cache = self
.registers
.get(&to_id(address)?)
.ok_or_else(|| Error::NoSuchData(DataAddress::Register(*address)))?;
let StateEntry { state, .. } = cache
.as_ref()
.ok_or_else(|| Error::NoSuchData(DataAddress::Register(*address)))?;
Expand Down Expand Up @@ -412,8 +432,8 @@ fn to_id(address: &Address) -> Result<XorName> {
.as_bytes()]))
}

fn get_store(id: XorName, path: &Path) -> Result<EventStore<RegisterCmd>> {
let db_dir = path.join("db").join("map".to_string());
fn load_db(id: XorName, path: &Path) -> Result<EventStore<RegisterCmd>> {
let db_dir = path.join("db").join("register".to_string());
EventStore::new(id, db_dir.as_path()).map_err(Error::from)
}

Expand Down
2 changes: 1 addition & 1 deletion src/node/metadata/sequence_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ fn to_id(address: &Address) -> Result<XorName> {
}

fn new_store(id: XorName, path: &Path) -> Result<EventStore<SequenceCmd>> {
let db_dir = path.join("db").join("map".to_string());
let db_dir = path.join("db").join("sequence".to_string());
EventStore::new(id, db_dir.as_path()).map_err(Error::from)
}

Expand Down

0 comments on commit b0ff963

Please sign in to comment.