diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs index cc0a94e71..35d80d6ed 100644 --- a/upstairs/src/downstairs.rs +++ b/upstairs/src/downstairs.rs @@ -1,6 +1,6 @@ // Copyright 2023 Oxide Computer Company use std::{ - collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}, + collections::{BTreeMap, HashMap, HashSet, VecDeque}, net::SocketAddr, sync::Arc, time::Duration, @@ -139,11 +139,6 @@ pub(crate) struct Downstairs { /// Data for an in-progress live repair repair: Option<LiveRepairData>, - /// Jobs that are ready to be acked - /// - /// This must be handled after every event - ackable_work: BTreeSet<JobId>, - /// Region definition (copied from the upstairs) ddef: Option<RegionDefinition>, @@ -155,49 +150,70 @@ pub(crate) struct Downstairs { reqwest_client: reqwest::Client, } +/// Tuple storing a job and an optional result +#[derive(Debug)] +pub(crate) struct PendingJob { + id: JobId, + result: Option<Result<(), CrucibleError>>, +} + +impl PendingJob { + fn new(id: JobId) -> Self { + Self { id, result: None } + } +} + /// State machine for a live-repair operation /// /// We pass through all states (except `FinalFlush`) in order for each extent, /// then pass through the `FinalFlush` state before completion. In each state, -/// we're waiting for a particular job to finish, which is indicated by a -/// `BlockOpWaiter`. -/// -/// Early states carry around reserved IDs (both `JobId` and guest work IDs), as -/// well as a reserved `BlockOpWaiter` for the final flush. -#[derive(Copy, Clone, Debug)] +/// we're waiting for a particular job to finish. We store a `PendingJob` for +/// every pending job, because it may be possible for more than one to finish +/// before checking live-repair state. +#[derive(Debug)] pub(crate) enum LiveRepairState { Closing { - close_id: JobId, - repair_id: JobId, - noop_id: JobId, - reopen_id: JobId, + close_job: PendingJob, + repair_job: PendingJob, + noop_job: PendingJob, + reopen_job: PendingJob, }, Repairing { - repair_id: JobId, - noop_id: JobId, - reopen_id: JobId, + repair_job: PendingJob, + noop_job: PendingJob, + reopen_job: PendingJob, }, Noop { - noop_id: JobId, - reopen_id: JobId, + noop_job: PendingJob, + reopen_job: PendingJob, }, Reopening { - reopen_id: JobId, + reopen_job: PendingJob, }, FinalFlush { - flush_id: JobId, + flush_job: PendingJob, }, } impl LiveRepairState { - /// Returns the job ID that we're waiting on at the moment - fn active_job_id(&self) -> JobId { + fn dummy() -> Self { + LiveRepairState::Noop { + noop_job: PendingJob::new(JobId(0)), + reopen_job: PendingJob::new(JobId(0)), + } + } + + /// Returns the result for the job that we're waiting on at the moment + /// + /// This is a consuming function; the result is removed from the object. + #[must_use] + fn active_job_result(&mut self) -> Option<Result<(), CrucibleError>> { match self { - LiveRepairState::Closing { close_id, .. } => *close_id, - LiveRepairState::Repairing { repair_id, .. } => *repair_id, - LiveRepairState::Noop { noop_id, .. } => *noop_id, - LiveRepairState::Reopening { reopen_id, .. } => *reopen_id, - LiveRepairState::FinalFlush { flush_id } => *flush_id, + LiveRepairState::Closing { close_job: j, .. } + | LiveRepairState::Repairing { repair_job: j, .. } + | LiveRepairState::Noop { noop_job: j, .. } + | LiveRepairState::Reopening { reopen_job: j, .. 
} | LiveRepairState::FinalFlush { flush_job: j } => j.result.take(), } } } @@ -246,6 +262,47 @@ pub(crate) struct LiveRepairData { state: LiveRepairState, } +impl LiveRepairData { + /// If the given job is one that we're waiting for, store its result + fn on_job_complete(&mut self, ds_id: JobId, io: &DownstairsIO) { + // Build an array of `Option<&mut PendingJob>` to check + let jobs = match &mut self.state { + LiveRepairState::Closing { + close_job, + repair_job, + noop_job, + reopen_job, + } => [ + Some(close_job), + Some(repair_job), + Some(noop_job), + Some(reopen_job), + ], + LiveRepairState::Repairing { + repair_job, + noop_job, + reopen_job, + } => [Some(repair_job), Some(noop_job), Some(reopen_job), None], + LiveRepairState::Noop { + noop_job, + reopen_job, + } => [Some(noop_job), Some(reopen_job), None, None], + LiveRepairState::Reopening { reopen_job } => { + [Some(reopen_job), None, None, None] + } + LiveRepairState::FinalFlush { flush_job } => { + [Some(flush_job), None, None, None] + } + }; + for pending in jobs.into_iter().flatten() { + if pending.id == ds_id { + assert!(pending.result.is_none()); + pending.result = Some(io.result()) + } + } + } +} + #[derive(Debug)] pub(crate) struct ReconcileData { /// An ID uniquely identifying this reconciliation @@ -321,7 +378,6 @@ impl Downstairs { reconcile_repair_needed: 0, reconcile_repair_aborted: 0, log: log.new(o!("" => "downstairs".to_string())), - ackable_work: BTreeSet::new(), repair: None, ddef: None, stats, @@ -418,23 +474,6 @@ impl Downstairs { } } - /// Checks whether we have ackable work - pub(crate) fn has_ackable_jobs(&self) -> bool { - !self.ackable_work.is_empty() - } - - /// Send back acks for all jobs that are `AckReady` - pub(crate) fn ack_jobs(&mut self) { - debug!(self.log, "ack_jobs called in Downstairs"); - - let ack_list = std::mem::take(&mut self.ackable_work); - let jobs_checked = ack_list.len(); - for ds_id_done in ack_list.iter() { - self.ack_job(*ds_id_done); - } - debug!(self.log, "ack_ready handled {jobs_checked} jobs"); - } - /// Send the ack for a single job back upstairs through `GuestWork` /// /// Update stats for the upstairs as well @@ -493,6 +532,10 @@ impl Downstairs { } debug!(self.log, "[A] ack job {}", ds_id); + if let Some(r) = &mut self.repair { + r.on_job_complete(ds_id, done); + } + // Copy (if present) read data back to the guest buffer they // provided to us, and notify any waiters. if let Some(res) = done.res.take() { @@ -508,7 +551,6 @@ } else { panic!("job {ds_id} not on gw_active list"); } - self.retire_check(ds_id); } /// Helper function to calculate pruned deps for a given job @@ -961,33 +1003,19 @@ impl Downstairs { /// Checks whether live-repair can continue /// - /// If live-repair can continue, returns the relevant `JobId`, which should - /// be passed into `self.continue_live_repair(..)` - /// - /// This must be called before `Downstairs::ack_jobs`, because it looks for - /// the repair job in `self.ackable_work` to decide if it's done. - fn get_live_repair_job(&mut self) -> Option<JobId> { - if let Some(repair) = &self.repair { - let ds_id = repair.state.active_job_id(); - if self.ackable_work.contains(&ds_id) { - Some(ds_id) - } else { - // The job that live-repair is waiting on isn't yet ackable, and - // it better not have already been acked. 
- let job = self.ds_active.get(&ds_id).unwrap(); - assert!(!job.acked); - None - } - } else { - // No live-repair in progress, nothing to do - None - } + /// If live-repair can continue, returns the relevant job result, which + /// should be passed into `self.continue_live_repair(..)`. + #[must_use] + fn get_live_repair_job(&mut self) -> Option<Result<(), CrucibleError>> { + self.repair + .as_mut() + .and_then(|r| r.state.active_job_result()) } /// Pushes live-repair forward, if possible /// - /// It's possible that handling the current live-repair job will make - /// subsequent live-repair jobs ackable immediately + /// It's possible that handling the current live-repair job will cause + /// subsequent live-repair jobs to be completed immediately /// (`test_repair_extent_fail_noop_out_of_order` exercises this case). As /// such, this function will continue running until the next live-repair job /// is not ready. @@ -995,17 +1023,16 @@ impl Downstairs { &mut self, up_state: &UpstairsState, ) { - while let Some(ds_id) = self.get_live_repair_job() { - self.continue_live_repair(ds_id, up_state); + while let Some(r) = self.get_live_repair_job() { + self.continue_live_repair(r, up_state); } } - fn continue_live_repair(&mut self, ds_id: JobId, up_state: &UpstairsState) { - let done = self.ds_active.get(&ds_id).unwrap(); - assert!(!done.acked); - assert!(self.ackable_work.contains(&ds_id)); - let r = done.result(); - + fn continue_live_repair( + &mut self, + r: Result<(), CrucibleError>, + up_state: &UpstairsState, + ) { let Some(repair) = &self.repair else { panic!("cannot continue live-repair without self.repair"); }; @@ -1049,23 +1076,27 @@ impl Downstairs { // calling such functions; we have to extract everything we want from // the `LiveRepairData` before calling anything on `&mut self`. let repair = self.repair.as_mut().unwrap(); // reborrow - match repair.state { + + // steal `repair.state`, because we're about to replace it + match std::mem::replace(&mut repair.state, LiveRepairState::dummy()) { LiveRepairState::Closing { - close_id, - repair_id, - noop_id, - reopen_id, + close_job, + repair_job, + noop_job, + reopen_job, } => { info!( self.log, "RE:{} Wait for result from repair command {}", repair.active_extent, - repair_id, + repair_job.id, ); + let close_id = close_job.id; + let repair_id = repair_job.id; repair.state = LiveRepairState::Repairing { - repair_id, - noop_id, - reopen_id, + repair_job, + noop_job, + reopen_job, }; if repair.aborting_repair { self.create_and_enqueue_noop_io(vec![close_id], repair_id); @@ -1083,28 +1114,33 @@ impl Downstairs { }; } LiveRepairState::Repairing { - repair_id, - noop_id, - reopen_id, + repair_job, + noop_job, + reopen_job, } => { info!( self.log, "RE:{} Wait for result from NoOp command {}", repair.active_extent, - noop_id, + noop_job.id, ); - repair.state = LiveRepairState::Noop { noop_id, reopen_id }; + let repair_id = repair_job.id; + let noop_id = noop_job.id; + repair.state = LiveRepairState::Noop { + noop_job, + reopen_job, + }; self.create_and_enqueue_noop_io(vec![repair_id], noop_id); } - LiveRepairState::Noop { reopen_id, .. } => { + LiveRepairState::Noop { reopen_job, .. } => { info!( self.log, "RE:{} Wait for result from reopen command {}", repair.active_extent, - reopen_id, + reopen_job.id, ); // The reopen job was already queued, so just change state - repair.state = LiveRepairState::Reopening { reopen_id }; + repair.state = LiveRepairState::Reopening { reopen_job }; } LiveRepairState::Reopening { .. 
} => { // It's possible that we've reached the end of our extents! @@ -1129,7 +1165,9 @@ impl Downstairs { // The borrow was dropped earlier, so reborrow `self.repair` self.repair.as_mut().unwrap().state = - LiveRepairState::FinalFlush { flush_id } + LiveRepairState::FinalFlush { + flush_job: PendingJob::new(flush_id), + } } else { // Keep going! repair.active_extent = next_extent; @@ -1342,10 +1380,10 @@ impl Downstairs { ); let state = LiveRepairState::Closing { - close_id, - repair_id, - reopen_id, - noop_id, + close_job: PendingJob::new(close_id), + repair_job: PendingJob::new(repair_id), + reopen_job: PendingJob::new(reopen_id), + noop_job: PendingJob::new(noop_id), }; if let Some(extent_count) = extent_count { self.repair = Some(LiveRepairData { @@ -2140,7 +2178,7 @@ impl Downstairs { /// /// - enqueue the job in each of [Self::clients] (clients may skip the job) /// - add the job to [Self::ds_active] - /// - Mark the job as ackable if it was skipped by all downstairs + /// - Ack the job immediately if it was skipped by all downstairs /// - Check that the job was already acked if it's a write (the "fast ack" /// optimization, which is performed elsewhere) /// - Send the job to each downstairs client task (if not skipped) @@ -2176,12 +2214,6 @@ impl Downstairs { // Writes are acked immediately, before being enqueued let acked = io.is_write(); - if skipped == 3 { - if !acked { - self.ackable_work.insert(ds_id); - } - warn!(self.log, "job {} skipped on all downstairs", &ds_id); - } // Puts the IO onto the downstairs work queue. self.ds_active.insert( @@ -2196,11 +2228,22 @@ impl Downstairs { io_limits, }, ); + if acked { self.acked_ids.push(ds_id) } else { self.gw_active.insert(ds_id); } + + // Ack the job immediately if it was skipped on all 3x downstairs + // (and wasn't previously acked, i.e. isn't a write) + if skipped == 3 { + if !acked { + self.ack_job(ds_id); + } + self.retire_check(ds_id); + warn!(self.log, "job {} skipped on all downstairs", &ds_id); + } } /// Sends the given job to the given client @@ -2540,8 +2583,8 @@ impl Downstairs { /// Move all `New` and `InProgress` jobs for the given client to `Skipped` /// - /// This may lead to jobs being marked as ackable, since a skipped job - /// counts as complete in some circumstances. + /// This may lead to jobs being acked, since a skipped job counts as + /// complete in some circumstances. pub(crate) fn skip_all_jobs(&mut self, client_id: ClientId) { info!( self.log, @@ -2549,6 +2592,7 @@ impl Downstairs { self.ds_active.len(), ); + let mut ack_jobs = vec![]; let mut retire_check = vec![]; let mut number_jobs_skipped = 0; @@ -2572,12 +2616,16 @@ impl Downstairs { self.log, "[{}] notify = true for {}", client_id, ds_id ); - self.ackable_work.insert(*ds_id); + ack_jobs.push(*ds_id); } } } }); + for ds_id in ack_jobs { + self.ack_job(ds_id); + } + info!( self.log, "[{}] changed {} jobs to fault skipped", @@ -3174,11 +3222,10 @@ impl Downstairs { /// Wrapper for marking a single job as done from the given client /// - /// This can be used to test handling of ackable work, etc. + /// This can be used to test handling of job acks, etc /// - /// Returns true if the given job has gone from not ackable (not present in - /// `self.ackable_work`) to ackable. This is for historical reasons, - /// because it's often used in existing tests. + /// Returns true if the given job has gone from not acked to acked. This is + /// for historical reasons, because it's often used in existing tests. 
#[cfg(test)] pub fn process_ds_completion( &mut self, @@ -3188,7 +3235,7 @@ impl Downstairs { up_state: &UpstairsState, extent_info: Option<ExtentInfo>, ) -> bool { - let was_ackable = self.ackable_work.contains(&ds_id); + let was_acked = self.ds_active.get(&ds_id).unwrap().acked; self.process_io_completion_inner( ds_id, @@ -3197,9 +3244,11 @@ impl Downstairs { up_state, extent_info, ); - let now_ackable = self.ackable_work.contains(&ds_id); - !was_ackable && now_ackable - // TODO should this also ack the job, to mimick our event loop? + // It's possible for the IO completion to have retired this job; in that + // case, it must have been acked before being retired. + let now_acked = + self.ds_active.get(&ds_id).map(|j| j.acked).unwrap_or(true); + !was_acked && now_acked } fn process_io_completion_inner( @@ -3235,38 +3284,22 @@ impl Downstairs { return; }; - if self.clients[client_id].process_io_completion( + let was_acked = job.acked; + let should_ack = self.clients[client_id].process_io_completion( ds_id, job, responses, deactivate, extent_info, - ) { - self.ackable_work.insert(ds_id); - } + ); - /* - * If all 3 jobs are done, we can check here to see if we can - * remove this job from the DS list. If we have completed the ack - * to the guest, then there will be no more work on this job - * but messages may still be unprocessed. - */ - if job.acked { - self.retire_check(ds_id); - } else { - // If we reach this then the job probably has errors and - // hasn't acked back yet. We check for NotAcked so we don't - // double count three done and return true if we already have - // AckReady set. + // If all 3 jobs are done, we can check here to see if we can remove + // this job from the DS list. + let wc = job.state_count(); + let complete = (wc.error + wc.skipped + wc.done) == 3; - // If we are a write or a flush with one success, then - // we must switch our state to failed. This condition is - // handled when we check the job result. - let wc = job.state_count(); - if (wc.error + wc.skipped + wc.done) == 3 { - self.ackable_work.insert(ds_id); - debug!(self.log, "[{}] Set AckReady {}", client_id, ds_id); - } + if !was_acked && (should_ack || complete) { + self.ack_job(ds_id); } // Decide what to do when we have an error from this IO. @@ -3329,6 +3362,10 @@ impl Downstairs { // Nothing to do here, no error! } } + + if complete { + self.retire_check(ds_id); + } } /// Accessor for [`Downstairs::reconcile_repaired`] @@ -3370,31 +3407,6 @@ impl Downstairs { .unwrap_or(0) } - /// Marks a single job as acked - /// - /// The job is removed from `self.ackable_work` and `acked` is set to `true` - /// in the job's state. - /// - /// This is only useful in tests; in real code, we'd also want to reply to - /// the guest when acking a job. - #[cfg(test)] - fn ack(&mut self, ds_id: JobId) { - /* - * Move AckReady to Acked. 
- */ - let Some(job) = self.ds_active.get_mut(&ds_id) else { - panic!("reqid {} is not active", ds_id); - }; - if !self.ackable_work.remove(&ds_id) { - panic!("Job {ds_id} is not ackable"); - } - - if job.acked { - panic!("Job {ds_id} already acked!"); - } - job.acked = true; - } - /// Returns all jobs in sorted order by [`JobId`] /// /// This function is used in unit tests for the Upstairs @@ -3413,11 +3425,6 @@ impl Downstairs { self.ds_active.get_extents_for(ds_id) } - #[cfg(test)] - pub fn ackable_work(&self) -> &BTreeSet { - &self.ackable_work - } - #[cfg(test)] pub fn completed(&self) -> &AllocRingBuffer { &self.retired_ids @@ -4291,14 +4298,15 @@ struct DownstairsBackpressureConfig { #[cfg(test)] pub(crate) mod test { - use super::Downstairs; + use super::{Downstairs, PendingJob}; use crate::{ downstairs::{LiveRepairData, LiveRepairState, ReconcileData}, live_repair::ExtentInfo, upstairs::UpstairsState, - ClientId, CrucibleError, DsState, ExtentFix, ExtentRepairIDs, IOState, - IOop, ImpactedAddr, ImpactedBlocks, JobId, RawReadResponse, - ReconcileIO, ReconcileIOState, ReconciliationId, SnapshotDetails, + BlockOpWaiter, ClientId, CrucibleError, DsState, ExtentFix, + ExtentRepairIDs, IOState, IOop, ImpactedAddr, ImpactedBlocks, JobId, + RawReadResponse, ReconcileIO, ReconcileIOState, ReconciliationId, + SnapshotDetails, }; use bytes::BytesMut; @@ -4338,10 +4346,6 @@ pub(crate) mod test { None, ); } - // Writes are fast-acked when first submitted - if !ds.ds_active.get(&ds_id).unwrap().work.is_write() { - ds.ack(ds_id); - } } fn set_all_reconcile(ds: &mut Downstairs) { @@ -4375,7 +4379,6 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); assert!(ds.process_ds_completion( @@ -4385,12 +4388,9 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert_eq!(ds.ackable_work.len(), 1); + assert!(ds.ds_active.get(&next_id).unwrap().acked); assert!(ds.retired_ids.is_empty()); - assert!(!ds.ds_active.get(&next_id).unwrap().acked); - ds.ack(next_id); - assert!(!ds.process_ds_completion( next_id, ClientId::new(2), @@ -4398,7 +4398,6 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert!(ds.ackable_work.is_empty()); assert_eq!(ds.retired_ids.len(), 1); } @@ -4423,8 +4422,6 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); assert!(!ds.process_ds_completion( @@ -4434,8 +4431,6 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); assert!(ds.process_ds_completion( @@ -4446,11 +4441,6 @@ pub(crate) mod test { None, )); - assert_eq!(ds.ackable_work.len(), 1); - - ds.ack(next_id); - ds.retire_check(next_id); - assert_eq!(ds.retired_ids.len(), 1); } @@ -4468,7 +4458,6 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); assert!(!ds.process_ds_completion( @@ -4478,7 +4467,6 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); assert!(ds.process_ds_completion( @@ -4488,10 +4476,6 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert_eq!(ds.ackable_work.len(), 1); - - ds.ack(next_id); - ds.retire_check(next_id); assert_eq!(ds.retired_ids.len(), 1); // No skipped jobs here. 
@@ -4514,7 +4498,6 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); assert!(!ds.process_ds_completion( @@ -4524,7 +4507,6 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); assert!(ds.process_ds_completion( @@ -4534,10 +4516,6 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert_eq!(ds.ackable_work.len(), 1); - - ds.ack(next_id); - ds.retire_check(next_id); assert_eq!(ds.retired_ids.len(), 1); } @@ -4558,11 +4536,8 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert_eq!(ds.ackable_work.len(), 1); assert!(ds.retired_ids.is_empty()); - ds.ack(next_id); - let response = Ok(build_read_response(&[])); assert!(!ds.process_ds_completion( @@ -4572,7 +4547,6 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); let response = Ok(build_read_response(&[])); @@ -4584,7 +4558,7 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert!(ds.ackable_work.is_empty()); + // A flush is required to move work to completed assert!(ds.retired_ids.is_empty()); } @@ -4603,7 +4577,6 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); let response = Ok(build_read_response(&[])); @@ -4615,11 +4588,8 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert_eq!(ds.ackable_work.len(), 1); assert!(ds.retired_ids.is_empty()); - ds.ack(next_id); - let response = Ok(build_read_response(&[])); assert!(!ds.process_ds_completion( @@ -4629,7 +4599,7 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert!(ds.ackable_work.is_empty()); + // A flush is required to move work to completed // That this is still zero is part of the test assert!(ds.retired_ids.is_empty()); @@ -4649,7 +4619,6 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); assert!(!ds.process_ds_completion( @@ -4659,7 +4628,6 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); let response = Ok(build_read_response(&[])); @@ -4671,13 +4639,8 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert_eq!(ds.ackable_work.len(), 1); - - ds.ack(next_id); - ds.retire_check(next_id); // A flush is required to move work to completed - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); } @@ -4695,7 +4658,6 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); assert!(!ds.process_ds_completion( @@ -4705,7 +4667,6 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); assert!(ds.process_ds_completion( @@ -4715,12 +4676,7 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert_eq!(ds.ackable_work.len(), 1); - - ds.ack(next_id); - ds.retire_check(next_id); - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); } @@ -4734,6 +4690,7 @@ pub(crate) mod test { let response = || Ok(build_read_response(&[])); + // ack on first reply assert!(ds.process_ds_completion( next_id, ClientId::new(2), @@ -4750,10 +4707,6 @@ pub(crate) mod test { None, )); - // emulated run in up_ds_listen - ds.ack(next_id); - 
ds.retire_check(next_id); - assert!(!ds.process_ds_completion( next_id, ClientId::new(1), @@ -4762,7 +4715,6 @@ pub(crate) mod test { None, )); - assert!(ds.ackable_work.is_empty()); // Work won't be completed until we get a flush. assert!(ds.retired_ids.is_empty()); } @@ -4786,6 +4738,7 @@ pub(crate) mod test { ds.force_active(); // send a write, and clients 0 and 1 will return errors + // job is fast-acked, so it starts as acked let next_id = ds.create_and_enqueue_generic_write_eob(is_write_unwritten); @@ -4810,25 +4763,14 @@ pub(crate) mod test { assert!(ds.ds_active.get(&next_id).unwrap().data.is_none()); - // Jobs should be acked - assert!(ds.ackable_work.is_empty()); - assert!(ds.ds_active.get(&next_id).unwrap().acked); - let response = Ok(Default::default()); - let res = ds.process_ds_completion( + assert!(!ds.process_ds_completion( next_id, ClientId::new(2), response, &UpstairsState::Active, None, - ); - - // The IO should not have been marked as ackable, because it was - // fast-acked - assert!(!res); - - // Both write and write_unwritten should be fast-acked - assert!(ds.ackable_work.is_empty()); + )); assert!(ds.clients[ClientId::new(0)].stats.downstairs_errors > 0); assert!(ds.clients[ClientId::new(1)].stats.downstairs_errors > 0); @@ -4860,9 +4802,8 @@ pub(crate) mod test { // Simulate completing both writes to downstairs 0 and 1 // - // write_unwritten jobs become ackable upon the second completion; - // normal writes were ackable from the start (and hence - // process_ds_completion always returns `false`) + // all writes are acked immediately upon submission, so + // process_ds_completion always returns `false`. assert!(!ds.process_ds_completion( id1, ClientId::new(0), @@ -4897,7 +4838,6 @@ pub(crate) mod test { assert!(ds.ds_active.get(&id2).unwrap().acked); // Work stays on active queue till the flush - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); // Create the flush and send it to the downstairs @@ -4919,20 +4859,12 @@ pub(crate) mod test { None, )); - // Ack the flush back to the guest - ds.ack(flush_id); - // Make sure downstairs 0 and 1 update their last flush id and // that downstairs 2 does not. assert_eq!(ds.clients[ClientId::new(0)].last_flush, flush_id); assert_eq!(ds.clients[ClientId::new(1)].last_flush, flush_id); assert_eq!(ds.clients[ClientId::new(2)].last_flush, JobId(0)); - // Should not retire yet. - ds.retire_check(flush_id); - - assert!(ds.ackable_work.is_empty()); - // Make sure all work is still on the active side assert!(ds.retired_ids.is_empty()); @@ -5009,6 +4941,7 @@ pub(crate) mod test { None )); + // Confirm that we've stored this block let responses = ds.ds_active.get(&next_id).unwrap().data.as_ref(); assert!(responses.is_some()); assert_eq!( @@ -5084,7 +5017,7 @@ pub(crate) mod test { )); // One completion of a read means we can ACK - assert_eq!(ds.ackable_work.len(), 1); + assert!(ds.ds_active.get(&next_id).unwrap().acked); // Complete downstairs 1 and 2 let response = Ok(build_read_response(&[])); @@ -5105,19 +5038,13 @@ pub(crate) mod test { None )); - // Make sure the job is still active - assert!(ds.retired_ids.is_empty()); - - // Ack the job to the guest (this checks whether it's ack ready) - ds.ack(next_id); - - // Nothing left to ACK, but until the flush we keep the IO data. - assert!(ds.ackable_work.is_empty()); + // Make sure the job is not yet retired assert!(ds.retired_ids.is_empty()); // A flush is required to move work to completed // Create the flush then send it to all downstairs. 
- let next_id = ds.submit_flush(None, None, None); + let (mut rx, tx) = BlockOpWaiter::pair(); + let next_id = ds.submit_flush(None, Some(tx), None); // Complete the Flush at each downstairs. assert!(!ds.process_ds_completion( @@ -5143,12 +5070,9 @@ pub(crate) mod test { None )); - // ACK the flush and let retire_check move things along. - ds.ack(next_id); - ds.retire_check(next_id); + // Verify the flush has been acked + assert!(rx.try_wait().unwrap().is_ok()); - // Verify no more work to ack. - assert!(ds.ackable_work.is_empty()); // The read and the flush should now be moved to completed. assert_eq!(ds.retired_ids.len(), 2); } @@ -5161,12 +5085,12 @@ pub(crate) mod test { ds.force_active(); // Build our read, put it into the work queue - let next_id = ds.create_and_enqueue_generic_read_eob(); + let read_id = ds.create_and_enqueue_generic_read_eob(); // Downstairs 0 now has completed this work. let response = Ok(build_read_response(&[])); assert!(ds.process_ds_completion( - next_id, + read_id, ClientId::new(0), response, &UpstairsState::Active, @@ -5174,34 +5098,27 @@ pub(crate) mod test { )); // One completion of a read means we can ACK - assert_eq!(ds.ackable_work.len(), 1); - - // But, don't send the ack just yet. - // The job should be ack ready - assert!(ds.ackable_work.contains(&next_id)); - assert!(!ds.ds_active.get(&next_id).unwrap().acked); + assert!(ds.ds_active.get(&read_id).unwrap().acked); // A flush is required to move work to completed // Create the flush then send it to all downstairs. - let next_id = ds.submit_flush(None, None, None); + let (mut rx, tx) = BlockOpWaiter::pair(); + let flush_id = ds.submit_flush(None, Some(tx), None); // Send and complete the Flush at each downstairs. for cid in ClientId::iter() { ds.process_ds_completion( - next_id, + flush_id, cid, Ok(Default::default()), &UpstairsState::Active, None, ); } + assert!(rx.try_wait().unwrap().is_ok()); - // ACK the flush and let retire_check move things along. - ds.ack(next_id); - ds.retire_check(next_id); - - // Verify the read is still ack ready. - assert_eq!(ds.ackable_work.len(), 1); + // Verify the read is still acked + assert!(ds.ds_active.get(&read_id).unwrap().acked); // The the flush should now be moved to completed. assert_eq!(ds.retired_ids.len(), 1); // The read should still be on the queue. @@ -5256,13 +5173,13 @@ pub(crate) mod test { assert!(ds.ds_active.get(&next_id).unwrap().acked); // Work stays on active queue till the flush - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); // Create the flush IO - let next_id = ds.submit_flush(None, None, None); + let (mut rx, tx) = BlockOpWaiter::pair(); + let next_id = ds.submit_flush(None, Some(tx), None); - // Complete the flush on all three downstairs. + // Complete the flush on all three downstairs, acking on the 2nd assert!(!ds.process_ds_completion( next_id, ClientId::new(0), @@ -5285,10 +5202,7 @@ pub(crate) mod test { None )); - ds.ack(next_id); - ds.retire_check(next_id); - - assert!(ds.ackable_work.is_empty()); + assert!(rx.try_wait().unwrap().is_ok()); // The write and flush should now be completed. assert_eq!(ds.retired_ids.len(), 2); } @@ -5351,7 +5265,6 @@ pub(crate) mod test { assert!(ds.ds_active.get(&id2).unwrap().acked); // Work stays on active queue till the flush. - assert!(ds.ackable_work.is_empty()); assert!(ds.retired_ids.is_empty()); // Create and send the flush. 
@@ -5373,13 +5286,6 @@ pub(crate) mod test { None )); - // Ack the flush - ds.ack(flush_id); - - // Should not retire yet - ds.retire_check(flush_id); - - assert!(ds.ackable_work.is_empty()); // Not done yet, until all clients do the work. assert!(ds.retired_ids.is_empty()); @@ -5443,12 +5349,7 @@ pub(crate) mod test { )); // One completion should allow for an ACK - assert_eq!(ds.ackable_work.len(), 1); - assert!(ds.ackable_work.contains(&next_id)); - assert!(!ds.ds_active.get(&next_id).unwrap().acked); - - // The Downstairs will ack jobs immediately, during the event handler - ds.ack(next_id); + assert!(ds.ds_active.get(&next_id).unwrap().acked); // Be sure the job is not yet in replay assert!(!ds.ds_active.get(&next_id).unwrap().replay); @@ -5457,7 +5358,6 @@ pub(crate) mod test { assert!(ds.ds_active.get(&next_id).unwrap().replay); // The IO is still acked - assert!(!ds.ackable_work.contains(&next_id)); assert!(ds.ds_active.get(&next_id).unwrap().acked); } @@ -5480,8 +5380,7 @@ pub(crate) mod test { &UpstairsState::Active, None )); - // We always ack jobs right away - ds.ack(next_id); + assert!(ds.ds_active.get(&next_id).unwrap().acked); // Complete the read on a 2nd downstairs. let response = Ok(build_read_response(&[1, 2, 3, 4])); @@ -5514,7 +5413,6 @@ pub(crate) mod test { &UpstairsState::Active, None )); - assert!(ds.ackable_work.is_empty()); assert!(ds.ds_active.get(&next_id).unwrap().acked); } @@ -5537,14 +5435,11 @@ pub(crate) mod test { &UpstairsState::Active, None )); - // Immediately ack the job (which checks that it's ready) - ds.ack(next_id); + assert!(ds.ds_active.get(&next_id).unwrap().acked); - // Should not retire yet - ds.retire_check(next_id); + // The read has been acked + assert!(ds.ds_active.get(&next_id).unwrap().acked); - // No new ackable work. - assert!(ds.ackable_work.is_empty()); // Verify the IO has not completed yet. assert!(ds.retired_ids.is_empty()); @@ -5564,7 +5459,6 @@ pub(crate) mod test { &UpstairsState::Active, None )); - assert!(ds.ackable_work.is_empty()); assert!(ds.ds_active.get(&next_id).unwrap().acked); } @@ -5610,11 +5504,9 @@ pub(crate) mod test { &UpstairsState::Active, None )); + assert!(ds.ds_active.get(&next_id).unwrap().acked); - // Ack the read to the guest. - ds.ack(next_id); - - // Before re replay_jobs, the IO is not replay + // Before we replay_jobs, the IO is not replay assert!(!ds.ds_active.get(&next_id).unwrap().replay); // Now, take that downstairs offline ds.replay_jobs(ClientId::new(0)); @@ -5682,6 +5574,7 @@ pub(crate) mod test { &UpstairsState::Active, None )); + assert!(ds.ds_active.get(&next_id).unwrap().acked); // Construct our fake response for another downstairs. let response = Ok(build_read_response( @@ -5697,9 +5590,6 @@ pub(crate) mod test { None )); - // Ack the read to the guest. 
- ds.ack(next_id); - // Now, take the second downstairs offline ds.replay_jobs(ClientId::new(1)); // Now the IO should be replay @@ -5778,7 +5668,7 @@ pub(crate) mod test { // The job should remain acked assert!(ds.ds_active.get(&id1).unwrap().acked); - // Re-submit and complete the write + // Re-submit and complete the write, which should not ack again assert_eq!(ds.job_state(id1, 1), IOState::InProgress); assert!(!ds.process_ds_completion( id1, @@ -5787,10 +5677,6 @@ pub(crate) mod test { &UpstairsState::Active, None )); - - // State should remain acked and not ackable - assert!(ds.ds_active.get(&id1).unwrap().acked); - assert!(ds.ackable_work.is_empty()) } #[test] @@ -5831,9 +5717,6 @@ pub(crate) mod test { // Check that it should have been fast-acked assert!(ds.ds_active.get(&id1).unwrap().acked); - // Verify no more ackable work - assert!(ds.ackable_work.is_empty()); - // Now, take that downstairs offline ds.replay_jobs(ClientId::new(0)); @@ -6460,10 +6343,8 @@ pub(crate) mod test { None, ); - let ack_list = ds.ackable_work().clone(); - assert!(ack_list.is_empty()); let done = ds.ds_active.get(&next_id).unwrap(); - assert!(done.acked); + assert!(done.acked); // fast-acked assert!(done.result().is_ok()); } @@ -6508,7 +6389,6 @@ pub(crate) mod test { let done = ds.ds_active.get(&next_id).unwrap(); assert!(done.acked); - assert!(ds.ackable_work().is_empty()); assert!(done.result().is_err()); } @@ -6562,7 +6442,6 @@ pub(crate) mod test { let done = ds.ds_active.get(&next_id).unwrap(); assert!(done.acked); - assert!(ds.ackable_work().is_empty()); assert!(done.result().is_err()); } @@ -6577,37 +6456,27 @@ pub(crate) mod test { // Create a flush, enqueue it on both the downstairs // and the guest work queues. - let next_id = ds.submit_flush(None, None, None); + let (mut rx, tx) = BlockOpWaiter::pair(); + let next_id = ds.submit_flush(None, Some(tx), None); let response = || Ok(Default::default()); - ds.process_ds_completion( + assert!(!ds.process_ds_completion( next_id, ClientId::new(0), response(), &UpstairsState::Active, None, - ); + )); - ds.process_ds_completion( + assert!(ds.process_ds_completion( next_id, ClientId::new(2), response(), &UpstairsState::Active, None, - ); - - let ack_list = ds.ackable_work().clone(); - assert_eq!(ack_list.len(), 1); + )); // acked! - // Simulation of what happens in up_ds_listen - for ds_id_done in ack_list.iter() { - assert_eq!(*ds_id_done, next_id); - - ds.ack(*ds_id_done); - - let done = ds.ds_active.get(ds_id_done).unwrap(); - assert!(done.result().is_ok()); - } + assert!(rx.try_wait().unwrap().is_ok()); } #[test] @@ -6624,6 +6493,8 @@ pub(crate) mod test { // Create a flush, enqueue it on both the downstairs // and the guest work queues. let next_id = ds.submit_flush(None, None, None); + + // It should be acked when the first completion returns assert!(ds.process_ds_completion( next_id, ClientId::new(0), @@ -6631,19 +6502,6 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - - let ack_list = ds.ackable_work().clone(); - assert_eq!(ack_list.len(), 1); - - // Simulation of what happens in up_ds_listen - for ds_id_done in ack_list.iter() { - assert_eq!(*ds_id_done, next_id); - - ds.ack(*ds_id_done); - - let done = ds.ds_active.get(ds_id_done).unwrap(); - assert!(done.result().is_err()); - } } #[test] @@ -6657,7 +6515,8 @@ pub(crate) mod test { // Create a flush, enqueue it on both the downstairs // and the guest work queues. 
- let next_id = ds.submit_flush(None, None, None); + let (mut rx, tx) = BlockOpWaiter::pair(); + let next_id = ds.submit_flush(None, Some(tx), None); // DS 1 has a failure, and this won't return true as we don't // have enough success yet to ACK to the guest. @@ -6679,18 +6538,7 @@ pub(crate) mod test { None )); - let ack_list = ds.ackable_work().clone(); - assert_eq!(ack_list.len(), 1); - - // Simulation of what happens in up_ds_listen - for ds_id_done in ack_list.iter() { - assert_eq!(*ds_id_done, next_id); - - ds.ack(*ds_id_done); - - let done = ds.ds_active.get(ds_id_done).unwrap(); - assert!(done.result().is_err()); - } + assert!(rx.try_wait().unwrap().is_err()); } #[test] @@ -6747,7 +6595,6 @@ pub(crate) mod test { assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::Faulted); // Verify that this work should have been fast-acked - assert_eq!(ds.ackable_work().len(), 0); assert!(ds.ds_active.get(&next_id).unwrap().acked); } @@ -6788,7 +6635,7 @@ pub(crate) mod test { None )); - // the operation was previously marked as ackable, because it's a write + // the operation was already fast-acked, because it's a write assert!(!ds.process_ds_completion( next_id, ClientId::new(2), @@ -6821,7 +6668,7 @@ pub(crate) mod test { let response = || Ok(build_read_response(&[])); - // Process the operation for client 1 this should return true + // Process the operation for client 1; we send the ack here assert!(ds.process_ds_completion( next_id, ClientId::new(1), @@ -6830,7 +6677,7 @@ pub(crate) mod test { None, )); - // Process the operation for client 2 this should return false + // Process the operation for client 2, which does not ack assert!(!ds.process_ds_completion( next_id, ClientId::new(2), @@ -6839,10 +6686,6 @@ pub(crate) mod test { None )); - // Verify we can ack this work, then ack it. - assert_eq!(ds.ackable_work().len(), 1); - ds.ack(next_id); - // Perform the flush. let next_id = { let next_id = ds.submit_flush(None, None, None); @@ -6866,7 +6709,7 @@ pub(crate) mod test { None, )); - // process_ds_operation should return true after we process this. + // process_ds_operation should ack after we process this. assert!(ds.process_ds_completion( next_id, ClientId::new(2), @@ -6875,13 +6718,6 @@ pub(crate) mod test { None )); - // ACK the flush and let retire_check move things along. - assert_eq!(ds.ackable_work().len(), 1); - ds.ack(next_id); - ds.retire_check(next_id); - - assert_eq!(ds.ackable_work().len(), 0); - // The write, the read, and now the flush should be completed. assert_eq!(ds.completed().len(), 3); @@ -6965,7 +6801,7 @@ pub(crate) mod test { let response = Ok(build_read_response(&[])); - // Process the operation for client 2, which makes the job ackable + // Process the operation for client 2, which acks the job assert!(ds.process_ds_completion( next_id, ClientId::new(2), @@ -7009,8 +6845,8 @@ pub(crate) mod test { None )); - // process_ds_operation for client 2; the job was already ackable - // (because it's a write) so this returns false + // process_ds_operation for client 2; the job was already acked (because + // it's a write) so this returns false assert!(!ds.process_ds_completion( next_id, ClientId::new(2), @@ -7031,7 +6867,6 @@ pub(crate) mod test { // Verify this work should have been fast-acked assert!(ds.ds_active.get(&next_id).unwrap().acked); - assert_eq!(ds.ackable_work().len(), 0); // Now, do another write. 
let next_id = ds.create_and_enqueue_generic_write_eob(false); @@ -7051,7 +6886,7 @@ pub(crate) mod test { )); // We don't process client 1, it had failed - // again, the job was ackable immediately + // again, the job was acked immediately because it's a write assert!(!ds.process_ds_completion( next_id, ClientId::new(2), @@ -7061,7 +6896,6 @@ pub(crate) mod test { )); // Verify we should have fast-acked this work too assert!(ds.ds_active.get(&next_id).unwrap().acked); - assert_eq!(ds.ackable_work().len(), 0); // One downstairs should have a skipped job on its list. assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 0); @@ -7089,7 +6923,7 @@ pub(crate) mod test { None )); - // process_ds_operation should return true after we process client 2. + // process_ds_operation should ack after we process client 2. assert!(ds.process_ds_completion( flush_id, ClientId::new(2), @@ -7098,14 +6932,6 @@ pub(crate) mod test { None )); - // ACK all the jobs and let retire_check move things along. - assert_eq!(ds.ackable_work().len(), 1); - // first two writes should have been fast-acked - ds.ack(flush_id); - ds.retire_check(flush_id); - - assert_eq!(ds.ackable_work().len(), 0); - // The two writes and the flush should be completed. assert_eq!(ds.completed().len(), 3); @@ -7161,7 +6987,7 @@ pub(crate) mod test { None )); - // Process the operation for client 2. The job was already ackable, so + // Process the operation for client 2. The job was already acked, so // this returns `false` assert!(!ds.process_ds_completion( write_id, @@ -7176,9 +7002,6 @@ pub(crate) mod test { assert_eq!(ds.clients[ClientId::new(1)].state(), DsState::Faulted); assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::Active); - // This job was immediately acked - assert_eq!(ds.ackable_work().len(), 0); - // Verify the read switched from InProgress to Skipped let job = ds.ds_active.get(&read_id).unwrap(); @@ -7245,7 +7068,7 @@ pub(crate) mod test { assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::Active); // The write was fast-acked, and the read is still going - assert!(ds.ackable_work().is_empty()); + assert!(ds.ds_active.get(&write_id).unwrap().acked); // Verify the read switched from new to skipped @@ -7303,8 +7126,9 @@ pub(crate) mod test { ); } - // The write has been fast-acked; the read is ackable - assert_eq!(ds.ackable_work().len(), 1); + // The write has been fast-acked; the read has been acked + assert!(ds.ds_active.get(&write_one).unwrap().acked); + assert!(ds.ds_active.get(&read_one).unwrap().acked); // Verify all IOs are done @@ -7324,39 +7148,35 @@ pub(crate) mod test { let err_response = Err(CrucibleError::GenericError("bad".to_string())); - // Process the operation for client 0, 1 - ds.process_ds_completion( + // Process the operation for client 0, 1, 2. 
It should already be acked + assert!(!ds.process_ds_completion( write_fail, ClientId::new(0), Ok(Default::default()), &UpstairsState::Active, None, - ); - ds.process_ds_completion( + )); + assert!(!ds.process_ds_completion( write_fail, ClientId::new(1), Ok(Default::default()), &UpstairsState::Active, None, - ); - ds.process_ds_completion( + )); + assert!(!ds.process_ds_completion( write_fail, ClientId::new(2), err_response, &UpstairsState::Active, None, - ); + )); // Verify client states assert_eq!(ds.clients[ClientId::new(0)].state(), DsState::Active); assert_eq!(ds.clients[ClientId::new(1)].state(), DsState::Active); assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::Faulted); - // Only the first read remains ackable - assert_eq!(ds.ackable_work().len(), 1); - // Verify all IOs are done - for cid in ClientId::iter() { let job = ds.ds_active.get(&read_one).unwrap(); assert_eq!(job.state[cid], IOState::Done); @@ -7413,8 +7233,9 @@ pub(crate) mod test { ); } - // Verify the read can be acked (the write was fast-acked) - assert_eq!(ds.ackable_work().len(), 1); + // Verify the read has been acked (the write was fast-acked) + assert!(ds.ds_active.get(&read_one).unwrap().acked); + assert!(ds.ds_active.get(&write_one).unwrap().acked); // Verify all IOs are done @@ -7436,39 +7257,35 @@ pub(crate) mod test { let read_two = ds.create_and_enqueue_generic_read_eob(); // Process the write operation for downstairs 0, 1 - ds.process_ds_completion( + assert!(!ds.process_ds_completion( write_fail, ClientId::new(0), Ok(Default::default()), &UpstairsState::Active, None, - ); - ds.process_ds_completion( + )); + assert!(!ds.process_ds_completion( write_fail, ClientId::new(1), Ok(Default::default()), &UpstairsState::Active, None, - ); + )); // Have downstairs 2 return error. 
- ds.process_ds_completion( + assert!(!ds.process_ds_completion( write_fail, ClientId::new(2), err_response, &UpstairsState::Active, None, - ); + )); // Verify client states assert_eq!(ds.clients[ClientId::new(0)].state(), DsState::Active); assert_eq!(ds.clients[ClientId::new(1)].state(), DsState::Active); assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::Faulted); - // The first read remains the only ackable work - assert_eq!(ds.ackable_work().len(), 1); - // Verify all IOs are done - for cid in ClientId::iter() { // First read, still Done let job = ds.ds_active.get(&read_one).unwrap(); @@ -7537,87 +7354,75 @@ pub(crate) mod test { assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 0); // Do the write - ds.process_ds_completion( + assert!(!ds.process_ds_completion( write_one, ClientId::new(1), Ok(Default::default()), &UpstairsState::Active, None, - ); - ds.process_ds_completion( + )); + assert!(!ds.process_ds_completion( write_one, ClientId::new(2), Ok(Default::default()), &UpstairsState::Active, None, - ); + )); // Make the read ok response, do the read let rr = || Ok(build_read_response(&[])); - ds.process_ds_completion( + assert!(ds.process_ds_completion( read_one, ClientId::new(1), rr(), &UpstairsState::Active, None, - ); - ds.process_ds_completion( + )); + assert!(!ds.process_ds_completion( read_one, ClientId::new(2), rr(), &UpstairsState::Active, None, - ); + )); + + let job = ds.ds_active.get(&read_one).unwrap(); + assert_eq!(job.state[ClientId::new(1)], IOState::Done); + assert_eq!(job.state[ClientId::new(2)], IOState::Done); + let job = ds.ds_active.get(&write_one).unwrap(); + assert_eq!(job.state[ClientId::new(1)], IOState::Done); + assert_eq!(job.state[ClientId::new(2)], IOState::Done); // Do the flush - ds.process_ds_completion( + assert!(!ds.process_ds_completion( flush_one, ClientId::new(1), Ok(Default::default()), &UpstairsState::Active, None, - ); - ds.process_ds_completion( + )); + let job = ds.ds_active.get(&flush_one).unwrap(); + assert_eq!(job.state[ClientId::new(0)], IOState::Skipped); + assert_eq!(job.state[ClientId::new(1)], IOState::Done); + assert_eq!(job.state[ClientId::new(2)], IOState::InProgress); + assert!(ds.process_ds_completion( flush_one, ClientId::new(2), Ok(Default::default()), &UpstairsState::Active, None, - ); - - // Verify three jobs can be acked (or should have been fast-acked) - assert!(ds.ds_active.get(&write_one).unwrap().acked); - assert_eq!(ds.ackable_work().len(), 2); - - // Verify all IOs are done - - let job = ds.ds_active.get(&read_one).unwrap(); - assert_eq!(job.state[ClientId::new(1)], IOState::Done); - assert_eq!(job.state[ClientId::new(2)], IOState::Done); - let job = ds.ds_active.get(&write_one).unwrap(); - assert_eq!(job.state[ClientId::new(1)], IOState::Done); - assert_eq!(job.state[ClientId::new(2)], IOState::Done); - let job = ds.ds_active.get(&flush_one).unwrap(); - assert_eq!(job.state[ClientId::new(1)], IOState::Done); - assert_eq!(job.state[ClientId::new(2)], IOState::Done); - - ds.ack(read_one); - // write has already been fast-acked - ds.ack(flush_one); - ds.retire_check(flush_one); + )); + // The flush retires itself and preceding jobs // Skipped jobs just has the flush assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 1); assert!(ds.clients[ClientId::new(0)] .skipped_jobs .contains(&JobId(1002))); - assert_eq!(ds.ackable_work().len(), 0); // The writes, the read, and the flush should be completed. 
assert_eq!(ds.completed().len(), 3); - // No more ackable work - assert_eq!(ds.ackable_work().len(), 0); // No more jobs on the queue assert_eq!(ds.active_count(), 0); } @@ -7640,7 +7445,8 @@ pub(crate) mod test { let read_one = ds.create_and_enqueue_generic_read_eob(); // Finally, add a flush - let flush_one = ds.submit_flush(None, None, None); + let (mut rx, tx) = BlockOpWaiter::pair(); + let flush_one = ds.submit_flush(None, Some(tx), None); let job = ds.ds_active.get(&write_one).unwrap(); assert_eq!(job.state[ClientId::new(0)], IOState::Skipped); @@ -7663,51 +7469,42 @@ pub(crate) mod test { assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 3); // Do the write - ds.process_ds_completion( + assert!(!ds.process_ds_completion( write_one, ClientId::new(1), Ok(Default::default()), &UpstairsState::Active, None, - ); + )); + let job = ds.ds_active.get(&write_one).unwrap(); + assert!(job.acked); // fast-ack + assert_eq!(job.state[ClientId::new(1)], IOState::Done); // Make the read ok response, do the read let rr = Ok(build_read_response(&[])); - ds.process_ds_completion( + assert!(ds.process_ds_completion( read_one, ClientId::new(1), rr, &UpstairsState::Active, None, - ); + )); - // Do the flush - ds.process_ds_completion( + // Verify that the read IO is done + let job = ds.ds_active.get(&read_one).unwrap(); + assert!(job.acked); + assert_eq!(job.state[ClientId::new(1)], IOState::Done); + + // Do the flush, which retires other jobs + assert!(ds.process_ds_completion( flush_one, ClientId::new(1), Ok(Default::default()), &UpstairsState::Active, None, - ); - - // Verify the write should be fast-acked and the others are ackable - assert!(ds.ds_active.get(&write_one).unwrap().acked); - assert_eq!(ds.ackable_work().len(), 2); - - // Verify all IOs are done - - let job = ds.ds_active.get(&read_one).unwrap(); - assert_eq!(job.state[ClientId::new(1)], IOState::Done); - let job = ds.ds_active.get(&write_one).unwrap(); - assert_eq!(job.state[ClientId::new(1)], IOState::Done); - let job = ds.ds_active.get(&flush_one).unwrap(); - assert_eq!(job.state[ClientId::new(1)], IOState::Done); - - ds.ack(read_one); - // write should be fast-acked - ds.ack(flush_one); - ds.retire_check(flush_one); + )); + assert!(rx.try_wait().unwrap().is_err()); // Skipped jobs now just have the flush. assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 1); @@ -7718,12 +7515,9 @@ pub(crate) mod test { assert!(ds.clients[ClientId::new(2)] .skipped_jobs .contains(&JobId(1002))); - assert_eq!(ds.ackable_work().len(), 0); // The writes, the read, and the flush should be completed. assert_eq!(ds.completed().len(), 3); - // No more ackable work - assert_eq!(ds.ackable_work().len(), 0); // No more jobs on the queue assert_eq!(ds.active_count(), 0); } @@ -7754,21 +7548,12 @@ pub(crate) mod test { assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 1); // Verify jobs can be acked. - assert_eq!(ds.ackable_work().len(), 1); - - // Verify all IOs are done - // We are simulating what would happen here by the up_ds_listen - // task, after it receives a notification from the ds_done_tx. - - ds.ack(read_one); - - ds.retire_check(read_one); + assert!(ds.ds_active.get(&read_one).unwrap().acked); // Our skipped jobs have not yet been cleared. 
assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 1); assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 1); assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 1); - assert_eq!(ds.ackable_work().len(), 0); } #[test] @@ -7797,17 +7582,10 @@ pub(crate) mod test { assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 1); assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 1); - // Verify all IOs are done - // We are simulating what would happen here by the up_ds_listen - // task, after it receives a notification from the ds_done_tx. - - ds.retire_check(write_one); // No flush, no change in skipped jobs. assert_eq!(ds.clients[ClientId::new(0)].skipped_jobs.len(), 1); assert_eq!(ds.clients[ClientId::new(1)].skipped_jobs.len(), 1); assert_eq!(ds.clients[ClientId::new(2)].skipped_jobs.len(), 1); - - assert_eq!(ds.ackable_work().len(), 0); } #[test] @@ -7824,36 +7602,19 @@ pub(crate) mod test { } // Create a flush. - let flush_one = ds.submit_flush(None, None, None); - let job = ds.ds_active.get(&flush_one).unwrap(); - assert_eq!(job.state[ClientId::new(0)], IOState::Skipped); - assert_eq!(job.state[ClientId::new(1)], IOState::Skipped); - assert_eq!(job.state[ClientId::new(2)], IOState::Skipped); + let (mut rx, tx) = BlockOpWaiter::pair(); + let _ = ds.submit_flush(None, Some(tx), None); + // This job should be insta-skipped and then retired, so it's gone + assert!(rx.try_wait().unwrap().is_err()); for cid in ClientId::iter() { assert_eq!(ds.clients[cid].skipped_jobs.len(), 1); assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1000))); } - // Verify jobs can be acked. - assert_eq!(ds.ackable_work().len(), 1); - - ds.ack(flush_one); - ds.retire_check(flush_one); - - assert_eq!(ds.ackable_work().len(), 0); - // The flush should remove all work from the ds queue. assert_eq!(ds.completed().len(), 1); - // No more ackable work - assert_eq!(ds.ackable_work().len(), 0); // No more jobs on the queue assert_eq!(ds.active_count(), 0); - - // Skipped jobs still has the flush. - for cid in ClientId::iter() { - assert_eq!(ds.clients[cid].skipped_jobs.len(), 1); - assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1000))); - } } #[test] @@ -7873,40 +7634,30 @@ pub(crate) mod test { // Create a read. let read_one = ds.create_and_enqueue_generic_read_eob(); + // The read should be acked because all downstairs are offline + assert!(ds.ds_active.get(&read_one).unwrap().acked); + // Create a write. let write_one = ds.create_and_enqueue_generic_write_eob(false); - // Create a flush - let flush_one = ds.submit_flush(None, None, None); - // Verify all jobs can be acked (or should have been fast-acked) - let write_job = ds.ds_active.get(&write_one).unwrap(); - assert!(write_job.acked); - assert_eq!(ds.ackable_work().len(), 2); + // The write should be fast-acked + assert!(ds.ds_active.get(&write_one).unwrap().acked); // Skipped jobs are not yet cleared. for cid in ClientId::iter() { - assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1000))); - assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1001))); - assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1002))); - assert_eq!(ds.clients[cid].skipped_jobs.len(), 3); + assert!(ds.clients[cid].skipped_jobs.contains(&read_one)); + assert!(ds.clients[cid].skipped_jobs.contains(&write_one)); + assert_eq!(ds.clients[cid].skipped_jobs.len(), 2); } - // Verify all IOs are done - // We are simulating what would happen here by the up_ds_listen - // task, after it receives a notification from the ds_done_tx. 
- ds.ack(read_one); - // write has already been fast-acked - ds.ack(flush_one); - - // Don't bother with retire check for read/write, just flush - ds.retire_check(flush_one); - - assert_eq!(ds.ackable_work().len(), 0); + // Create a flush + let (mut rx, tx) = BlockOpWaiter::pair(); + let _flush_one = ds.submit_flush(None, Some(tx), None); + // The flush instantly returns an error and retires old jobs + assert!(rx.try_wait().unwrap().is_err()); // The flush should remove all work from the ds queue. assert_eq!(ds.completed().len(), 3); - // No more ackable work - assert_eq!(ds.ackable_work().len(), 0); // No more jobs on the queue assert_eq!(ds.active_count(), 0); @@ -7936,41 +7687,39 @@ pub(crate) mod test { // Create a read, write, flush let read_one = ds.create_and_enqueue_generic_read_eob(); let write_one = ds.create_and_enqueue_generic_write_eob(false); - let flush_one = ds.submit_flush(None, None, None); - - // Create more IOs. - let _read_two = ds.create_and_enqueue_generic_read_eob(); - let _write_two = ds.create_and_enqueue_generic_write_eob(false); - let _flush_two = ds.submit_flush(None, None, None); - // Six jobs have been skipped. + // The read and write are skipped + acked for cid in ClientId::iter() { - assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1000))); - assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1001))); - assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1002))); - assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1003))); - assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1004))); - assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1005))); - assert_eq!(ds.clients[cid].skipped_jobs.len(), 6); + assert!(ds.clients[cid].skipped_jobs.contains(&read_one)); + assert!(ds.clients[cid].skipped_jobs.contains(&write_one)); + assert_eq!(ds.clients[cid].skipped_jobs.len(), 2); } - - // Ack the read and flush, and confirm that the write was fast-acked - ds.ack(read_one); + assert!(ds.ds_active.get(&read_one).unwrap().acked); assert!(ds.ds_active.get(&write_one).unwrap().acked); - ds.ack(flush_one); - assert_eq!(ds.ackable_work().len(), 2); - // Don't bother with retire check for read/write, just flush - ds.retire_check(flush_one); + let (mut rx, tx) = BlockOpWaiter::pair(); + let flush_one = ds.submit_flush(None, Some(tx), None); + assert!(rx.try_wait().unwrap().is_err()); + + let read_two = ds.create_and_enqueue_generic_read_eob(); + let write_two = ds.create_and_enqueue_generic_write_eob(false); - // The first two skipped jobs are now cleared and the non-acked - // jobs remain on the list, as well as the last flush. 
@@ -7936,41 +7687,39 @@ pub(crate) mod test {
         // Create a read, write, flush
         let read_one = ds.create_and_enqueue_generic_read_eob();
         let write_one = ds.create_and_enqueue_generic_write_eob(false);
-        let flush_one = ds.submit_flush(None, None, None);
-
-        // Create more IOs.
-        let _read_two = ds.create_and_enqueue_generic_read_eob();
-        let _write_two = ds.create_and_enqueue_generic_write_eob(false);
-        let _flush_two = ds.submit_flush(None, None, None);
-        // Six jobs have been skipped.
+        // The read and write are skipped + acked
         for cid in ClientId::iter() {
-            assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1000)));
-            assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1001)));
-            assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1002)));
-            assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1003)));
-            assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1004)));
-            assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1005)));
-            assert_eq!(ds.clients[cid].skipped_jobs.len(), 6);
+            assert!(ds.clients[cid].skipped_jobs.contains(&read_one));
+            assert!(ds.clients[cid].skipped_jobs.contains(&write_one));
+            assert_eq!(ds.clients[cid].skipped_jobs.len(), 2);
         }
-
-        // Ack the read and flush, and confirm that the write was fast-acked
-        ds.ack(read_one);
+        assert!(ds.ds_active.get(&read_one).unwrap().acked);
         assert!(ds.ds_active.get(&write_one).unwrap().acked);
-        ds.ack(flush_one);
-        assert_eq!(ds.ackable_work().len(), 2);
 
-        // Don't bother with retire check for read/write, just flush
-        ds.retire_check(flush_one);
+        let (mut rx, tx) = BlockOpWaiter::pair();
+        let flush_one = ds.submit_flush(None, Some(tx), None);
+        assert!(rx.try_wait().unwrap().is_err());
+
+        let read_two = ds.create_and_enqueue_generic_read_eob();
+        let write_two = ds.create_and_enqueue_generic_write_eob(false);
 
-        // The first two skipped jobs are now cleared and the non-acked
-        // jobs remain on the list, as well as the last flush.
         for cid in ClientId::iter() {
-            assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1002)));
-            assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1003)));
-            assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1004)));
-            assert!(ds.clients[cid].skipped_jobs.contains(&JobId(1005)));
-            assert_eq!(ds.clients[cid].skipped_jobs.len(), 4);
+            assert!(ds.clients[cid].skipped_jobs.contains(&flush_one));
+            assert!(ds.clients[cid].skipped_jobs.contains(&read_two));
+            assert!(ds.clients[cid].skipped_jobs.contains(&write_two));
+            assert_eq!(ds.clients[cid].skipped_jobs.len(), 3);
+        }
+        assert!(ds.ds_active.get(&read_two).unwrap().acked);
+        assert!(ds.ds_active.get(&write_two).unwrap().acked);
+
+        let (mut rx, tx) = BlockOpWaiter::pair();
+        let flush_two = ds.submit_flush(None, Some(tx), None);
+        assert!(rx.try_wait().unwrap().is_err());
+
+        for cid in ClientId::iter() {
+            assert!(ds.clients[cid].skipped_jobs.contains(&flush_two));
+            assert_eq!(ds.clients[cid].skipped_jobs.len(), 1);
         }
     }
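The second half of this test pins down the retirement rule for the per-client `skipped_jobs` sets: a skipped flush clears every earlier skipped job, and the set keeps only the flush itself plus anything submitted afterwards. A hypothetical model of that bookkeeping over bare `u64` IDs (not the real data structures):

use std::collections::BTreeSet;

// Sketch of the skipped-jobs retirement rule the assertions above exercise:
// a skipped flush retires every earlier skipped job, then records itself.
fn retire_on_flush(skipped: &mut BTreeSet<u64>, flush_id: u64) {
    skipped.retain(|&id| id >= flush_id);
    skipped.insert(flush_id);
}

fn main() {
    let mut skipped = BTreeSet::from([1000, 1001]); // read_one, write_one
    retire_on_flush(&mut skipped, 1002); // flush_one clears 1000 and 1001
    skipped.extend([1003, 1004]); // read_two, write_two
    assert_eq!(skipped.len(), 3); // {flush_one, read_two, write_two}
    retire_on_flush(&mut skipped, 1005); // flush_two
    assert_eq!(skipped.len(), 1); // only flush_two remains
}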
@@ -8740,10 +8489,10 @@ pub(crate) mod test {
             repair_downstairs: vec![ClientId::new(1)],
             aborting_repair: false,
             state: LiveRepairState::Closing {
-                close_id: extent_repair_ids.close_id,
-                repair_id: extent_repair_ids.repair_id,
-                noop_id: extent_repair_ids.noop_id,
-                reopen_id: extent_repair_ids.reopen_id,
+                close_job: PendingJob::new(extent_repair_ids.close_id),
+                repair_job: PendingJob::new(extent_repair_ids.repair_id),
+                noop_job: PendingJob::new(extent_repair_ids.noop_id),
+                reopen_job: PendingJob::new(extent_repair_ids.reopen_id),
             },
         });
     }
@@ -9653,10 +9402,10 @@ pub(crate) mod test {
             repair_downstairs: vec![ClientId::new(1)],
             aborting_repair: false,
             state: LiveRepairState::Closing {
-                close_id: JobId(1000),
-                repair_id: JobId(1001),
-                noop_id: JobId(1002),
-                reopen_id: JobId(1003),
+                close_job: PendingJob::new(JobId(1000)),
+                repair_job: PendingJob::new(JobId(1001)),
+                noop_job: PendingJob::new(JobId(1002)),
+                reopen_job: PendingJob::new(JobId(1003)),
             },
         });
 
@@ -9719,10 +9468,10 @@ pub(crate) mod test {
             repair_downstairs: vec![ClientId::new(1)],
             aborting_repair: false,
             state: LiveRepairState::Closing {
-                close_id: JobId(1000),
-                repair_id: JobId(1001),
-                noop_id: JobId(1002),
-                reopen_id: JobId(1003),
+                close_job: PendingJob::new(JobId(1000)),
+                repair_job: PendingJob::new(JobId(1001)),
+                noop_job: PendingJob::new(JobId(1002)),
+                reopen_job: PendingJob::new(JobId(1003)),
             },
         });
 
@@ -9781,10 +9530,10 @@ pub(crate) mod test {
             repair_downstairs: vec![ClientId::new(1)],
             aborting_repair: false,
             state: LiveRepairState::Closing {
-                close_id: JobId(1000),
-                repair_id: JobId(1001),
-                noop_id: JobId(1002),
-                reopen_id: JobId(1003),
+                close_job: PendingJob::new(JobId(1000)),
+                repair_job: PendingJob::new(JobId(1001)),
+                noop_job: PendingJob::new(JobId(1002)),
+                reopen_job: PendingJob::new(JobId(1003)),
             },
         });
 
@@ -9810,10 +9559,10 @@ pub(crate) mod test {
             repair_downstairs: vec![ClientId::new(1)],
             aborting_repair: false,
             state: LiveRepairState::Closing {
-                close_id: JobId(1004),
-                repair_id: JobId(1005),
-                noop_id: JobId(1006),
-                reopen_id: JobId(1007),
+                close_job: PendingJob::new(JobId(1004)),
+                repair_job: PendingJob::new(JobId(1005)),
+                noop_job: PendingJob::new(JobId(1006)),
+                reopen_job: PendingJob::new(JobId(1007)),
             },
         });
 
@@ -9874,10 +9623,10 @@ pub(crate) mod test {
             repair_downstairs: vec![ClientId::new(1)],
             aborting_repair: false,
             state: LiveRepairState::Closing {
-                close_id: JobId(next_id),
-                repair_id: JobId(next_id + 1),
-                noop_id: JobId(next_id + 2),
-                reopen_id: JobId(next_id + 3),
+                close_job: PendingJob::new(JobId(next_id)),
+                repair_job: PendingJob::new(JobId(next_id + 1)),
+                noop_job: PendingJob::new(JobId(next_id + 2)),
+                reopen_job: PendingJob::new(JobId(next_id + 3)),
             },
         });
 
@@ -9896,10 +9645,10 @@ pub(crate) mod test {
             repair_downstairs: vec![ClientId::new(1)],
             aborting_repair: false,
             state: LiveRepairState::Closing {
-                close_id: JobId(next_id),
-                repair_id: JobId(next_id + 1),
-                noop_id: JobId(next_id + 2),
-                reopen_id: JobId(next_id + 3),
+                close_job: PendingJob::new(JobId(next_id)),
+                repair_job: PendingJob::new(JobId(next_id + 1)),
+                noop_job: PendingJob::new(JobId(next_id + 2)),
+                reopen_job: PendingJob::new(JobId(next_id + 3)),
            },
         });
 
@@ -10042,10 +9791,10 @@ pub(crate) mod test {
             repair_downstairs: vec![ClientId::new(1)],
             aborting_repair: false,
             state: LiveRepairState::Closing {
-                close_id: JobId(next_id),
-                repair_id: JobId(next_id + 1),
-                noop_id: JobId(next_id + 2),
-                reopen_id: JobId(next_id + 3),
+                close_job: PendingJob::new(JobId(next_id)),
+                repair_job: PendingJob::new(JobId(next_id + 1)),
+                noop_job: PendingJob::new(JobId(next_id + 2)),
+                reopen_job: PendingJob::new(JobId(next_id + 3)),
             },
         });
 
@@ -10298,10 +10047,10 @@ pub(crate) mod test {
             repair_downstairs: vec![ClientId::new(1)],
             aborting_repair: false,
             state: LiveRepairState::Closing {
-                close_id: JobId(1000),
-                repair_id: JobId(1001),
-                noop_id: JobId(1002),
-                reopen_id: JobId(1003),
+                close_job: PendingJob::new(JobId(1000)),
+                repair_job: PendingJob::new(JobId(1001)),
+                noop_job: PendingJob::new(JobId(1002)),
+                reopen_job: PendingJob::new(JobId(1003)),
             },
         });
 
@@ -10393,10 +10142,10 @@ pub(crate) mod test {
             repair_downstairs: vec![ClientId::new(1)],
             aborting_repair: false,
             state: LiveRepairState::Closing {
-                close_id: JobId(1000),
-                repair_id: JobId(1001),
-                noop_id: JobId(1002),
-                reopen_id: JobId(1003),
+                close_job: PendingJob::new(JobId(1000)),
+                repair_job: PendingJob::new(JobId(1001)),
+                noop_job: PendingJob::new(JobId(1002)),
+                reopen_job: PendingJob::new(JobId(1003)),
             },
         });
 
diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs
index 31a518b7e..58ce09638 100644
--- a/upstairs/src/upstairs.rs
+++ b/upstairs/src/upstairs.rs
@@ -170,7 +170,6 @@ impl UpCounters {
 ///
 /// For example, we _always_ do things like
 /// - Send all pending IO to the client work tasks
-/// - Ack all ackable jobs to the guest
 /// - Step through the live-repair state machine (if it's running)
 /// - Check for client-side deactivation (if it's pending)
 /// - Set backpressure time in the clients
@@ -611,16 +610,8 @@ impl Upstairs {
         }
 
         // Check to see whether live-repair can continue
-        //
-        // This must be called before acking jobs, because it looks in
-        // `Downstairs::ackable_jobs` to see which jobs are done.
         self.downstairs.check_and_continue_live_repair(&self.state);
 
-        // Handle any jobs that have become ready for acks
-        if self.downstairs.has_ackable_jobs() {
-            self.downstairs.ack_jobs()
-        }
-
        // Check for client-side deactivation
         if matches!(&self.state, UpstairsState::Deactivating(..)) {
             info!(self.log, "checking for deactivation");
@@ -1655,7 +1646,7 @@ impl Upstairs {
             // IO operation replies
             //
-            // This may cause jobs to become ackable!
+            // This may cause jobs to be acked!
             Message::WriteAck { .. }
             | Message::WriteUnwrittenAck { .. }
             | Message::FlushAck { .. }
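The upstairs.rs hunks above remove the deferred ack sweep from the main loop: with jobs acked at the moment their replies (or skips) are processed, there is no ackable set left to poll. A schematic contrast of the two control-flow shapes, using invented names (`Event`, `Guest`, `on_reply`) rather than the real `Upstairs` types:

// Schematic only: these names are made up for illustration.
enum Event {
    Reply { job: u64, result: Result<(), String> },
}

struct Guest;

impl Guest {
    // Stand-in for notifying the guest that a job finished
    fn ack(&mut self, job: u64, result: &Result<(), String>) {
        println!("ack job {job}: ok = {}", result.is_ok());
    }
}

// Old shape: a reply only marked the job "ackable"; a separate pass in the
// main loop then swept the ackable set and acked each job to the guest.
//
// New shape: the ack happens inline while the reply is processed, so the
// main loop needs no "has ackable jobs?" check and no sweep.
fn on_reply(guest: &mut Guest, ev: Event) {
    match ev {
        Event::Reply { job, result } => {
            // ...per-client job state would be updated here...
            guest.ack(job, &result);
        }
    }
}

fn main() {
    let mut guest = Guest;
    on_reply(&mut guest, Event::Reply { job: 1000, result: Ok(()) });
}

Acking inline leaves one less piece of shared state to keep consistent with the job map, which is what allows the sweep and its checks to be deleted outright.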