diff --git a/nativelink-scheduler/src/simple_scheduler_state_manager.rs b/nativelink-scheduler/src/simple_scheduler_state_manager.rs index a63c100cc..7320e5a15 100644 --- a/nativelink-scheduler/src/simple_scheduler_state_manager.rs +++ b/nativelink-scheduler/src/simple_scheduler_state_manager.rs @@ -41,6 +41,13 @@ use super::awaited_action_db::{ /// can fail before giving up. const MAX_UPDATE_RETRIES: usize = 5; +/// Maximum number of times an action can timeout before +/// forcefully being retried. +/// This is a safety mechanism to prevent an action from +/// being stuck in a state where it is not updated due to +/// an error in the inner database. +const ABS_MAX_TIMEOUTS_BEFORE_RETRY: usize = 5; + /// Simple struct that implements the ActionStateResult trait and always returns an error. struct ErrorActionStateResult(Error); @@ -233,6 +240,7 @@ where async fn changed(&mut self) -> Result, Error> { let mut timeout_attempts = 0; + let mut timeouts_triggered = 0; loop { tokio::select! { awaited_action_result = self.awaited_action_sub.changed() => { @@ -242,6 +250,7 @@ where } _ = (self.now_fn)().sleep(self.no_event_action_timeout) => { // Timeout happened, do additional checks below. + timeouts_triggered += 1; } } @@ -251,7 +260,9 @@ where .await .err_tip(|| "In MatchingEngineActionStateResult::changed")?; - if matches!(awaited_action.state().stage, ActionStage::Queued) { + if matches!(awaited_action.state().stage, ActionStage::Queued) + && timeouts_triggered < ABS_MAX_TIMEOUTS_BEFORE_RETRY + { // Actions in queued state do not get periodically updated, // so we don't need to timeout them. continue; @@ -388,28 +399,28 @@ where .await .err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id")?; - // If the action is not executing, we should not timeout the action. 
- if !matches!(awaited_action.state().stage, ActionStage::Executing) { - return Ok(()); - } - - let last_worker_updated = awaited_action - .last_worker_updated_timestamp() - .duration_since(SystemTime::UNIX_EPOCH) - .map_err(|e| { - make_err!( - Code::Internal, - "Failed to convert last_worker_updated to duration since epoch {e:?}" - ) - })?; - let worker_should_update_before = last_worker_updated - .checked_add(self.no_event_action_timeout) - .err_tip(|| "Timestamp too big in SimpleSchedulerStateManager::timeout_operation_id")?; - if worker_should_update_before < (self.now_fn)().elapsed() { - // The action was updated recently, we should not timeout the action. - // This is to prevent timing out actions that have recently been updated - // (like multiple clients timeout the same action at the same time). - return Ok(()); + // If the action is executing, we check if the worker has updated the action recently. + if matches!(awaited_action.state().stage, ActionStage::Executing) { + let last_worker_updated = awaited_action + .last_worker_updated_timestamp() + .duration_since(SystemTime::UNIX_EPOCH) + .map_err(|e| { + make_err!( + Code::Internal, + "Failed to convert last_worker_updated to duration since epoch {e:?}" + ) + })?; + let worker_should_update_before = last_worker_updated + .checked_add(self.no_event_action_timeout) + .err_tip(|| { + "Timestamp too big in SimpleSchedulerStateManager::timeout_operation_id" + })?; + if worker_should_update_before < (self.now_fn)().elapsed() { + // The action was updated recently, we should not timeout the action. + // This is to prevent timing out actions that have recently been updated + // (like multiple clients timeout the same action at the same time). + return Ok(()); + } } self.assign_operation( @@ -497,9 +508,14 @@ where } UpdateOperationType::UpdateWithActionStage(stage) => stage.clone(), UpdateOperationType::UpdateWithError(err) => { + // Don't count items being placed back in queue when they are already queued. 
+ // This can happen as a safety mechanism: an action that has not been updated + // for a while is re-inserted into the queue to make sure it is still there, + // and such a re-insert must not be counted as a failed attempt. + let is_queued = awaited_action.state().stage == ActionStage::Queued; // Don't count a backpressure failure as an attempt for an action. let due_to_backpressure = err.code == Code::ResourceExhausted; - if !due_to_backpressure { + if !due_to_backpressure && !is_queued { awaited_action.attempts += 1; }