Skip to content

Commit

Permalink
Simplify fault and restart code (1/3) (#1568)
Browse files Browse the repository at this point in the history
The conditions under which we restart a client IO task are somewhat
baroque:

- It may halt spontaneously (e.g. upon a socket error)
- We may **fault** it explicitly (e.g. upon IO error). In this case, we
  want to skip all work associated with this client.
- We may restart the IO task if initial negotiation fails
- ...and when replacing a Downstairs
- ...and when deactivated

This PR adds more specific types: `enum ClientNegotiationFailed` and
`enum ClientFaultReason`. Mid-negotiation restarts must provide a
`ClientNegotiationFailed`; faulting the IO task must provide a
`ClientFaultReason`. Both of these types are then converted into a
`ClientStopReason` for logging.

In addition, faulting a client is now done through
`Downstairs::fault_client` instead of calling both
`Downstairs::skip_all_jobs` and `DownstairsClient::fault`. Having a
single function makes this harder to mess up!
  • Loading branch information
mkeeter authored Jan 6, 2025
1 parent 3865ee1 commit 01ab087
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 120 deletions.
148 changes: 78 additions & 70 deletions upstairs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ impl DownstairsClient {
}

fn halt_io_task(&mut self, r: ClientStopReason) {
info!(self.log, "halting IO task due to {r:?}");
if let Some(t) = self.client_task.client_stop_tx.take() {
if let Err(_e) = t.send(r) {
warn!(self.log, "failed to send stop request")
Expand Down Expand Up @@ -847,17 +848,12 @@ impl DownstairsClient {
self.state
}

pub(crate) fn restart_connection(
pub(crate) fn abort_negotiation(
&mut self,
up_state: &UpstairsState,
reason: ClientStopReason,
reason: ClientNegotiationFailed,
) {
let new_state = match self.state {
DsState::Active | DsState::Offline
if matches!(reason, ClientStopReason::IneligibleForReplay) =>
{
DsState::Faulted
}
DsState::Active | DsState::Offline => DsState::Offline,
DsState::Faulted => DsState::Faulted,
DsState::Deactivated => DsState::New,
Expand All @@ -883,7 +879,7 @@ impl DownstairsClient {
);

self.checked_state_transition(up_state, new_state);
self.halt_io_task(reason);
self.halt_io_task(reason.into());
}

/// Sets the current state to `DsState::Active`
Expand Down Expand Up @@ -1185,26 +1181,8 @@ impl DownstairsClient {
}
}

/// Aborts an in-progress live repair, conditionally restarting the task
///
/// # Panics
/// If this client is not in `DsState::LiveRepair` and `restart_task` is
/// `true`, or vice versa.
pub(crate) fn abort_repair(
&mut self,
up_state: &UpstairsState,
restart_task: bool,
) {
if restart_task {
assert_eq!(self.state, DsState::LiveRepair);
self.checked_state_transition(up_state, DsState::Faulted);
self.halt_io_task(ClientStopReason::FailedLiveRepair);
} else {
// Someone else (i.e. receiving an error upon IO completion) already
// restarted the IO task and kicked us out of the live-repair state,
// but we'll do further cleanup here.
assert_ne!(self.state, DsState::LiveRepair);
}
/// Sets `repair_info` to `None` and increments `live_repair_aborted`
pub(crate) fn clear_repair_state(&mut self) {
self.repair_info = None;
self.stats.live_repair_aborted += 1;
}
Expand All @@ -1213,10 +1191,10 @@ impl DownstairsClient {
pub(crate) fn fault(
&mut self,
up_state: &UpstairsState,
reason: ClientStopReason,
reason: ClientFaultReason,
) {
self.checked_state_transition(up_state, DsState::Faulted);
self.halt_io_task(reason);
self.halt_io_task(reason.into());
}

/// Finishes an in-progress live repair, setting our state to `Active`
Expand Down Expand Up @@ -1522,9 +1500,9 @@ impl DownstairsClient {
} => {
if self.negotiation_state != NegotiationState::Start {
error!(self.log, "got version already");
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::BadNegotiationOrder,
ClientNegotiationFailed::BadNegotiationOrder,
);
return Ok(false);
}
Expand All @@ -1535,9 +1513,9 @@ impl DownstairsClient {
CRUCIBLE_MESSAGE_VERSION,
version
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
return Ok(false);
}
Expand Down Expand Up @@ -1593,9 +1571,9 @@ impl DownstairsClient {
"downstairs version is {version}, \
ours is {CRUCIBLE_MESSAGE_VERSION}"
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
}
Message::EncryptedMismatch { expected } => {
Expand All @@ -1604,9 +1582,9 @@ impl DownstairsClient {
"downstairs encrypted is {expected}, ours is {}",
self.cfg.encrypted()
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
}
Message::ReadOnlyMismatch { expected } => {
Expand All @@ -1615,9 +1593,9 @@ impl DownstairsClient {
"downstairs read_only is {expected}, ours is {}",
self.cfg.read_only,
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
}
Message::YouAreNowActive {
Expand All @@ -1631,9 +1609,9 @@ impl DownstairsClient {
"Received YouAreNowActive out of order! {:?}",
self.negotiation_state
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::BadNegotiationOrder,
ClientNegotiationFailed::BadNegotiationOrder,
);
return Ok(false);
}
Expand Down Expand Up @@ -1675,17 +1653,17 @@ impl DownstairsClient {
"Generation requested:{} found:{}",
gen, upstairs_gen,
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
return Err(CrucibleError::GenerationNumberTooLow(
gen_error,
));
} else {
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
return Err(CrucibleError::UuidMismatch);
}
Expand All @@ -1698,9 +1676,9 @@ impl DownstairsClient {
if self.negotiation_state != NegotiationState::WaitForRegionInfo
{
error!(self.log, "Received RegionInfo out of order!");
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::BadNegotiationOrder,
ClientNegotiationFailed::BadNegotiationOrder,
);
return Ok(false);
}
Expand All @@ -1715,9 +1693,9 @@ impl DownstairsClient {
// collection for each downstairs.
if region_def.get_encrypted() != self.cfg.encrypted() {
error!(self.log, "encryption expectation mismatch!");
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
return Ok(false);
}
Expand Down Expand Up @@ -1861,9 +1839,9 @@ impl DownstairsClient {
self.log,
"exiting negotiation because we're replacing"
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Replacing,
ClientNegotiationFailed::Replacing,
);
}
bad_state => {
Expand All @@ -1880,9 +1858,9 @@ impl DownstairsClient {
Message::LastFlushAck { last_flush_number } => {
if self.negotiation_state != NegotiationState::GetLastFlush {
error!(self.log, "Received LastFlushAck out of order!");
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::BadNegotiationOrder,
ClientNegotiationFailed::BadNegotiationOrder,
);
return Ok(false); // TODO should we trigger set_inactive?
}
Expand All @@ -1893,9 +1871,9 @@ impl DownstairsClient {
"exiting negotiation due to LastFlushAck \
while replacing"
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Replacing,
ClientNegotiationFailed::Replacing,
);
return Ok(false); // TODO should we trigger set_inactive?
}
Expand All @@ -1922,9 +1900,9 @@ impl DownstairsClient {
if self.negotiation_state != NegotiationState::GetExtentVersions
{
error!(self.log, "Received ExtentVersions out of order!");
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::BadNegotiationOrder,
ClientNegotiationFailed::BadNegotiationOrder,
);
return Ok(false); // TODO should we trigger set_inactive?
}
Expand All @@ -1941,9 +1919,9 @@ impl DownstairsClient {
"exiting negotiation due to ExtentVersions while \
replacing"
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Replacing,
ClientNegotiationFailed::Replacing,
);
return Ok(false); // TODO should we trigger set_inactive?
}
Expand Down Expand Up @@ -2209,18 +2187,46 @@ pub(crate) enum ClientStopReason {
/// (for example, we have received `Message::YouAreNoLongerActive`)
Disabled,

/// The upstairs has requested that we deactivate
Deactivated,

/// Something went wrong during negotiation
#[allow(unused)] // logged in debug messages
NegotiationFailed(ClientNegotiationFailed),

/// We have explicitly faulted the client
#[allow(unused)] // logged in debug messages
Fault(ClientFaultReason),
}

/// Subset of [`ClientStopReason`] for faulting a client
#[derive(Debug)]
pub(crate) enum ClientNegotiationFailed {
/// Reconcile failed and we're restarting
FailedReconcile,

/// Received an error from some IO
IOError,

/// Negotiation message received out of order
BadNegotiationOrder,

/// Negotiation says that we are incompatible
Incompatible,

/// We are trying to replace the client task
Replacing,
}

impl From<ClientNegotiationFailed> for ClientStopReason {
fn from(f: ClientNegotiationFailed) -> ClientStopReason {
ClientStopReason::NegotiationFailed(f)
}
}

/// Subset of [`ClientStopReason`] for faulting a client
#[derive(Debug)]
pub(crate) enum ClientFaultReason {
/// Received an error from some non-recoverable IO (write or flush)
IOError,

/// Live-repair failed
FailedLiveRepair,

Expand All @@ -2230,18 +2236,20 @@ pub(crate) enum ClientStopReason {
/// Too many bytes in the queue
TooManyOutstandingBytes,

/// The upstairs has requested that we deactivate
Deactivated,

/// The test suite has requested a fault
#[cfg(test)]
RequestedFault,

/// The upstairs has requested that we deactivate when we were offline
OfflineDeactivated,

/// The Upstairs has dropped jobs that would be needed for replay
IneligibleForReplay,

#[cfg(test)]
RequestedFault,
}

impl From<ClientFaultReason> for ClientStopReason {
fn from(f: ClientFaultReason) -> ClientStopReason {
ClientStopReason::Fault(f)
}
}

/// Response received from the I/O task
Expand Down
Loading

0 comments on commit 01ab087

Please sign in to comment.