Skip to content

Commit

Permalink
In redis scheduler removes items that are queued for too long
Browse files Browse the repository at this point in the history
In the event a client is no longer requesting to execute a task, the
scheduler will now move items out of the queued state after 60 seconds
(configurable). This will ensure we don't end up with items in the queue
forever.
  • Loading branch information
allada committed Oct 22, 2024
1 parent 2238ef9 commit 8b12016
Show file tree
Hide file tree
Showing 7 changed files with 335 additions and 148 deletions.
6 changes: 6 additions & 0 deletions nativelink-config/src/schedulers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ pub struct SimpleScheduler {
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub retain_completed_for_s: u32,

/// Mark operations as completed with error if no client has updated them
/// within this duration.
/// Default: 60 (seconds)
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub client_action_timeout_s: u64,

/// Remove workers from pool once the worker has not responded in this
/// amount of time in seconds.
/// Default: 5 (seconds)
Expand Down
29 changes: 21 additions & 8 deletions nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ pub struct AwaitedAction {
#[metric(help = "The last time the worker updated the AwaitedAction")]
last_worker_updated_timestamp: SystemTime,

/// The last time the client sent a keepalive message.
#[metric(help = "The last time the client sent a keepalive message")]
last_client_keepalive_timestamp: SystemTime,

/// Worker that is currently running this action, None if unassigned.
#[metric(help = "The worker id of the AwaitedAction")]
worker_id: Option<WorkerId>,
Expand Down Expand Up @@ -103,6 +107,7 @@ impl AwaitedAction {
sort_key,
attempts: 0,
last_worker_updated_timestamp: now,
last_client_keepalive_timestamp: now,
worker_id: None,
state,
}
Expand Down Expand Up @@ -144,25 +149,33 @@ impl AwaitedAction {
self.last_worker_updated_timestamp
}

pub(crate) fn keep_alive(&mut self, now: SystemTime) {
pub(crate) fn worker_keep_alive(&mut self, now: SystemTime) {
self.last_worker_updated_timestamp = now;
}

pub(crate) fn last_client_keepalive_timestamp(&self) -> SystemTime {
self.last_client_keepalive_timestamp
}
pub(crate) fn update_client_keep_alive(&mut self, now: SystemTime) {
self.last_client_keepalive_timestamp = now;
}

pub(crate) fn set_client_operation_id(&mut self, client_operation_id: OperationId) {
Arc::make_mut(&mut self.state).client_operation_id = client_operation_id;
}

/// Sets the worker id that is currently processing this action.
pub(crate) fn set_worker_id(&mut self, new_maybe_worker_id: Option<WorkerId>, now: SystemTime) {
if self.worker_id != new_maybe_worker_id {
self.worker_id = new_maybe_worker_id;
self.keep_alive(now);
self.worker_keep_alive(now);
}
}

/// Sets the current state of the action and notifies subscribers.
/// Returns true if the state was set, false if there are no subscribers.
pub fn set_state(&mut self, mut state: Arc<ActionState>, now: Option<SystemTime>) {
/// Sets the current state of the action and updates the last worker updated timestamp.
pub fn worker_set_state(&mut self, mut state: Arc<ActionState>, now: SystemTime) {
std::mem::swap(&mut self.state, &mut state);
if let Some(now) = now {
self.keep_alive(now);
}
self.worker_keep_alive(now);
}
}

Expand Down
8 changes: 2 additions & 6 deletions nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,18 +201,14 @@ where
// At this stage we know that this event is a client request, so we need
// to populate the client_operation_id.
let mut awaited_action = self.awaited_action_rx.borrow().clone();
let mut state = awaited_action.state().as_ref().clone();
state.client_operation_id = client_operation_id;
awaited_action.set_state(Arc::new(state), None);
awaited_action.set_client_operation_id(client_operation_id);
Ok(awaited_action)
}

async fn borrow(&self) -> Result<AwaitedAction, Error> {
let mut awaited_action = self.awaited_action_rx.borrow().clone();
if let Some(client_info) = self.client_info.as_ref() {
let mut state = awaited_action.state().as_ref().clone();
state.client_operation_id = client_info.client_operation_id.clone();
awaited_action.set_state(Arc::new(state), None);
awaited_action.set_client_operation_id(client_info.client_operation_id.clone());
}
Ok(awaited_action)
}
Expand Down
11 changes: 11 additions & 0 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ use crate::worker_scheduler::WorkerScheduler;
/// If this changes, remember to change the documentation in the config.
const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;

/// Mark operations as completed with error if no client has updated them
/// within this duration.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_CLIENT_ACTION_TIMEOUT_S: u64 = 60;

/// Default times a job can retry before failing.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_MAX_JOB_RETRIES: usize = 3;
Expand Down Expand Up @@ -324,6 +329,11 @@ impl SimpleScheduler {
worker_timeout_s = DEFAULT_WORKER_TIMEOUT_S;
}

let mut client_action_timeout_s = scheduler_cfg.client_action_timeout_s;
if client_action_timeout_s == 0 {
client_action_timeout_s = DEFAULT_CLIENT_ACTION_TIMEOUT_S;
}

let mut max_job_retries = scheduler_cfg.max_job_retries;
if max_job_retries == 0 {
max_job_retries = DEFAULT_MAX_JOB_RETRIES;
Expand All @@ -333,6 +343,7 @@ impl SimpleScheduler {
let state_manager = SimpleSchedulerStateManager::new(
max_job_retries,
Duration::from_secs(worker_timeout_s),
Duration::from_secs(client_action_timeout_s),
awaited_action_db,
now_fn,
);
Expand Down
Loading

0 comments on commit 8b12016

Please sign in to comment.