From 01ab087d6e638bee28e3cf79c7272740774a4d33 Mon Sep 17 00:00:00 2001 From: Matt Keeter Date: Mon, 6 Jan 2025 14:16:09 -0500 Subject: [PATCH] Simplify fault and restart code (1/3) (#1568) The conditions under which we restart a client IO task are somewhat baroque: - It may halt spontaneously (e.g. upon a socket error) - We may **fault** it explicitly (e.g. upon IO error). In this case, we want to skip all work associated with this client. - We may restart the IO task if initial negotiation fails - ...and when replacing a Downstairs - ...and when deactivated This PR adds more specific types: `enum ClientNegotiationFailed` and `enum ClientFaultReason`. Mid-negotiation restarts must provide a `ClientNegotiationFailed`; faulting the IO task must provide a `ClientFaultReason`. Both of these types are then converted into a `ClientStopReason` for logging. In addition, faulting a client is now done through `Downstairs::fault_client` instead of calling both `Downstairs::skip_all_jobs` and `DownstairsClient::fault`. Having a single function makes this harder to mess up! 
--- upstairs/src/client.rs | 148 +++++++++++++++++++------------------ upstairs/src/downstairs.rs | 104 ++++++++++++++------------ upstairs/src/upstairs.rs | 6 +- 3 files changed, 138 insertions(+), 120 deletions(-) diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs index 074bbfde0..a890248e6 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -318,6 +318,7 @@ impl DownstairsClient { } fn halt_io_task(&mut self, r: ClientStopReason) { + info!(self.log, "halting IO task due to {r:?}"); if let Some(t) = self.client_task.client_stop_tx.take() { if let Err(_e) = t.send(r) { warn!(self.log, "failed to send stop request") @@ -847,17 +848,12 @@ impl DownstairsClient { self.state } - pub(crate) fn restart_connection( + pub(crate) fn abort_negotiation( &mut self, up_state: &UpstairsState, - reason: ClientStopReason, + reason: ClientNegotiationFailed, ) { let new_state = match self.state { - DsState::Active | DsState::Offline - if matches!(reason, ClientStopReason::IneligibleForReplay) => - { - DsState::Faulted - } DsState::Active | DsState::Offline => DsState::Offline, DsState::Faulted => DsState::Faulted, DsState::Deactivated => DsState::New, @@ -883,7 +879,7 @@ impl DownstairsClient { ); self.checked_state_transition(up_state, new_state); - self.halt_io_task(reason); + self.halt_io_task(reason.into()); } /// Sets the current state to `DsState::Active` @@ -1185,26 +1181,8 @@ impl DownstairsClient { } } - /// Aborts an in-progress live repair, conditionally restarting the task - /// - /// # Panics - /// If this client is not in `DsState::LiveRepair` and `restart_task` is - /// `true`, or vice versa. - pub(crate) fn abort_repair( - &mut self, - up_state: &UpstairsState, - restart_task: bool, - ) { - if restart_task { - assert_eq!(self.state, DsState::LiveRepair); - self.checked_state_transition(up_state, DsState::Faulted); - self.halt_io_task(ClientStopReason::FailedLiveRepair); - } else { - // Someone else (i.e. 
receiving an error upon IO completion) already - // restarted the IO task and kicked us out of the live-repair state, - // but we'll do further cleanup here. - assert_ne!(self.state, DsState::LiveRepair); - } + /// Sets `repair_info` to `None` and increments `live_repair_aborted` + pub(crate) fn clear_repair_state(&mut self) { self.repair_info = None; self.stats.live_repair_aborted += 1; } @@ -1213,10 +1191,10 @@ impl DownstairsClient { pub(crate) fn fault( &mut self, up_state: &UpstairsState, - reason: ClientStopReason, + reason: ClientFaultReason, ) { self.checked_state_transition(up_state, DsState::Faulted); - self.halt_io_task(reason); + self.halt_io_task(reason.into()); } /// Finishes an in-progress live repair, setting our state to `Active` @@ -1522,9 +1500,9 @@ impl DownstairsClient { } => { if self.negotiation_state != NegotiationState::Start { error!(self.log, "got version already"); - self.restart_connection( + self.abort_negotiation( up_state, - ClientStopReason::BadNegotiationOrder, + ClientNegotiationFailed::BadNegotiationOrder, ); return Ok(false); } @@ -1535,9 +1513,9 @@ impl DownstairsClient { CRUCIBLE_MESSAGE_VERSION, version ); - self.restart_connection( + self.abort_negotiation( up_state, - ClientStopReason::Incompatible, + ClientNegotiationFailed::Incompatible, ); return Ok(false); } @@ -1593,9 +1571,9 @@ impl DownstairsClient { "downstairs version is {version}, \ ours is {CRUCIBLE_MESSAGE_VERSION}" ); - self.restart_connection( + self.abort_negotiation( up_state, - ClientStopReason::Incompatible, + ClientNegotiationFailed::Incompatible, ); } Message::EncryptedMismatch { expected } => { @@ -1604,9 +1582,9 @@ impl DownstairsClient { "downstairs encrypted is {expected}, ours is {}", self.cfg.encrypted() ); - self.restart_connection( + self.abort_negotiation( up_state, - ClientStopReason::Incompatible, + ClientNegotiationFailed::Incompatible, ); } Message::ReadOnlyMismatch { expected } => { @@ -1615,9 +1593,9 @@ impl DownstairsClient { "downstairs 
read_only is {expected}, ours is {}", self.cfg.read_only, ); - self.restart_connection( + self.abort_negotiation( up_state, - ClientStopReason::Incompatible, + ClientNegotiationFailed::Incompatible, ); } Message::YouAreNowActive { @@ -1631,9 +1609,9 @@ impl DownstairsClient { "Received YouAreNowActive out of order! {:?}", self.negotiation_state ); - self.restart_connection( + self.abort_negotiation( up_state, - ClientStopReason::BadNegotiationOrder, + ClientNegotiationFailed::BadNegotiationOrder, ); return Ok(false); } @@ -1675,17 +1653,17 @@ impl DownstairsClient { "Generation requested:{} found:{}", gen, upstairs_gen, ); - self.restart_connection( + self.abort_negotiation( up_state, - ClientStopReason::Incompatible, + ClientNegotiationFailed::Incompatible, ); return Err(CrucibleError::GenerationNumberTooLow( gen_error, )); } else { - self.restart_connection( + self.abort_negotiation( up_state, - ClientStopReason::Incompatible, + ClientNegotiationFailed::Incompatible, ); return Err(CrucibleError::UuidMismatch); } @@ -1698,9 +1676,9 @@ impl DownstairsClient { if self.negotiation_state != NegotiationState::WaitForRegionInfo { error!(self.log, "Received RegionInfo out of order!"); - self.restart_connection( + self.abort_negotiation( up_state, - ClientStopReason::BadNegotiationOrder, + ClientNegotiationFailed::BadNegotiationOrder, ); return Ok(false); } @@ -1715,9 +1693,9 @@ impl DownstairsClient { // collection for each downstairs. 
if region_def.get_encrypted() != self.cfg.encrypted() { error!(self.log, "encryption expectation mismatch!"); - self.restart_connection( + self.abort_negotiation( up_state, - ClientStopReason::Incompatible, + ClientNegotiationFailed::Incompatible, ); return Ok(false); } @@ -1861,9 +1839,9 @@ impl DownstairsClient { self.log, "exiting negotiation because we're replacing" ); - self.restart_connection( + self.abort_negotiation( up_state, - ClientStopReason::Replacing, + ClientNegotiationFailed::Replacing, ); } bad_state => { @@ -1880,9 +1858,9 @@ impl DownstairsClient { Message::LastFlushAck { last_flush_number } => { if self.negotiation_state != NegotiationState::GetLastFlush { error!(self.log, "Received LastFlushAck out of order!"); - self.restart_connection( + self.abort_negotiation( up_state, - ClientStopReason::BadNegotiationOrder, + ClientNegotiationFailed::BadNegotiationOrder, ); return Ok(false); // TODO should we trigger set_inactive? } @@ -1893,9 +1871,9 @@ impl DownstairsClient { "exiting negotiation due to LastFlushAck \ while replacing" ); - self.restart_connection( + self.abort_negotiation( up_state, - ClientStopReason::Replacing, + ClientNegotiationFailed::Replacing, ); return Ok(false); // TODO should we trigger set_inactive? } @@ -1922,9 +1900,9 @@ impl DownstairsClient { if self.negotiation_state != NegotiationState::GetExtentVersions { error!(self.log, "Received ExtentVersions out of order!"); - self.restart_connection( + self.abort_negotiation( up_state, - ClientStopReason::BadNegotiationOrder, + ClientNegotiationFailed::BadNegotiationOrder, ); return Ok(false); // TODO should we trigger set_inactive? } @@ -1941,9 +1919,9 @@ impl DownstairsClient { "exiting negotiation due to ExtentVersions while \ replacing" ); - self.restart_connection( + self.abort_negotiation( up_state, - ClientStopReason::Replacing, + ClientNegotiationFailed::Replacing, ); return Ok(false); // TODO should we trigger set_inactive? 
} @@ -2209,18 +2187,46 @@ pub(crate) enum ClientStopReason { /// (for example, we have received `Message::YouAreNoLongerActive`) Disabled, + /// The upstairs has requested that we deactivate + Deactivated, + + /// Something went wrong during negotiation + #[allow(unused)] // logged in debug messages + NegotiationFailed(ClientNegotiationFailed), + + /// We have explicitly faulted the client + #[allow(unused)] // logged in debug messages + Fault(ClientFaultReason), +} + +/// Subset of [`ClientStopReason`] for faulting a client +#[derive(Debug)] +pub(crate) enum ClientNegotiationFailed { /// Reconcile failed and we're restarting FailedReconcile, - /// Received an error from some IO - IOError, - /// Negotiation message received out of order BadNegotiationOrder, /// Negotiation says that we are incompatible Incompatible, + /// We are trying to replace the client task + Replacing, +} + +impl From<ClientNegotiationFailed> for ClientStopReason { + fn from(f: ClientNegotiationFailed) -> ClientStopReason { + ClientStopReason::NegotiationFailed(f) + } +} + +/// Subset of [`ClientStopReason`] for faulting a client +#[derive(Debug)] +pub(crate) enum ClientFaultReason { + /// Received an error from some non-recoverable IO (write or flush) + IOError, + /// Live-repair failed FailedLiveRepair, @@ -2230,18 +2236,20 @@ pub(crate) enum ClientStopReason { /// Too many bytes in the queue TooManyOutstandingBytes, - /// The upstairs has requested that we deactivate - Deactivated, - - /// The test suite has requested a fault - #[cfg(test)] - RequestedFault, - /// The upstairs has requested that we deactivate when we were offline OfflineDeactivated, /// The Upstairs has dropped jobs that would be needed for replay IneligibleForReplay, + + #[cfg(test)] + RequestedFault, +} + +impl From<ClientFaultReason> for ClientStopReason { + fn from(f: ClientFaultReason) -> ClientStopReason { + ClientStopReason::Fault(f) + } } /// Response received from the I/O task diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs index 
e03ebadab..d16c9b637 100644 --- a/upstairs/src/downstairs.rs +++ b/upstairs/src/downstairs.rs @@ -8,7 +8,10 @@ use std::{ use crate::{ cdt, - client::{ClientAction, ClientStopReason, DownstairsClient, EnqueueResult}, + client::{ + ClientAction, ClientFaultReason, ClientNegotiationFailed, + DownstairsClient, EnqueueResult, + }, guest::GuestBlockRes, io_limits::{IOLimitGuard, IOLimits}, live_repair::ExtentInfo, @@ -728,9 +731,11 @@ impl Downstairs { // requested deactivation to finish. if self.clients[client_id].state() == DsState::Offline { info!(self.log, "[{}] Offline client moved to Faulted", client_id); - self.skip_all_jobs(client_id); - self.clients[client_id] - .fault(up_state, ClientStopReason::OfflineDeactivated); + self.fault_client( + client_id, + up_state, + ClientFaultReason::OfflineDeactivated, + ); return false; } // If there are jobs in the queue, then we have to check them! @@ -1911,9 +1916,9 @@ impl Downstairs { if c.state() == DsState::Reconcile { // Restart the IO task. This will cause the Upstairs to // deactivate through a ClientAction::TaskStopped. - c.restart_connection( + c.abort_negotiation( up_state, - ClientStopReason::FailedReconcile, + ClientNegotiationFailed::FailedReconcile, ); error!(self.log, "Mark {} as FAILED REPAIR", i); } @@ -2626,30 +2631,40 @@ impl Downstairs { self.log, "downstairs failed, too many outstanding jobs {work_count}" ); - Some(ClientStopReason::TooManyOutstandingJobs) + Some(ClientFaultReason::TooManyOutstandingJobs) } else if byte_count as u64 > crate::IO_OUTSTANDING_MAX_BYTES { warn!( self.log, "downstairs failed, too many outstanding bytes {byte_count}" ); - Some(ClientStopReason::TooManyOutstandingBytes) + Some(ClientFaultReason::TooManyOutstandingBytes) } else if !self.can_replay { // XXX can this actually happen? 
warn!( self.log, "downstairs became ineligible for replay while offline" ); - Some(ClientStopReason::IneligibleForReplay) + Some(ClientFaultReason::IneligibleForReplay) } else { None }; if let Some(err) = failed { - self.skip_all_jobs(client_id); - self.clients[client_id].fault(up_state, err); + self.fault_client(client_id, up_state, err); } } + /// Marks a client as faulted, skipping pending IO and stopping the worker + pub(crate) fn fault_client( + &mut self, + client_id: ClientId, + up_state: &UpstairsState, + err: ClientFaultReason, + ) { + self.skip_all_jobs(client_id); + self.clients[client_id].fault(up_state, err); + } + /// Move all `New` and `InProgress` jobs for the given client to `Skipped` /// /// This may lead to jobs being acked, since a skipped job counts as @@ -2705,30 +2720,23 @@ impl Downstairs { for i in ClientId::iter() { match self.clients[i].state() { DsState::LiveRepair => { - self.skip_all_jobs(i); - self.clients[i].abort_repair(up_state, true); - } - DsState::Faulted => { - // Jobs were already skipped when we hit the IO error that - // marked us as faulted - self.clients[i].abort_repair(up_state, false); + self.fault_client( + i, + up_state, + ClientFaultReason::FailedLiveRepair, + ); } DsState::LiveRepairReady => { // TODO I don't think this is necessary self.skip_all_jobs(i); - - // Set repair_info to None, so that the next - // ExtentFlushClose sees it empty (as expected). repair_info - // is set on all clients, even those not directly - // participating in live-repair, so we have to always clear - // it; in the cases above, it's cleared in `abort_repair`. - self.clients[i].repair_info = None; - } - _ => { - // (see comment above) - self.clients[i].repair_info = None; } + _ => {} } + // Set repair_info to None, so that the next ExtentFlushClose sees + // it empty (as expected). repair_info is set on all clients, even + // those not directly participating in live-repair, so we have to + // always clear it. 
+ self.clients[i].clear_repair_state(); } if let Some(repair) = &mut self.repair { @@ -3383,15 +3391,11 @@ impl Downstairs { | IOop::ExtentLiveNoOp { .. } | IOop::ExtentLiveReopen { .. } ) { - // This error means the downstairs will go to Faulted. - // Walk the active job list and mark any that were - // new or in progress to skipped. - self.skip_all_jobs(client_id); - self.clients[client_id] - .checked_state_transition(up_state, DsState::Faulted); - self.clients[client_id].restart_connection( + // Send this Downstairs to faulted + self.fault_client( + client_id, up_state, - ClientStopReason::IOError, + ClientFaultReason::IOError, ); } } @@ -4337,7 +4341,7 @@ struct DownstairsBackpressureConfig { #[cfg(test)] pub(crate) mod test { - use super::{Downstairs, PendingJob}; + use super::{ClientFaultReason, Downstairs, PendingJob}; use crate::{ downstairs::{LiveRepairData, LiveRepairState, ReconcileData}, live_repair::ExtentInfo, @@ -9619,9 +9623,11 @@ pub(crate) mod test { // Fault the downstairs let to_repair = ClientId::new(1); - ds.skip_all_jobs(to_repair); - ds.clients[to_repair] - .checked_state_transition(&UpstairsState::Active, DsState::Faulted); + ds.fault_client( + to_repair, + &UpstairsState::Active, + ClientFaultReason::RequestedFault, + ); ds.clients[to_repair].checked_state_transition( &UpstairsState::Active, DsState::LiveRepairReady, @@ -9786,9 +9792,11 @@ pub(crate) mod test { // Fault the downstairs let to_repair = ClientId::new(1); - ds.skip_all_jobs(to_repair); - ds.clients[to_repair] - .checked_state_transition(&UpstairsState::Active, DsState::Faulted); + ds.fault_client( + to_repair, + &UpstairsState::Active, + ClientFaultReason::RequestedFault, + ); ds.clients[to_repair].checked_state_transition( &UpstairsState::Active, DsState::LiveRepairReady, @@ -9940,9 +9948,11 @@ pub(crate) mod test { // Fault the downstairs let to_repair = ClientId::new(1); - ds.skip_all_jobs(to_repair); - ds.clients[to_repair] - 
.checked_state_transition(&UpstairsState::Active, DsState::Faulted); + ds.fault_client( + to_repair, + &UpstairsState::Active, + ClientFaultReason::RequestedFault, + ); ds.clients[to_repair].checked_state_transition( &UpstairsState::Active, DsState::LiveRepairReady, diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs index b47f09f2e..7d845bff1 100644 --- a/upstairs/src/upstairs.rs +++ b/upstairs/src/upstairs.rs @@ -1159,10 +1159,10 @@ impl Upstairs { #[cfg(test)] BlockOp::FaultDownstairs { client_id, done } => { - self.downstairs.skip_all_jobs(client_id); - self.downstairs.clients[client_id].fault( + self.downstairs.fault_client( + client_id, &self.state, - crate::client::ClientStopReason::RequestedFault, + crate::client::ClientFaultReason::RequestedFault, ); done.send_ok(()); }