Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/303 round return storage actions #367

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions phase1-coordinator/src/commands/aggregation.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
environment::Environment,
objects::Round,
storage::{ContributionLocator, Locator, Object, ObjectReader, Storage},
storage::{ContributionLocator, Disk, Locator, Object, ObjectReader, StorageLocator, StorageObject},
CoordinatorError,
};
use phase1::{helpers::CurveKind, Phase1};
Expand All @@ -15,7 +15,7 @@ pub(crate) struct Aggregation;
impl Aggregation {
/// Runs aggregation for a given environment, storage, and round.
#[inline]
pub(crate) fn run(environment: &Environment, storage: &mut impl Storage, round: &Round) -> anyhow::Result<()> {
pub(crate) fn run(environment: &Environment, storage: &mut Disk, round: &Round) -> anyhow::Result<()> {
let start = Instant::now();

// Fetch the round height.
Expand Down Expand Up @@ -95,7 +95,7 @@ impl Aggregation {
#[inline]
fn readers<'a>(
environment: &Environment,
storage: &'a impl Storage,
storage: &'a Disk,
round: &Round,
) -> anyhow::Result<Vec<ObjectReader<'a>>> {
let mut readers = vec![];
Expand Down Expand Up @@ -148,7 +148,7 @@ mod tests {
use crate::{
authentication::Dummy,
commands::{Aggregation, Seed, SigningKey, SEED_LENGTH},
storage::{Locator, Storage},
storage::Locator,
testing::prelude::*,
Coordinator,
};
Expand Down
6 changes: 3 additions & 3 deletions phase1-coordinator/src/commands/computation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
authentication::Signature,
commands::SigningKey,
environment::Environment,
storage::{Locator, Storage},
storage::{Disk, Locator, StorageLocator, StorageObject},
CoordinatorError,
};
use phase1::{helpers::CurveKind, Phase1, Phase1Parameters};
Expand Down Expand Up @@ -30,7 +30,7 @@ impl Computation {
///
pub(crate) fn run(
environment: &Environment,
storage: &mut impl Storage,
storage: &mut Disk,
signature: Arc<dyn Signature>,
contributor_signing_key: &SigningKey,
challenge_locator: &Locator,
Expand Down Expand Up @@ -175,7 +175,7 @@ mod tests {
use crate::{
authentication::{Dummy, Signature},
commands::{Computation, Initialization, Seed, SEED_LENGTH},
storage::{ContributionLocator, ContributionSignatureLocator, Locator, Object, Storage, StorageObject},
storage::{ContributionLocator, ContributionSignatureLocator, Locator, Object, StorageObject},
testing::prelude::*,
};
use setup_utils::calculate_hash;
Expand Down
6 changes: 3 additions & 3 deletions phase1-coordinator/src/commands/initialization.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
environment::Environment,
storage::{ContributionLocator, Locator, Object, Storage},
storage::{ContributionLocator, Disk, Locator, Object, StorageObject},
CoordinatorError,
};
use phase1::{helpers::CurveKind, Phase1, Phase1Parameters};
Expand All @@ -23,7 +23,7 @@ impl Initialization {
#[inline]
pub(crate) fn run(
environment: &Environment,
storage: &mut impl Storage,
storage: &mut Disk,
round_height: u64,
chunk_id: u64,
) -> anyhow::Result<Vec<u8>> {
Expand Down Expand Up @@ -100,7 +100,7 @@ impl Initialization {
/// Compute both contribution hashes and check for equivalence.
#[inline]
fn check_hash(
storage: &impl Storage,
storage: &Disk,
contribution_locator: &Locator,
next_contribution_locator: &Locator,
) -> anyhow::Result<Vec<u8>> {
Expand Down
4 changes: 2 additions & 2 deletions phase1-coordinator/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub(crate) use verification::*;
use crate::{
authentication::Signature,
objects::{ContributionFileSignature, ContributionState},
storage::{Locator, Storage},
storage::{Disk, Locator, StorageLocator, StorageObject},
CoordinatorError,
};

Expand Down Expand Up @@ -47,7 +47,7 @@ pub type SigningKey = String;
#[cfg(any(test, feature = "operator"))]
#[inline]
pub(crate) fn write_contribution_file_signature(
storage: &mut impl Storage,
storage: &mut Disk,
signature: Arc<dyn Signature>,
signing_key: &SigningKey,
challenge_locator: &Locator,
Expand Down
16 changes: 12 additions & 4 deletions phase1-coordinator/src/commands/verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@ use crate::{
authentication::Signature,
commands::SigningKey,
environment::Environment,
storage::{ContributionLocator, ContributionSignatureLocator, Locator, Object, Storage},
storage::{
ContributionLocator,
ContributionSignatureLocator,
Disk,
Locator,
Object,
StorageLocator,
StorageObject,
},
CoordinatorError,
};
use phase1::{helpers::CurveKind, Phase1, Phase1Parameters, PublicKey};
Expand All @@ -23,7 +31,7 @@ impl Verification {
#[inline]
pub(crate) fn run(
environment: &Environment,
storage: &mut impl Storage,
storage: &mut Disk,
signature: Arc<dyn Signature>,
signing_key: &SigningKey,
round_height: u64,
Expand Down Expand Up @@ -148,7 +156,7 @@ impl Verification {
#[inline]
fn verification(
environment: &Environment,
storage: &mut impl Storage,
storage: &mut Disk,
chunk_id: u64,
challenge_locator: Locator,
response_locator: Locator,
Expand Down Expand Up @@ -341,7 +349,7 @@ mod tests {
use crate::{
authentication::Dummy,
commands::{Computation, Seed, Verification, SEED_LENGTH},
storage::{ContributionLocator, ContributionSignatureLocator, Locator, Object, Storage},
storage::{ContributionLocator, ContributionSignatureLocator, Locator, Object},
testing::prelude::*,
Coordinator,
};
Expand Down
81 changes: 29 additions & 52 deletions phase1-coordinator/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ use crate::{
Locator,
LocatorPath,
Object,
Storage,
StorageAction,
StorageLocator,
StorageObject,
},
};
use setup_utils::calculate_hash;
Expand Down Expand Up @@ -330,13 +331,13 @@ impl TimeSource for MockTimeSource {
/// A core structure for operating the Phase 1 ceremony. This struct
/// is designed to be [Send] + [Sync]. The state of the ceremony is
/// stored in a [CoordinatorState] object.
pub struct Coordinator<S: Storage> {
pub struct Coordinator {
/// The parameters and settings of this coordinator.
environment: Environment,
/// The signature scheme for contributors & verifiers with this coordinator.
signature: Arc<dyn Signature>,
/// The storage of contributions and rounds for this coordinator.
storage: S,
storage: Disk,
/// The current round and participant self.
state: CoordinatorState,
/// The source of time, allows mocking system time for testing.
Expand All @@ -345,7 +346,7 @@ pub struct Coordinator<S: Storage> {
aggregation_callback: Arc<dyn Fn(Vec<Participant>) -> () + Send + Sync>,
}

impl Coordinator<Disk> {
impl Coordinator {
///
/// Creates a new instance of the `Coordinator`, for a given environment.
///
Expand Down Expand Up @@ -393,10 +394,7 @@ impl Coordinator<Disk> {
}
}

impl<S> Coordinator<S>
where
S: Storage + 'static,
{
impl Coordinator {
///
/// Runs a set of operations to initialize state and start the coordinator.
///
Expand Down Expand Up @@ -1507,7 +1505,9 @@ where

// Attempt to acquire the chunk lock for participant.
trace!("Preparing to lock chunk {}", chunk_id);
let locked_locators = round.try_lock_chunk(&self.environment, &mut self.storage, chunk_id, &participant)?;
let (locked_locators, actions) =
round.try_lock_chunk(&self.environment, &self.storage, chunk_id, &participant)?;
self.storage.perform_actions(actions)?;
trace!("Participant {} locked chunk {}", participant, chunk_id);

// Add the updated round to storage.
Expand Down Expand Up @@ -2295,12 +2295,11 @@ where
let mut round = Self::load_current_round(&self.storage)?;

// Remove the contributor from the round self.
round.remove_contributor_unsafe(
&mut self.storage,
self.storage.perform_actions(round.remove_contributor_unsafe(
&replace_action.dropped_contributor,
&replace_action.locked_chunks,
&replace_action.tasks,
)?;
)?)?;

// Assign a replacement contributor to the dropped tasks for the current round.
round.add_replacement_contributor_unsafe(replace_action.replacement_contributor.clone())?;
Expand All @@ -2326,18 +2325,14 @@ where
warn!("Removing locked chunks and all impacted contributions");

// Remove the lock from the specified chunks.
round.remove_locks_unsafe(
&mut self.storage,
&remove_action.dropped_verifier,
&remove_action.locked_chunks,
self.storage.perform_actions(
round.remove_locks_unsafe(&remove_action.dropped_verifier, &remove_action.locked_chunks)?,
)?;
warn!("Removed locked chunks");

// Remove the contributions from the specified chunks.
round.remove_chunk_contributions_unsafe(
&mut self.storage,
&remove_action.dropped_verifier,
&remove_action.tasks,
self.storage.perform_actions(
round.remove_chunk_contributions_unsafe(&remove_action.dropped_verifier, &remove_action.tasks)?,
)?;
warn!("Removed impacted contributions");

Expand All @@ -2355,7 +2350,7 @@ where
}

#[inline]
fn load_current_round_height(storage: &S) -> Result<u64, CoordinatorError> {
fn load_current_round_height(storage: &Disk) -> Result<u64, CoordinatorError> {
// Fetch the current round height from storage.
match storage.get(&Locator::RoundHeight)? {
// Case 1 - This is a typical round of the ceremony.
Expand All @@ -2366,7 +2361,7 @@ where
}

#[inline]
fn load_current_round(storage: &S) -> Result<Round, CoordinatorError> {
fn load_current_round(storage: &Disk) -> Result<Round, CoordinatorError> {
// Fetch the current round height from storage.
let current_round_height = Self::load_current_round_height(storage)?;

Expand All @@ -2375,7 +2370,7 @@ where
}

#[inline]
fn load_round(storage: &S, round_height: u64) -> Result<Round, CoordinatorError> {
fn load_round(storage: &Disk, round_height: u64) -> Result<Round, CoordinatorError> {
// Fetch the current round height from storage.
let current_round_height = Self::load_current_round_height(storage)?;

Expand All @@ -2399,7 +2394,7 @@ where
///
#[cfg(test)]
#[inline]
pub(super) fn storage(&self) -> &S {
pub(super) fn storage(&self) -> &Disk {
&self.storage
}

Expand All @@ -2409,7 +2404,7 @@ where
///
#[cfg(test)]
#[inline]
pub(super) fn storage_mut(&mut self) -> &mut S {
pub(super) fn storage_mut(&mut self) -> &mut Disk {
&mut self.storage
}

Expand Down Expand Up @@ -2455,26 +2450,11 @@ where
let mut round = Self::load_round(&mut self.storage, current_round_height)?;

tracing::debug!("Resetting round and applying storage changes");
if let Some(error) = round
.reset(&reset_action.remove_participants)
.into_iter()
.map(|action| match &action {
StorageAction::Remove(a) => match a.clone().try_into_locator(&self.storage) {
Ok(locator) => {
if self.storage.exists(&locator) {
return self.storage.process(action);
} else {
Ok(())
}
}
Err(e) => Err(e),
},
_ => self.storage.process(action),
})
.find_map(Result::err)
{
return Err(error);
}
self.storage
.perform_action(round.reset(&reset_action.remove_participants))?;

// Clear all files
self.storage.clear_round_files(current_round_height)?;

if reset_action.rollback {
if current_round_height == 0 {
Expand All @@ -2501,10 +2481,7 @@ where
use crate::commands::{Computation, Seed, SigningKey, Verification};

#[cfg(any(test, feature = "operator"))]
impl<S> Coordinator<S>
where
S: Storage + 'static,
{
impl Coordinator {
#[tracing::instrument(
skip(self, contributor, contributor_signing_key, contributor_seed),
fields(contributor = %contributor),
Expand Down Expand Up @@ -2807,7 +2784,7 @@ mod tests {
use std::{collections::HashMap, sync::Arc};

fn initialize_to_round_1(
coordinator: &mut Coordinator<Disk>,
coordinator: &mut Coordinator,
contributors: &[Participant],
verifiers: &[Participant],
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -2859,7 +2836,7 @@ mod tests {
Ok(())
}

fn initialize_coordinator(coordinator: &mut Coordinator<Disk>) -> anyhow::Result<()> {
fn initialize_coordinator(coordinator: &mut Coordinator) -> anyhow::Result<()> {
// Load the contributors and verifiers.
let contributors = vec![
Lazy::force(&TEST_CONTRIBUTOR_ID).clone(),
Expand All @@ -2870,7 +2847,7 @@ mod tests {
initialize_to_round_1(coordinator, &contributors, &verifiers)
}

fn initialize_coordinator_single_contributor(coordinator: &mut Coordinator<Disk>) -> anyhow::Result<()> {
fn initialize_coordinator_single_contributor(coordinator: &mut Coordinator) -> anyhow::Result<()> {
// Load the contributors and verifiers.
let contributors = vec![Lazy::force(&TEST_CONTRIBUTOR_ID).clone()];
let verifiers = vec![Lazy::force(&TEST_VERIFIER_ID).clone()];
Expand Down
4 changes: 2 additions & 2 deletions phase1-coordinator/src/coordinator_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
participant::*,
task::{initialize_tasks, Task},
},
storage::{Locator, Object, Storage},
storage::{Disk, Locator, Object},
CoordinatorError,
TimeSource,
};
Expand Down Expand Up @@ -3475,7 +3475,7 @@ impl CoordinatorState {

/// Save the coordinator state in storage.
#[inline]
pub(crate) fn save(&self, storage: &mut impl Storage) -> Result<(), CoordinatorError> {
pub(crate) fn save(&self, storage: &mut Disk) -> Result<(), CoordinatorError> {
storage.update(&Locator::CoordinatorState, Object::CoordinatorState(self.clone()))
}
}
Expand Down
5 changes: 1 addition & 4 deletions phase1-coordinator/src/environment.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use crate::{
objects::Participant,
storage::{Disk, Storage},
};
use crate::{objects::Participant, storage::Disk};
use phase1::{helpers::CurveKind, ContributionMode, ProvingSystem};
use setup_utils::{CheckForCorrectness, UseCompression};

Expand Down
Loading