Skip to content

Commit

Permalink
Retry tasks during schedule phase of active listener
Browse files Browse the repository at this point in the history
If an item is in the schedule phase too long, it will now retry after a
fixed amount of time to ensure any down stream database didn't forget
about it. So far this is a known issue in redis, where the secondary
indexing might not index a task if the index was created while the task
was being inserted (bug in redis). This should be mostly benine for
cases that do not have these kinds of issues.
  • Loading branch information
allada committed Sep 13, 2024
1 parent ba5b315 commit 95e4fb8
Showing 1 changed file with 40 additions and 24 deletions.
64 changes: 40 additions & 24 deletions nativelink-scheduler/src/simple_scheduler_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ use super::awaited_action_db::{
/// can fail before giving up.
const MAX_UPDATE_RETRIES: usize = 5;

/// Maximum number of times an action can timeout before
/// forcefully being retried.
/// This is a safety mechanism to prevent an action from
/// being stuck in a state where it is not updated due to
/// an error in the inner database.
const ABS_MAX_TIMEOUTS_BEFORE_RETRY: usize = 5;

/// Simple struct that implements the ActionStateResult trait and always returns an error.
struct ErrorActionStateResult(Error);

Expand Down Expand Up @@ -233,6 +240,7 @@ where

async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
let mut timeout_attempts = 0;
let mut timeouts_triggered = 0;
loop {
tokio::select! {
awaited_action_result = self.awaited_action_sub.changed() => {
Expand All @@ -242,6 +250,7 @@ where
}
_ = (self.now_fn)().sleep(self.no_event_action_timeout) => {
// Timeout happened, do additional checks below.
timeouts_triggered += 1;
}
}

Expand All @@ -251,7 +260,9 @@ where
.await
.err_tip(|| "In MatchingEngineActionStateResult::changed")?;

if matches!(awaited_action.state().stage, ActionStage::Queued) {
if matches!(awaited_action.state().stage, ActionStage::Queued)
&& timeouts_triggered < ABS_MAX_TIMEOUTS_BEFORE_RETRY
{
// Actions in queued state do not get periodically updated,
// so we don't need to timeout them.
continue;
Expand Down Expand Up @@ -388,28 +399,28 @@ where
.await
.err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id")?;

// If the action is not executing, we should not timeout the action.
if !matches!(awaited_action.state().stage, ActionStage::Executing) {
return Ok(());
}

let last_worker_updated = awaited_action
.last_worker_updated_timestamp()
.duration_since(SystemTime::UNIX_EPOCH)
.map_err(|e| {
make_err!(
Code::Internal,
"Failed to convert last_worker_updated to duration since epoch {e:?}"
)
})?;
let worker_should_update_before = last_worker_updated
.checked_add(self.no_event_action_timeout)
.err_tip(|| "Timestamp too big in SimpleSchedulerStateManager::timeout_operation_id")?;
if worker_should_update_before < (self.now_fn)().elapsed() {
// The action was updated recently, we should not timeout the action.
// This is to prevent timing out actions that have recently been updated
// (like multiple clients timeout the same action at the same time).
return Ok(());
// If the action is executing, we check if the worker has updated the action recently.
if matches!(awaited_action.state().stage, ActionStage::Executing) {
let last_worker_updated = awaited_action
.last_worker_updated_timestamp()
.duration_since(SystemTime::UNIX_EPOCH)
.map_err(|e| {
make_err!(
Code::Internal,
"Failed to convert last_worker_updated to duration since epoch {e:?}"
)
})?;
let worker_should_update_before = last_worker_updated
.checked_add(self.no_event_action_timeout)
.err_tip(|| {
"Timestamp too big in SimpleSchedulerStateManager::timeout_operation_id"
})?;
if worker_should_update_before < (self.now_fn)().elapsed() {
// The action was updated recently, we should not timeout the action.
// This is to prevent timing out actions that have recently been updated
// (like multiple clients timeout the same action at the same time).
return Ok(());
}
}

self.assign_operation(
Expand Down Expand Up @@ -497,9 +508,14 @@ where
}
UpdateOperationType::UpdateWithActionStage(stage) => stage.clone(),
UpdateOperationType::UpdateWithError(err) => {
// Don't count items being placed back in queue when they are already queued.
// This might happen as a safety mechanism to prevent an action from being
// stuck in a state where it has not been updated for a while and just making
// sure it is still in the queue by re-inserting it.
let is_queued = awaited_action.state().stage == ActionStage::Queued;
// Don't count a backpressure failure as an attempt for an action.
let due_to_backpressure = err.code == Code::ResourceExhausted;
if !due_to_backpressure {
if !due_to_backpressure && !is_queued {
awaited_action.attempts += 1;
}

Expand Down

0 comments on commit 95e4fb8

Please sign in to comment.