From b2ecc5a8dfe2b3526ed773514222418e18faab30 Mon Sep 17 00:00:00 2001 From: Jules de Smit Date: Tue, 24 Aug 2021 15:44:33 +0200 Subject: [PATCH 1/5] Remove Storage abstraction, and remove files iteratively on round reset Fixes #360 --- .../src/commands/aggregation.rs | 8 +- .../src/commands/initialization.rs | 6 +- phase1-coordinator/src/coordinator.rs | 53 +++------ phase1-coordinator/src/coordinator_state.rs | 4 +- phase1-coordinator/src/environment.rs | 5 +- phase1-coordinator/src/main.rs | 5 +- phase1-coordinator/src/objects/round.rs | 107 ++++-------------- phase1-coordinator/src/storage/disk.rs | 75 ++++++++---- phase1-coordinator/src/storage/storage.rs | 48 +------- 9 files changed, 103 insertions(+), 208 deletions(-) diff --git a/phase1-coordinator/src/commands/aggregation.rs b/phase1-coordinator/src/commands/aggregation.rs index 58ff4256..2dcc1a3b 100644 --- a/phase1-coordinator/src/commands/aggregation.rs +++ b/phase1-coordinator/src/commands/aggregation.rs @@ -1,7 +1,7 @@ use crate::{ environment::Environment, objects::Round, - storage::{ContributionLocator, Locator, Object, ObjectReader, Storage}, + storage::{ContributionLocator, Disk, Locator, Object, ObjectReader, StorageLocator, StorageObject}, CoordinatorError, }; use phase1::{helpers::CurveKind, Phase1}; @@ -15,7 +15,7 @@ pub(crate) struct Aggregation; impl Aggregation { /// Runs aggregation for a given environment, storage, and round. #[inline] - pub(crate) fn run(environment: &Environment, storage: &mut impl Storage, round: &Round) -> anyhow::Result<()> { + pub(crate) fn run(environment: &Environment, storage: &mut Disk, round: &Round) -> anyhow::Result<()> { let start = Instant::now(); // Fetch the round height. @@ -95,7 +95,7 @@ impl Aggregation { #[inline] fn readers<'a>( environment: &Environment, - storage: &'a impl Storage, + storage: &'a Disk, round: &Round, ) -> anyhow::Result>> { let mut readers = vec![]; @@ -148,7 +148,7 @@ mod tests { use crate::{ authentication::Dummy, commands::{Aggregation, Seed, SigningKey, SEED_LENGTH}, - storage::{Locator, Storage}, + storage::Locator, testing::prelude::*, Coordinator, }; diff --git a/phase1-coordinator/src/commands/initialization.rs b/phase1-coordinator/src/commands/initialization.rs index e331ac31..02355f92 100644 --- a/phase1-coordinator/src/commands/initialization.rs +++ b/phase1-coordinator/src/commands/initialization.rs @@ -1,6 +1,6 @@ use crate::{ environment::Environment, - storage::{ContributionLocator, Locator, Object, Storage}, + storage::{ContributionLocator, Disk, Locator, Object, StorageObject}, CoordinatorError, }; use phase1::{helpers::CurveKind, Phase1, Phase1Parameters}; @@ -23,7 +23,7 @@ impl Initialization { #[inline] pub(crate) fn run( environment: &Environment, - storage: &mut impl Storage, + storage: &mut Disk, round_height: u64, chunk_id: u64, ) -> anyhow::Result> { @@ -100,7 +100,7 @@ impl Initialization { /// Compute both contribution hashes and check for equivalence. #[inline] fn check_hash( - storage: &impl Storage, + storage: &Disk, contribution_locator: &Locator, next_contribution_locator: &Locator, ) -> anyhow::Result> { diff --git a/phase1-coordinator/src/coordinator.rs b/phase1-coordinator/src/coordinator.rs index 892c8524..73b26720 100644 --- a/phase1-coordinator/src/coordinator.rs +++ b/phase1-coordinator/src/coordinator.rs @@ -22,8 +22,9 @@ use crate::{ Locator, LocatorPath, Object, - Storage, StorageAction, + StorageLocator, + StorageObject, }, }; use setup_utils::calculate_hash; @@ -330,13 +331,13 @@ impl TimeSource for MockTimeSource { /// A core structure for operating the Phase 1 ceremony. This struct /// is designed to be [Send] + [Sync]. The state of the ceremony is /// stored in a [CoordinatorState] object. -pub struct Coordinator { +pub struct Coordinator { /// The parameters and settings of this coordinator. environment: Environment, /// The signature scheme for contributors & verifiers with this coordinator. signature: Arc, /// The storage of contributions and rounds for this coordinator. - storage: S, + storage: Disk, /// The current round and participant self. state: CoordinatorState, /// The source of time, allows mocking system time for testing. @@ -345,7 +346,7 @@ pub struct Coordinator { aggregation_callback: Arc) -> () + Send + Sync>, } -impl Coordinator { +impl Coordinator { /// /// Creates a new instance of the `Coordinator`, for a given environment. /// @@ -393,10 +394,7 @@ impl Coordinator { } } -impl Coordinator -where - S: Storage + 'static, -{ +impl Coordinator { /// /// Runs a set of operations to initialize state and start the coordinator. /// @@ -2355,7 +2353,7 @@ where } #[inline] - fn load_current_round_height(storage: &S) -> Result { + fn load_current_round_height(storage: &Disk) -> Result { // Fetch the current round height from storage. match storage.get(&Locator::RoundHeight)? { // Case 1 - This is a typical round of the ceremony. @@ -2366,7 +2364,7 @@ where } #[inline] - fn load_current_round(storage: &S) -> Result { + fn load_current_round(storage: &Disk) -> Result { // Fetch the current round height from storage. let current_round_height = Self::load_current_round_height(storage)?; @@ -2375,7 +2373,7 @@ where } #[inline] - fn load_round(storage: &S, round_height: u64) -> Result { + fn load_round(storage: &Disk, round_height: u64) -> Result { // Fetch the current round height from storage. let current_round_height = Self::load_current_round_height(storage)?; @@ -2399,7 +2397,7 @@ where /// #[cfg(test)] #[inline] - pub(super) fn storage(&self) -> &S { + pub(super) fn storage(&self) -> &Disk { &self.storage } @@ -2409,7 +2407,7 @@ where /// #[cfg(test)] #[inline] - pub(super) fn storage_mut(&mut self) -> &mut S { + pub(super) fn storage_mut(&mut self) -> &mut Disk { &mut self.storage } @@ -2455,26 +2453,10 @@ where let mut round = Self::load_round(&mut self.storage, current_round_height)?; tracing::debug!("Resetting round and applying storage changes"); - if let Some(error) = round - .reset(&reset_action.remove_participants) - .into_iter() - .map(|action| match &action { - StorageAction::Remove(a) => match a.clone().try_into_locator(&self.storage) { - Ok(locator) => { - if self.storage.exists(&locator) { - return self.storage.process(action); - } else { - Ok(()) - } - } - Err(e) => Err(e), - }, - _ => self.storage.process(action), - }) - .find_map(Result::err) - { - return Err(error); - } + self.storage.process(round.reset(&reset_action.remove_participants))?; + + // Clear all files + self.storage.clear_round_files(current_round_height)?; if reset_action.rollback { if current_round_height == 0 { @@ -2501,10 +2483,7 @@ where use crate::commands::{Computation, Seed, SigningKey, Verification}; #[cfg(any(test, feature = "operator"))] -impl Coordinator -where - S: Storage + 'static, -{ +impl Coordinator { #[tracing::instrument( skip(self, contributor, contributor_signing_key, contributor_seed), fields(contributor = %contributor), diff --git a/phase1-coordinator/src/coordinator_state.rs b/phase1-coordinator/src/coordinator_state.rs index 421d3128..fb6d3dcc 100644 --- a/phase1-coordinator/src/coordinator_state.rs +++ b/phase1-coordinator/src/coordinator_state.rs @@ -4,7 +4,7 @@ use crate::{ participant::*, task::{initialize_tasks, Task}, }, - storage::{Locator, Object, Storage}, + storage::{Disk, Locator, Object}, CoordinatorError, TimeSource, }; @@ -3475,7 +3475,7 @@ impl CoordinatorState { /// Save the coordinator state in storage. #[inline] - pub(crate) fn save(&self, storage: &mut impl Storage) -> Result<(), CoordinatorError> { + pub(crate) fn save(&self, storage: &mut Disk) -> Result<(), CoordinatorError> { storage.update(&Locator::CoordinatorState, Object::CoordinatorState(self.clone())) } } diff --git a/phase1-coordinator/src/environment.rs b/phase1-coordinator/src/environment.rs index 4e2da3e4..c9c4a5a8 100644 --- a/phase1-coordinator/src/environment.rs +++ b/phase1-coordinator/src/environment.rs @@ -1,7 +1,4 @@ -use crate::{ - objects::Participant, - storage::{Disk, Storage}, -}; +use crate::{objects::Participant, storage::Disk}; use phase1::{helpers::CurveKind, ContributionMode, ProvingSystem}; use setup_utils::{CheckForCorrectness, UseCompression}; diff --git a/phase1-coordinator/src/main.rs b/phase1-coordinator/src/main.rs index 1dfa2fff..099127ff 100644 --- a/phase1-coordinator/src/main.rs +++ b/phase1-coordinator/src/main.rs @@ -10,7 +10,7 @@ use std::{sync::Arc, time::Duration}; use tokio::{sync::RwLock, task, time::sleep}; use tracing::*; -fn coordinator(environment: &Environment, signature: Arc) -> anyhow::Result> { +fn coordinator(environment: &Environment, signature: Arc) -> anyhow::Result { Ok(Coordinator::new(environment.clone(), signature)?) } @@ -29,8 +29,7 @@ pub async fn main() -> anyhow::Result<()> { // let environment: Environment = Production::from(Parameters::AleoInner).into(); // Instantiate the coordinator. - let coordinator: Arc>> = - Arc::new(RwLock::new(coordinator(&environment, Arc::new(Dummy))?)); + let coordinator: Arc> = Arc::new(RwLock::new(coordinator(&environment, Arc::new(Dummy))?)); let ceremony_coordinator = coordinator.clone(); // Initialize the coordinator. diff --git a/phase1-coordinator/src/objects/round.rs b/phase1-coordinator/src/objects/round.rs index 9223b641..2cbb4775 100644 --- a/phase1-coordinator/src/objects/round.rs +++ b/phase1-coordinator/src/objects/round.rs @@ -4,12 +4,13 @@ use crate::{ storage::{ ContributionLocator, ContributionSignatureLocator, + Disk, Locator, LocatorPath, Object, RemoveAction, - Storage, StorageAction, + StorageLocator, UpdateAction, }, CoordinatorError, @@ -87,7 +88,7 @@ impl Round { #[inline] pub(crate) fn new( environment: &Environment, - storage: &mut impl Storage, + storage: &mut Disk, round_height: u64, started_at: DateTime, contributor_ids: Vec, @@ -382,7 +383,7 @@ impl Round { )] pub(crate) fn current_contribution_locator( &self, - storage: &impl Storage, + storage: &Disk, chunk_id: u64, verified: bool, ) -> Result { @@ -443,7 +444,7 @@ impl Round { )] pub(crate) fn next_contribution_locator( &self, - storage: &impl Storage, + storage: &Disk, chunk_id: u64, ) -> Result { // Fetch the current round height. @@ -499,7 +500,7 @@ impl Round { #[inline] pub(crate) fn next_contribution_file_signature_locator( &self, - storage: &impl Storage, + storage: &Disk, chunk_id: u64, ) -> Result { // Fetch the current round height. @@ -550,7 +551,7 @@ impl Round { pub(crate) fn try_lock_chunk( &mut self, environment: &Environment, - storage: &mut impl Storage, + storage: &mut Disk, chunk_id: u64, participant: &Participant, ) -> Result { @@ -797,7 +798,7 @@ impl Round { /// Remove a contributor from the round. pub(crate) fn remove_contributor_unsafe( &mut self, - storage: &mut impl Storage, + storage: &mut Disk, contributor: &Participant, locked_chunks: &[u64], tasks: &[Task], @@ -834,7 +835,7 @@ impl Round { #[inline] pub(crate) fn remove_locks_unsafe( &mut self, - storage: &mut impl Storage, + storage: &mut Disk, participant: &Participant, locked_chunks: &[u64], ) -> Result<(), CoordinatorError> { @@ -1013,7 +1014,7 @@ impl Round { )] pub(crate) fn remove_chunk_contributions_unsafe( &mut self, - storage: &mut impl Storage, + storage: &mut Disk, participant: &Participant, tasks: &[Task], ) -> Result<(), CoordinatorError> { @@ -1137,82 +1138,14 @@ impl Round { /// the [crate::storage::Storage] to reflect the changes to the /// round state. `remove_participants` is a list of participants /// to remove from the round. - pub(crate) fn reset(&mut self, remove_participants: &[Participant]) -> Vec { - let expected_number_of_contributions = self.expected_number_of_contributions(); - let round_height = self.round_height(); - - let mut actions: Vec = - self.chunks - .iter_mut() - .flat_map(|chunk| { - let chunk_id = chunk.chunk_id(); - - let contributions_remove: Vec<(u64, Vec)> = chunk.get_contributions() - .iter() - .filter(|(id, _)| **id != 0) // don't remove initial challenge - .map(|(id, contribution)| { - let actions: Vec = contribution.get_locators() - .into_iter() - .map(|path| RemoveAction::new(path)) - .collect(); - (*id, actions) - }) - .collect(); - - // Remove files that were initialized when the lock was taken, - // but have not yet had the contirbution/verification uploaded. - let remove_initialized_files: Vec = match chunk.lock_holder() { - Some(participant) => { - let (adjusted_round_height, contribution_id, is_verified) = match participant { - Participant::Contributor(_) => { - (round_height, chunk.current_contribution_id() + 1, false) - } - Participant::Verifier(_) => { - let (adjusted_round_height, contribution_id) = - if chunk.current_contribution_id() == expected_number_of_contributions - 1 { - // handle the case where the final verification becomes - // the first verification for the next round. - (round_height + 1, 0) - } else { - (round_height, chunk.current_contribution_id()) - }; - (adjusted_round_height, contribution_id, true) - } - }; - - let contribution_locator = Locator::ContributionFile(ContributionLocator::new( - adjusted_round_height, - chunk_id, - contribution_id, - is_verified, - )); - let signature_locator = Locator::ContributionFileSignature( - ContributionSignatureLocator::new(round_height, chunk_id, contribution_id, is_verified), - ); - - vec![ - RemoveAction::new(contribution_locator), - RemoveAction::new(signature_locator), - ] - } - None => Vec::new(), - }; - - chunk.set_lock_holder_unsafe(None); - - let actions: Vec = contributions_remove - .into_iter() - .flat_map(|(contribution_id, actions)| { - chunk.remove_contribution_unsafe(contribution_id); - actions.into_iter() - }) - .map(StorageAction::Remove) - .chain(remove_initialized_files.into_iter().map(StorageAction::Remove)) - .collect(); + pub(crate) fn reset(&mut self, remove_participants: &[Participant]) -> StorageAction { + self.chunks.iter_mut().for_each(|chunk| { + chunk.set_lock_holder_unsafe(None); - actions.into_iter() - }) - .collect(); + for (id, _) in chunk.clone().get_contributions() { + chunk.remove_contribution_unsafe(*id); + } + }); // Remove the requested participants from the set of contributor IDs. self.contributor_ids = self @@ -1230,14 +1163,12 @@ impl Round { .filter(|v| remove_participants.iter().find(|p| p == &v).is_none()) .collect(); - actions.push(StorageAction::Update(UpdateAction { + StorageAction::Update(UpdateAction { locator: Locator::RoundState { round_height: self.height, }, object: Object::RoundState(self.clone()), // PERFORMANCE: clone here is not great for performance - })); - - actions + }) } } diff --git a/phase1-coordinator/src/storage/disk.rs b/phase1-coordinator/src/storage/disk.rs index aa9eb96c..2aa647d9 100644 --- a/phase1-coordinator/src/storage/disk.rs +++ b/phase1-coordinator/src/storage/disk.rs @@ -8,7 +8,6 @@ use crate::{ Object, ObjectReader, ObjectWriter, - Storage, StorageLocator, StorageObject, }, @@ -16,6 +15,7 @@ use crate::{ CoordinatorState, }; +use anyhow::Result; use itertools::Itertools; use memmap::{MmapMut, MmapOptions}; use rayon::prelude::*; @@ -24,8 +24,8 @@ use std::{ collections::{BTreeSet, HashMap, HashSet}, convert::TryFrom, fs::{self, File, OpenOptions}, - io::Write, - path::Path, + io::{self, Error, ErrorKind, Write}, + path::{Path, PathBuf}, str::FromStr, sync::{Arc, RwLock}, }; @@ -41,10 +41,44 @@ pub struct Disk { resolver: DiskResolver, } -impl Storage for Disk { +impl Disk { + pub fn clear_round_files(&self, current_round_height: u64) -> Result<()> { + // Let's first fully clear any files in the next round - these will be + // verifications and represent the initial challenges. + let next_round_dir = self.resolver.round_directory(current_round_height + 1); + clear_dir_files(next_round_dir.into(), &fs::remove_file)?; + + // Now, let's clear all the contributions made on this round. + let round_dir = self.resolver.round_directory(current_round_height); + clear_dir_files(round_dir.into(), &remove_round_contribution) + } +} + +fn remove_round_contribution(path: PathBuf) -> io::Result<()> { + let file_name = path + .to_str() + .ok_or(Error::new(ErrorKind::Other, "filepath is not UTF-8 encoded"))? + .to_owned(); + + match file_name.contains("contribution_0") { + true => Ok(()), + false => fs::remove_file(path), + } +} + +fn clear_dir_files(path: PathBuf, remove_file: &dyn Fn(PathBuf) -> io::Result<()>) -> Result<()> { + Ok(for entry in fs::read_dir(path.as_path())? { + let entry = entry?; + match entry.path().is_dir() { + true => clear_dir_files(entry.path(), remove_file)?, + false => remove_file(entry.path())?, + } + }) +} + +impl Disk { /// Loads a new instance of `Disk`. - #[inline] - fn load(environment: &Environment) -> Result + pub fn load(environment: &Environment) -> Result where Self: Sized, { @@ -94,8 +128,7 @@ impl Storage for Disk { } /// Initializes the location corresponding to the given locator. - #[inline] - fn initialize(&mut self, locator: Locator, size: u64) -> Result<(), CoordinatorError> { + pub fn initialize(&mut self, locator: Locator, size: u64) -> Result<(), CoordinatorError> { let locator_path = self.to_path(&locator)?; trace!("Initializing {:?}", locator_path); @@ -128,8 +161,7 @@ impl Storage for Disk { } /// Returns `true` if a given locator exists in storage. Otherwise, returns `false`. - #[inline] - fn exists(&self, locator: &Locator) -> bool { + pub fn exists(&self, locator: &Locator) -> bool { let is_in_manifest = self.manifest.read().unwrap().contains(locator); #[cfg(test)] trace!("Checking if locator exists in storage (manifest = {})", is_in_manifest,); @@ -137,8 +169,7 @@ impl Storage for Disk { } /// Returns `true` if a given locator is opened in storage. Otherwise, returns `false`. - #[inline] - fn is_open(&self, locator: &Locator) -> bool { + pub fn is_open(&self, locator: &Locator) -> bool { let is_in_manifest = self.manifest.read().unwrap().contains(locator); let is_in_locators = self.open.contains_key(locator); #[cfg(test)] @@ -151,8 +182,7 @@ impl Storage for Disk { } /// Returns a copy of an object at the given locator in storage, if it exists. - #[inline] - fn get(&self, locator: &Locator) -> Result { + pub fn get(&self, locator: &Locator) -> Result { trace!("Fetching {}", self.to_path(locator)?); // Check that the given locator exists in storage. @@ -254,8 +284,7 @@ impl Storage for Disk { } /// Inserts a new object at the given locator into storage, if it does not exist. - #[inline] - fn insert(&mut self, locator: Locator, object: Object) -> Result<(), CoordinatorError> { + pub fn insert(&mut self, locator: Locator, object: Object) -> Result<(), CoordinatorError> { trace!("Inserting {}", self.to_path(&locator)?); // Check that the given locator does not exist in storage. @@ -281,8 +310,7 @@ impl Storage for Disk { } /// Updates an existing object for the given locator in storage, if it exists. - #[inline] - fn update(&mut self, locator: &Locator, object: Object) -> Result<(), CoordinatorError> { + pub fn update(&mut self, locator: &Locator, object: Object) -> Result<(), CoordinatorError> { trace!("Updating {}", self.to_path(locator)?); // Check that the given locator exists in storage. @@ -325,8 +353,7 @@ impl Storage for Disk { } /// Copies an object from the given source locator to the given destination locator. - #[inline] - fn copy(&mut self, source_locator: &Locator, destination_locator: &Locator) -> Result<(), CoordinatorError> { + pub fn copy(&mut self, source_locator: &Locator, destination_locator: &Locator) -> Result<(), CoordinatorError> { trace!( "Copying from A to B\n\n\tA: {}\n\tB: {}\n", self.to_path(source_locator)?, @@ -359,8 +386,7 @@ impl Storage for Disk { } /// Removes the object corresponding to the given locator from storage. - #[inline] - fn remove(&mut self, locator: &Locator) -> Result<(), CoordinatorError> { + pub fn remove(&mut self, locator: &Locator) -> Result<(), CoordinatorError> { trace!("Removing {}", self.to_path(locator)?); // Check that the locator exists in storage. @@ -394,8 +420,7 @@ impl Storage for Disk { } /// Returns the size of the object stored at the given locator. - #[inline] - fn size(&self, locator: &Locator) -> Result { + pub fn size(&self, locator: &Locator) -> Result { trace!("Fetching size of {}", self.to_path(locator)?); // Check that the given locator exists in storage. @@ -414,7 +439,7 @@ impl Storage for Disk { Ok(size) } - fn process(&mut self, action: StorageAction) -> Result<(), CoordinatorError> { + pub fn process(&mut self, action: StorageAction) -> Result<(), CoordinatorError> { match action { StorageAction::Remove(remove_action) => { let locator = remove_action.try_into_locator(self)?; diff --git a/phase1-coordinator/src/storage/storage.rs b/phase1-coordinator/src/storage/storage.rs index 472e311a..ef180b3c 100644 --- a/phase1-coordinator/src/storage/storage.rs +++ b/phase1-coordinator/src/storage/storage.rs @@ -15,6 +15,8 @@ use std::{ sync::{RwLockReadGuard, RwLockWriteGuard}, }; +use super::Disk; + #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Serialize, Deserialize)] pub struct ContributionLocator { round_height: u64, @@ -193,44 +195,6 @@ impl Object { pub type ObjectReader<'a> = RwLockReadGuard<'a, MmapMut>; pub type ObjectWriter<'a> = RwLockWriteGuard<'a, MmapMut>; -/// A standard model for storage. -pub trait Storage: Send + Sync + StorageLocator + StorageObject { - /// Loads a new instance of `Storage`. - fn load(environment: &Environment) -> Result - where - Self: Sized; - - /// Initializes the location corresponding to the given locator with the given size. - fn initialize(&mut self, locator: Locator, size: u64) -> Result<(), CoordinatorError>; - - /// Returns `true` if a given locator exists in storage. Otherwise, returns `false`. - fn exists(&self, locator: &Locator) -> bool; - - /// Returns `true` if a given locator is opened in storage. Otherwise, returns `false`. - fn is_open(&self, locator: &Locator) -> bool; - - /// Returns a copy of an object at the given locator in storage, if it exists. - fn get(&self, locator: &Locator) -> Result; - - /// Inserts a new object at the given locator into storage, if it does not exist. - fn insert(&mut self, locator: Locator, object: Object) -> Result<(), CoordinatorError>; - - /// Updates an existing object for the given locator in storage, if it exists. - fn update(&mut self, locator: &Locator, object: Object) -> Result<(), CoordinatorError>; - - /// Copies the object in the given source locator to the given destination locator. - fn copy(&mut self, source_locator: &Locator, destination_locator: &Locator) -> Result<(), CoordinatorError>; - - /// Removes a object from storage for a given locator. - fn remove(&mut self, locator: &Locator) -> Result<(), CoordinatorError>; - - /// Returns the size of the object stored at the given locator. - fn size(&self, locator: &Locator) -> Result; - - /// Process a [StorageAction] which mutates the storage. - fn process(&mut self, action: StorageAction) -> Result<(), CoordinatorError>; -} - /// The path to a resource defined by a [Locator]. #[derive(Clone, Serialize, Deserialize, Eq, PartialEq, Hash, PartialOrd, Ord)] pub struct LocatorPath(String); @@ -296,14 +260,14 @@ pub enum LocatorOrPath { } impl LocatorOrPath { - pub fn try_into_locator(self, storage: &impl Storage) -> Result { + pub fn try_into_locator(self, storage: &Disk) -> Result { match self { LocatorOrPath::Path(path) => storage.to_locator(&path), LocatorOrPath::Locator(locator) => Ok(locator), } } - pub fn try_into_path(self, storage: &impl Storage) -> Result { + pub fn try_into_path(self, storage: &Disk) -> Result { match self { LocatorOrPath::Path(path) => Ok(path), LocatorOrPath::Locator(locator) => storage.to_path(&locator), @@ -345,11 +309,11 @@ impl RemoveAction { /// Obtain the location of the item to be removed from [Storage] /// as a [Locator]. - pub fn try_into_locator(self, storage: &impl Storage) -> Result { + pub fn try_into_locator(self, storage: &Disk) -> Result { self.locator_or_path.try_into_locator(storage) } - pub fn try_into_path(self, storage: &impl Storage) -> Result { + pub fn try_into_path(self, storage: &Disk) -> Result { self.locator_or_path.try_into_path(storage) } } From bdb30cda5965db07c5868c1cae7340dfc9aaca84 Mon Sep 17 00:00:00 2001 From: Jules de Smit Date: Tue, 24 Aug 2021 16:16:32 +0200 Subject: [PATCH 2/5] Use proper deletion method, simplify deletion logic --- .../src/commands/computation.rs | 6 +-- phase1-coordinator/src/commands/mod.rs | 4 +- .../src/commands/verification.rs | 16 +++++-- phase1-coordinator/src/main.rs | 1 - phase1-coordinator/src/storage/disk.rs | 45 ++++++++++--------- phase1-coordinator/src/testing/coordinator.rs | 6 +-- 6 files changed, 43 insertions(+), 35 deletions(-) diff --git a/phase1-coordinator/src/commands/computation.rs b/phase1-coordinator/src/commands/computation.rs index f9af608f..f152909a 100644 --- a/phase1-coordinator/src/commands/computation.rs +++ b/phase1-coordinator/src/commands/computation.rs @@ -2,7 +2,7 @@ use crate::{ authentication::Signature, commands::SigningKey, environment::Environment, - storage::{Locator, Storage}, + storage::{Disk, Locator, StorageLocator, StorageObject}, CoordinatorError, }; use phase1::{helpers::CurveKind, Phase1, Phase1Parameters}; @@ -30,7 +30,7 @@ impl Computation { /// pub(crate) fn run( environment: &Environment, - storage: &mut impl Storage, + storage: &mut Disk, signature: Arc, contributor_signing_key: &SigningKey, challenge_locator: &Locator, @@ -175,7 +175,7 @@ mod tests { use crate::{ authentication::{Dummy, Signature}, commands::{Computation, Initialization, Seed, SEED_LENGTH}, - storage::{ContributionLocator, ContributionSignatureLocator, Locator, Object, Storage, StorageObject}, + storage::{ContributionLocator, ContributionSignatureLocator, Locator, Object, StorageObject}, testing::prelude::*, }; use setup_utils::calculate_hash; diff --git a/phase1-coordinator/src/commands/mod.rs b/phase1-coordinator/src/commands/mod.rs index f0aebb7e..99e0f20e 100644 --- a/phase1-coordinator/src/commands/mod.rs +++ b/phase1-coordinator/src/commands/mod.rs @@ -18,7 +18,7 @@ pub(crate) use verification::*; use crate::{ authentication::Signature, objects::{ContributionFileSignature, ContributionState}, - storage::{Locator, Storage}, + storage::{Disk, Locator, StorageLocator, StorageObject}, CoordinatorError, }; @@ -47,7 +47,7 @@ pub type SigningKey = String; #[cfg(any(test, feature = "operator"))] #[inline] pub(crate) fn write_contribution_file_signature( - storage: &mut impl Storage, + storage: &mut Disk, signature: Arc, signing_key: &SigningKey, challenge_locator: &Locator, diff --git a/phase1-coordinator/src/commands/verification.rs b/phase1-coordinator/src/commands/verification.rs index 0116543d..4194f89a 100644 --- a/phase1-coordinator/src/commands/verification.rs +++ b/phase1-coordinator/src/commands/verification.rs @@ -2,7 +2,15 @@ use crate::{ authentication::Signature, commands::SigningKey, environment::Environment, - storage::{ContributionLocator, ContributionSignatureLocator, Locator, Object, Storage}, + storage::{ + ContributionLocator, + ContributionSignatureLocator, + Disk, + Locator, + Object, + StorageLocator, + StorageObject, + }, CoordinatorError, }; use phase1::{helpers::CurveKind, Phase1, Phase1Parameters, PublicKey}; @@ -23,7 +31,7 @@ impl Verification { #[inline] pub(crate) fn run( environment: &Environment, - storage: &mut impl Storage, + storage: &mut Disk, signature: Arc, signing_key: &SigningKey, round_height: u64, @@ -148,7 +156,7 @@ impl Verification { #[inline] fn verification( environment: &Environment, - storage: &mut impl Storage, + storage: &mut Disk, chunk_id: u64, challenge_locator: Locator, response_locator: Locator, @@ -341,7 +349,7 @@ mod tests { use crate::{ authentication::Dummy, commands::{Computation, Seed, Verification, SEED_LENGTH}, - storage::{ContributionLocator, ContributionSignatureLocator, Locator, Object, Storage}, + storage::{ContributionLocator, ContributionSignatureLocator, Locator, Object}, testing::prelude::*, Coordinator, }; diff --git a/phase1-coordinator/src/main.rs b/phase1-coordinator/src/main.rs index 099127ff..3df2150e 100644 --- a/phase1-coordinator/src/main.rs +++ b/phase1-coordinator/src/main.rs @@ -1,7 +1,6 @@ use phase1_coordinator::{ authentication::{Dummy, Signature}, environment::{Development, Environment, Parameters}, - storage::Disk, Coordinator, }; use tracing_subscriber; diff --git a/phase1-coordinator/src/storage/disk.rs b/phase1-coordinator/src/storage/disk.rs index 2aa647d9..c890c94b 100644 --- a/phase1-coordinator/src/storage/disk.rs +++ b/phase1-coordinator/src/storage/disk.rs @@ -42,40 +42,41 @@ pub struct Disk { } impl Disk { - pub fn clear_round_files(&self, current_round_height: u64) -> Result<()> { + pub fn clear_round_files(&mut self, current_round_height: u64) -> Result<()> { // Let's first fully clear any files in the next round - these will be // verifications and represent the initial challenges. let next_round_dir = self.resolver.round_directory(current_round_height + 1); - clear_dir_files(next_round_dir.into(), &fs::remove_file)?; + self.clear_dir_files(next_round_dir.into(), true)?; // Now, let's clear all the contributions made on this round. let round_dir = self.resolver.round_directory(current_round_height); - clear_dir_files(round_dir.into(), &remove_round_contribution) + self.clear_dir_files(round_dir.into(), false) } -} -fn remove_round_contribution(path: PathBuf) -> io::Result<()> { - let file_name = path - .to_str() - .ok_or(Error::new(ErrorKind::Other, "filepath is not UTF-8 encoded"))? - .to_owned(); + fn clear_dir_files(&mut self, path: PathBuf, delete_initial_contribution: bool) -> Result<()> { + Ok(for entry in fs::read_dir(path.as_path())? { + let entry = entry?; + match entry.path().is_dir() { + true => self.clear_dir_files(entry.path(), delete_initial_contribution)?, + false => { + let file_path = entry + .path() + .to_str() + .ok_or(Error::new(ErrorKind::Other, "filepath is not UTF-8 encoded"))? + .to_owned(); + + if !delete_initial_contribution && file_path.contains("contribution_0") { + continue; + } - match file_name.contains("contribution_0") { - true => Ok(()), - false => fs::remove_file(path), + let locator = self.resolver.to_locator(&LocatorPath::new(file_path))?; + self.remove(&locator)?; + } + }; + }) } } -fn clear_dir_files(path: PathBuf, remove_file: &dyn Fn(PathBuf) -> io::Result<()>) -> Result<()> { - Ok(for entry in fs::read_dir(path.as_path())? { - let entry = entry?; - match entry.path().is_dir() { - true => clear_dir_files(entry.path(), remove_file)?, - false => remove_file(entry.path())?, - } - }) -} - impl Disk { /// Loads a new instance of `Disk`. pub fn load(environment: &Environment) -> Result diff --git a/phase1-coordinator/src/testing/coordinator.rs b/phase1-coordinator/src/testing/coordinator.rs index 88ff5024..b2b5f361 100644 --- a/phase1-coordinator/src/testing/coordinator.rs +++ b/phase1-coordinator/src/testing/coordinator.rs @@ -2,7 +2,7 @@ use crate::{ authentication::Dummy, environment::{Environment, Parameters, Testing}, objects::{Participant, Round}, - storage::{Disk, Storage}, + storage::Disk, Coordinator, CoordinatorError, }; @@ -57,7 +57,7 @@ pub static TEST_CONTRIBUTOR_IDS: Lazy> = Lazy::new(|| vec![Lazy /// Verifier IDs for testing purposes only. pub static TEST_VERIFIER_IDS: Lazy> = Lazy::new(|| vec![Lazy::force(&TEST_VERIFIER_ID).clone()]); -pub fn test_coordinator(environment: &Environment) -> anyhow::Result> { +pub fn test_coordinator(environment: &Environment) -> anyhow::Result { info!("Starting coordinator"); let coordinator = Coordinator::new(environment.clone(), Arc::new(Dummy))?; info!("Coordinator is ready"); @@ -110,7 +110,7 @@ fn clear_test_storage(environment: &Environment) { } /// Initializes a test storage object. -pub fn test_storage(environment: &Environment) -> impl Storage { +pub fn test_storage(environment: &Environment) -> Disk { environment.storage().unwrap() } From 7b71127ba0dbc779cad18678c3349bdbbc4c3d58 Mon Sep 17 00:00:00 2001 From: Jules de Smit Date: Tue, 24 Aug 2021 16:34:25 +0200 Subject: [PATCH 3/5] Fix tests --- phase1-coordinator/src/coordinator.rs | 6 +++--- phase1-coordinator/src/objects/round.rs | 8 ++++---- phase1-coordinator/src/tests.rs | 10 +++++----- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/phase1-coordinator/src/coordinator.rs b/phase1-coordinator/src/coordinator.rs index 73b26720..18871d09 100644 --- a/phase1-coordinator/src/coordinator.rs +++ b/phase1-coordinator/src/coordinator.rs @@ -2786,7 +2786,7 @@ mod tests { use std::{collections::HashMap, sync::Arc}; fn initialize_to_round_1( - coordinator: &mut Coordinator, + coordinator: &mut Coordinator, contributors: &[Participant], verifiers: &[Participant], ) -> anyhow::Result<()> { @@ -2838,7 +2838,7 @@ mod tests { Ok(()) } - fn initialize_coordinator(coordinator: &mut Coordinator) -> anyhow::Result<()> { + fn initialize_coordinator(coordinator: &mut Coordinator) -> anyhow::Result<()> { // Load the contributors and verifiers. let contributors = vec![ Lazy::force(&TEST_CONTRIBUTOR_ID).clone(), @@ -2849,7 +2849,7 @@ mod tests { initialize_to_round_1(coordinator, &contributors, &verifiers) } - fn initialize_coordinator_single_contributor(coordinator: &mut Coordinator) -> anyhow::Result<()> { + fn initialize_coordinator_single_contributor(coordinator: &mut Coordinator) -> anyhow::Result<()> { // Load the contributors and verifiers. let contributors = vec![Lazy::force(&TEST_CONTRIBUTOR_ID).clone()]; let verifiers = vec![Lazy::force(&TEST_VERIFIER_ID).clone()]; diff --git a/phase1-coordinator/src/objects/round.rs b/phase1-coordinator/src/objects/round.rs index 2cbb4775..96e02a82 100644 --- a/phase1-coordinator/src/objects/round.rs +++ b/phase1-coordinator/src/objects/round.rs @@ -1143,7 +1143,9 @@ impl Round { chunk.set_lock_holder_unsafe(None); for (id, _) in chunk.clone().get_contributions() { - chunk.remove_contribution_unsafe(*id); + if *id != 0 { + chunk.remove_contribution_unsafe(*id); + } } }); @@ -1233,10 +1235,8 @@ mod tests { let n_verifications = 30; let n_locked_chunks = 1; let n_files = 2 * n_contributions + 2 * n_verifications + 2 * n_locked_chunks; - let n_actions = n_files + 1; // include action to update round - let actions = round_1.reset(&[TEST_CONTRIBUTOR_ID_2.clone()]); - assert_eq!(n_actions, actions.len()); + let action = round_1.reset(&[TEST_CONTRIBUTOR_ID_2.clone()]); assert_eq!(64, round_1.chunks().len()); diff --git a/phase1-coordinator/src/tests.rs b/phase1-coordinator/src/tests.rs index 7ab7c29f..452e3b0f 100644 --- a/phase1-coordinator/src/tests.rs +++ b/phase1-coordinator/src/tests.rs @@ -3,7 +3,7 @@ use crate::{ commands::{Seed, SigningKey, SEED_LENGTH}, environment::{Environment, Parameters, Settings, Testing}, objects::Task, - storage::{Disk, Storage}, + storage::{Disk, StorageLocator, StorageObject}, testing::prelude::*, Coordinator, CoordinatorError, @@ -53,7 +53,7 @@ struct ContributorTestDetails { } impl ContributorTestDetails { - fn contribute_to(&self, coordinator: &mut Coordinator) -> Result<(), CoordinatorError> { + fn contribute_to(&self, coordinator: &mut Coordinator) -> Result<(), CoordinatorError> { coordinator.contribute(&self.participant, &self.signing_key, &self.seed) } } @@ -73,7 +73,7 @@ struct VerifierTestDetails { } impl VerifierTestDetails { - fn verify(&self, coordinator: &mut Coordinator) -> anyhow::Result<()> { + fn verify(&self, coordinator: &mut Coordinator) -> anyhow::Result<()> { coordinator.verify(&self.participant, &self.signing_key) } } @@ -1315,7 +1315,7 @@ fn coordinator_drop_several_contributors() { fn contribute_verify_until_no_tasks( contributor: &ContributorTestDetails, verifier: &VerifierTestDetails, - coordinator: &mut Coordinator, + coordinator: &mut Coordinator, ) -> anyhow::Result { match contributor.contribute_to(coordinator) { Err(CoordinatorError::ParticipantHasNoRemainingTasks) => Ok(true), @@ -1366,7 +1366,7 @@ fn coordinator_drop_several_contributors() { assert_eq!(0, coordinator.number_of_queue_verifiers()); } -fn check_round_matches_storage_files(storage: &impl Storage, round: &Round) { +fn check_round_matches_storage_files(storage: &Disk, round: &Round) { debug!("Checking round {}", round.round_height()); for chunk in round.chunks() { debug!("Checking chunk {}", chunk.chunk_id()); From c93c1b678bacbc97d4b003b9bcde8adf5ad303ce Mon Sep 17 00:00:00 2001 From: Luke Frisken Date: Mon, 30 Aug 2021 15:03:39 +0400 Subject: [PATCH 4/5] #303 Refactor round storage methods to return actions + remove_contributor_unsafe + remove_locks_unsafe + remove_chunk_contributions_unsafe --- phase1-coordinator/src/coordinator.rs | 20 ++-- phase1-coordinator/src/objects/round.rs | 107 +++++++++++----------- phase1-coordinator/src/storage/disk.rs | 20 +++- phase1-coordinator/src/storage/storage.rs | 5 + 4 files changed, 84 insertions(+), 68 deletions(-) diff --git a/phase1-coordinator/src/coordinator.rs b/phase1-coordinator/src/coordinator.rs index 18871d09..e6b13618 100644 --- a/phase1-coordinator/src/coordinator.rs +++ b/phase1-coordinator/src/coordinator.rs @@ -2293,12 +2293,11 @@ impl Coordinator { let mut round = Self::load_current_round(&self.storage)?; // Remove the contributor from the round self. - round.remove_contributor_unsafe( - &mut self.storage, + self.storage.perform_actions(round.remove_contributor_unsafe( &replace_action.dropped_contributor, &replace_action.locked_chunks, &replace_action.tasks, - )?; + )?)?; // Assign a replacement contributor to the dropped tasks for the current round. round.add_replacement_contributor_unsafe(replace_action.replacement_contributor.clone())?; @@ -2324,18 +2323,14 @@ impl Coordinator { warn!("Removing locked chunks and all impacted contributions"); // Remove the lock from the specified chunks. - round.remove_locks_unsafe( - &mut self.storage, - &remove_action.dropped_verifier, - &remove_action.locked_chunks, + self.storage.perform_actions( + round.remove_locks_unsafe(&remove_action.dropped_verifier, &remove_action.locked_chunks)?, )?; warn!("Removed locked chunks"); // Remove the contributions from the specified chunks. - round.remove_chunk_contributions_unsafe( - &mut self.storage, - &remove_action.dropped_verifier, - &remove_action.tasks, + self.storage.perform_actions( + round.remove_chunk_contributions_unsafe(&remove_action.dropped_verifier, &remove_action.tasks)?, )?; warn!("Removed impacted contributions"); @@ -2453,7 +2448,8 @@ impl Coordinator { let mut round = Self::load_round(&mut self.storage, current_round_height)?; tracing::debug!("Resetting round and applying storage changes"); - self.storage.process(round.reset(&reset_action.remove_participants))?; + self.storage + .perform_action(round.reset(&reset_action.remove_participants))?; // Clear all files self.storage.clear_round_files(current_round_height)?; diff --git a/phase1-coordinator/src/objects/round.rs b/phase1-coordinator/src/objects/round.rs index 96e02a82..ad33d3a9 100644 --- a/phase1-coordinator/src/objects/round.rs +++ b/phase1-coordinator/src/objects/round.rs @@ -798,19 +798,18 @@ impl Round { /// Remove a contributor from the round. pub(crate) fn remove_contributor_unsafe( &mut self, - storage: &mut Disk, contributor: &Participant, locked_chunks: &[u64], tasks: &[Task], - ) -> Result<(), CoordinatorError> { + ) -> Result, CoordinatorError> { warn!("Removing locked chunks and all impacted contributions"); // Remove the lock from the specified chunks. - self.remove_locks_unsafe(storage, contributor, locked_chunks)?; + let mut actions = self.remove_locks_unsafe(contributor, locked_chunks)?; warn!("Removed locked chunks"); // Remove the contributions from the specified chunks. - self.remove_chunk_contributions_unsafe(storage, contributor, tasks)?; + actions.append(&mut self.remove_chunk_contributions_unsafe(contributor, tasks)?); warn!("Removed impacted contributions"); self.contributor_ids = self @@ -820,7 +819,7 @@ impl Round { .filter(|participant| participant != contributor) .collect(); - Ok(()) + Ok(actions) } /// @@ -835,10 +834,9 @@ impl Round { #[inline] pub(crate) fn remove_locks_unsafe( &mut self, - storage: &mut Disk, participant: &Participant, locked_chunks: &[u64], - ) -> Result<(), CoordinatorError> { + ) -> Result, CoordinatorError> { // Sanity check that the participant holds the lock for each specified chunk. let locked_chunks: Vec<_> = locked_chunks .par_iter() @@ -854,8 +852,9 @@ impl Round { // Remove the response locator for a contributor, and remove the next challenge locator // for both a contributor and verifier. - for chunk_id in locked_chunks.into_iter() { - match participant { + let actions = locked_chunks.into_iter().map(|chunk_id| { + let mut actions: Vec = Vec::new(); + match &participant { Participant::Contributor(_) => { // Check that the participant is an *authorized* contributor // for the current round. @@ -887,25 +886,22 @@ impl Round { next_contribution_id, false, )); - if storage.exists(&response_locator) { - storage.remove(&response_locator)?; - } + + actions.push(StorageAction::RemoveIfExists(RemoveAction::new(response_locator))); // Removing contribution file signature for pending task - let response_signature_locator = Locator::ContributionFileSignature( + let unverified_response_signature_locator = Locator::ContributionFileSignature( ContributionSignatureLocator::new(current_round_height, *chunk_id, next_contribution_id, false), ); - if storage.exists(&response_signature_locator) { - storage.remove(&response_signature_locator)?; - } + + actions.push(StorageAction::RemoveIfExists(RemoveAction::new(unverified_response_signature_locator))); // Removing contribution file signature for verified task - let response_signature_locator = Locator::ContributionFileSignature( + let verified_response_signature_locator = Locator::ContributionFileSignature( ContributionSignatureLocator::new(current_round_height, *chunk_id, next_contribution_id, true), ); - if storage.exists(&response_signature_locator) { - storage.remove(&response_signature_locator)?; - } + + actions.push(StorageAction::RemoveIfExists(RemoveAction::new(verified_response_signature_locator))); // TODO: revisit the logic of removing challenges // https://github.com/AleoHQ/aleo-setup/issues/250 @@ -934,11 +930,10 @@ impl Round { true, )) }; + // Don't remove initial challenge - if storage.exists(&contribution_file) - && chunk.current_contribution()?.get_contributor().is_some() - { - storage.remove(&contribution_file)?; + if chunk.current_contribution()?.get_contributor().is_some() { + actions.push(StorageAction::RemoveIfExists(RemoveAction::new(contribution_file))); } } } @@ -960,9 +955,7 @@ impl Round { )), }; - if storage.exists(&response_locator) { - storage.remove(&response_locator)?; - } + actions.push(StorageAction::RemoveIfExists(RemoveAction::new(response_locator))); let response_locator_signature = match is_final_contribution { true => Locator::ContributionFileSignature(ContributionSignatureLocator::new( @@ -979,9 +972,7 @@ impl Round { )), }; - if storage.exists(&response_locator_signature) { - storage.remove(&response_locator_signature)?; - } + actions.push(StorageAction::RemoveIfExists(RemoveAction::new(response_locator_signature))); } }; @@ -989,9 +980,19 @@ impl Round { // Remove the lock for each given chunk ID. chunk.set_lock_holder_unsafe(None); - } - Ok(()) + Ok(actions) + }) + // flat map the results so they can be collected into a single Vec + .flat_map(|result| { + match result { + Ok(ok) => ok.into_iter().map(|action| Ok(action)).collect(), + Err(err) => vec![Err(err)], + } + }) + .collect::, CoordinatorError>>()?; + + Ok(actions) } /// @@ -1009,20 +1010,19 @@ impl Round { /// #[tracing::instrument( level = "error", - skip(self, storage, tasks), + skip(self, tasks), fields(round = self.round_height()) )] pub(crate) fn remove_chunk_contributions_unsafe( &mut self, - storage: &mut Disk, participant: &Participant, tasks: &[Task], - ) -> Result<(), CoordinatorError> { + ) -> Result, CoordinatorError> { // Check if the participant is a verifier. As verifications are not dependent // on each other, no further update is necessary in the round state. if participant.is_verifier() { warn!("Skipping removal of contributions as {} is a verifier", participant); - return Ok(()); + return Ok(Vec::new()); } // Check that the participant is in the current contributors ID. @@ -1032,41 +1032,30 @@ impl Round { } // Remove the given contribution from each chunk in the current round. - for task in tasks { + tasks.iter().map(|task| { + let mut actions: Vec = Vec::new(); let chunk = self.chunk_mut(task.chunk_id())?; if let Ok(contribution) = chunk.get_contribution(task.contribution_id()) { warn!("Removing task {:?}", task.to_tuple()); // Remove the unverified contribution file, if it exists. if let Some(locator) = contribution.get_contributed_location() { - let path = storage.to_locator(&locator)?; - if storage.exists(&path) { - storage.remove(&path)?; - } + actions.push(StorageAction::RemoveIfExists(RemoveAction::new(locator.clone()))); } // Remove the contribution signature file, if it exists. if let Some(locator) = contribution.get_contributed_signature_location() { - let path = storage.to_locator(&locator)?; - if storage.exists(&path) { - storage.remove(&path)?; - } + actions.push(StorageAction::RemoveIfExists(RemoveAction::new(locator.clone()))); } // Remove the verified contribution file, if it exists. if let Some(locator) = contribution.get_verified_location() { - let path = storage.to_locator(&locator)?; - if storage.exists(&path) { - storage.remove(&path)?; - } + actions.push(StorageAction::RemoveIfExists(RemoveAction::new(locator.clone()))); } // Remove the verified contribution file signature, if it exists. if let Some(locator) = contribution.get_verified_signature_location() { - let path = storage.to_locator(&locator)?; - if storage.exists(&path) { - storage.remove(&path)?; - } + actions.push(StorageAction::RemoveIfExists(RemoveAction::new(locator.clone()))); } // Remove the given contribution and all subsequent contributions. @@ -1081,9 +1070,17 @@ impl Round { chunk, ); } - } - Ok(()) + Ok(actions) + }) + // flat map the results so they can be collected into a single Vec + .flat_map(|result| { + match result { + Ok(ok) => ok.into_iter().map(|action| Ok(action)).collect(), + Err(err) => vec![Err(err)], + } + }) + .collect::, CoordinatorError>>() } /// diff --git a/phase1-coordinator/src/storage/disk.rs b/phase1-coordinator/src/storage/disk.rs index c890c94b..fdb3ddd6 100644 --- a/phase1-coordinator/src/storage/disk.rs +++ b/phase1-coordinator/src/storage/disk.rs @@ -440,15 +440,33 @@ impl Disk { Ok(size) } - pub fn process(&mut self, action: StorageAction) -> Result<(), CoordinatorError> { + /// Perform a [StorageAction] to mutate something in storage. + pub fn perform_action(&mut self, action: StorageAction) -> Result<(), CoordinatorError> { match action { StorageAction::Remove(remove_action) => { let locator = remove_action.try_into_locator(self)?; self.remove(&locator) } StorageAction::Update(update_action) => self.update(&update_action.locator, update_action.object), + StorageAction::RemoveIfExists(remove_action) => { + let locator = remove_action.try_into_locator(self)?; + if self.exists(&locator) { + self.remove(&locator) + } else { + Ok(()) + } + } } } + + /// Convenience method to run [Self::perform_action] over + /// something that can provide an [Iterator] of [StorageAction]. + pub fn perform_actions( + &mut self, + actions: impl IntoIterator, + ) -> Result<(), CoordinatorError> { + actions.into_iter().map(|action| self.perform_action(action)).collect() + } } impl StorageLocator for Disk { diff --git a/phase1-coordinator/src/storage/storage.rs b/phase1-coordinator/src/storage/storage.rs index ef180b3c..9990422a 100644 --- a/phase1-coordinator/src/storage/storage.rs +++ b/phase1-coordinator/src/storage/storage.rs @@ -328,7 +328,12 @@ pub struct UpdateAction { /// [Storage::process()]. #[non_exhaustive] pub enum StorageAction { + /// Remove an item in storage, will fail if the item does not yet + /// exist. Remove(RemoveAction), + /// Remove an item in storage if it exists. + RemoveIfExists(RemoveAction), + /// Update an item in storage. Update(UpdateAction), } From 01ee29f3ec74ee8a1068633f649f0fbb88149101 Mon Sep 17 00:00:00 2001 From: Luke Frisken Date: Mon, 30 Aug 2021 15:34:44 +0400 Subject: [PATCH 5/5] #303 Refactor round storage method to return actions + try_lock_chunk --- phase1-coordinator/src/coordinator.rs | 4 +- phase1-coordinator/src/objects/round.rs | 45 +++++++++++++---------- phase1-coordinator/src/storage/disk.rs | 3 ++ phase1-coordinator/src/storage/storage.rs | 18 ++++++--- 4 files changed, 45 insertions(+), 25 deletions(-) diff --git a/phase1-coordinator/src/coordinator.rs b/phase1-coordinator/src/coordinator.rs index e6b13618..3c43f8e4 100644 --- a/phase1-coordinator/src/coordinator.rs +++ b/phase1-coordinator/src/coordinator.rs @@ -1505,7 +1505,9 @@ impl Coordinator { // Attempt to acquire the chunk lock for participant. trace!("Preparing to lock chunk {}", chunk_id); - let locked_locators = round.try_lock_chunk(&self.environment, &mut self.storage, chunk_id, &participant)?; + let (locked_locators, actions) = + round.try_lock_chunk(&self.environment, &self.storage, chunk_id, &participant)?; + self.storage.perform_actions(actions)?; trace!("Participant {} locked chunk {}", participant, chunk_id); // Add the updated round to storage. diff --git a/phase1-coordinator/src/objects/round.rs b/phase1-coordinator/src/objects/round.rs index ad33d3a9..f38b3450 100644 --- a/phase1-coordinator/src/objects/round.rs +++ b/phase1-coordinator/src/objects/round.rs @@ -5,6 +5,7 @@ use crate::{ ContributionLocator, ContributionSignatureLocator, Disk, + InitializeAction, Locator, LocatorPath, Object, @@ -551,10 +552,10 @@ impl Round { pub(crate) fn try_lock_chunk( &mut self, environment: &Environment, - storage: &mut Disk, + storage: &Disk, chunk_id: u64, participant: &Participant, - ) -> Result { + ) -> Result<(LockedLocators, Vec), CoordinatorError> { debug!("{} is attempting to lock chunk {}", participant, chunk_id); // Check that the participant is holding less than the chunk lock limit. @@ -711,38 +712,44 @@ impl Round { self.chunk_mut(chunk_id)? .acquire_lock(participant.clone(), expected_num_contributions)?; + let mut actions: Vec = Vec::new(); + // Initialize the next contribution locator. match participant { Participant::Contributor(_) => { // Initialize the unverified response file. - storage.initialize( - Locator::ContributionFile(locked_locators.next_contribution.clone()), - Object::contribution_file_size(environment, chunk_id, false), - )?; + actions.push(StorageAction::Initialize(InitializeAction { + locator: Locator::ContributionFile(locked_locators.next_contribution.clone()), + object_size: Object::contribution_file_size(environment, chunk_id, false), + })); // Initialize the contribution file signature. - storage.initialize( - Locator::ContributionFileSignature(locked_locators.next_contribution_file_signature.clone()), - Object::contribution_file_signature_size(false), - )?; + actions.push(StorageAction::Initialize(InitializeAction { + locator: Locator::ContributionFileSignature( + locked_locators.next_contribution_file_signature.clone(), + ), + object_size: Object::contribution_file_signature_size(false), + })); } Participant::Verifier(_) => { // Initialize the next challenge file. - storage.initialize( - Locator::ContributionFile(locked_locators.next_contribution.clone()), - Object::contribution_file_size(environment, chunk_id, true), - )?; + actions.push(StorageAction::Initialize(InitializeAction { + locator: Locator::ContributionFile(locked_locators.next_contribution.clone()), + object_size: Object::contribution_file_size(environment, chunk_id, true), + })); // Initialize the contribution file signature. - storage.initialize( - Locator::ContributionFileSignature(locked_locators.next_contribution_file_signature.clone()), - Object::contribution_file_signature_size(true), - )?; + actions.push(StorageAction::Initialize(InitializeAction { + locator: Locator::ContributionFileSignature( + locked_locators.next_contribution_file_signature.clone(), + ), + object_size: Object::contribution_file_signature_size(true), + })); } }; debug!("{} locked chunk {}", participant, chunk_id); - Ok(locked_locators) + Ok((locked_locators, actions)) } /// diff --git a/phase1-coordinator/src/storage/disk.rs b/phase1-coordinator/src/storage/disk.rs index fdb3ddd6..7d9804d5 100644 --- a/phase1-coordinator/src/storage/disk.rs +++ b/phase1-coordinator/src/storage/disk.rs @@ -456,6 +456,9 @@ impl Disk { Ok(()) } } + StorageAction::Initialize(initialize_action) => { + self.initialize(initialize_action.locator, initialize_action.object_size) + } } } diff --git a/phase1-coordinator/src/storage/storage.rs b/phase1-coordinator/src/storage/storage.rs index 9990422a..f9904cf4 100644 --- a/phase1-coordinator/src/storage/storage.rs +++ b/phase1-coordinator/src/storage/storage.rs @@ -287,7 +287,7 @@ impl From for LocatorOrPath { } } -/// An action to remove an item from [Storage]. +/// An action to remove an item from [Disk]. #[derive(Clone, PartialEq, Debug)] pub struct RemoveAction { locator_or_path: LocatorOrPath, @@ -301,13 +301,13 @@ impl RemoveAction { } } - /// Obtain the location of the item to be removed from [Storage] + /// Obtain the location of the item to be removed from [Disk] /// as a [LocatorOrPath]. pub fn locator_or_path(&self) -> &LocatorOrPath { &self.locator_or_path } - /// Obtain the location of the item to be removed from [Storage] + /// Obtain the location of the item to be removed from [Disk] /// as a [Locator]. pub fn try_into_locator(self, storage: &Disk) -> Result { self.locator_or_path.try_into_locator(storage) @@ -318,13 +318,19 @@ impl RemoveAction { } } -/// An action to update an item in [Storage]. +/// An action to update an item in [Disk]. pub struct UpdateAction { pub locator: Locator, pub object: Object, } -/// An action taken to mutate [Storage], which can be processed by +/// An action to initialize an item in [Disk]. +pub struct InitializeAction { + pub locator: Locator, + pub object_size: u64, +} + +/// An action taken to mutate [Disk], which can be processed by /// [Storage::process()]. #[non_exhaustive] pub enum StorageAction { @@ -335,6 +341,8 @@ pub enum StorageAction { RemoveIfExists(RemoveAction), /// Update an item in storage. Update(UpdateAction), + /// Initialize an item in storage. + Initialize(InitializeAction), } pub trait StorageLocator {