Skip to content

Commit

Permalink
SimpleScheduler version matching uses Aborted to signal failure
Browse files Browse the repository at this point in the history
When version matching fails for the scheduler that owns an operation,
we now use the Aborted error code to signal that the version check
failed and that the operation can be retried.

This is not a bug fix: the current in-memory scheduler already guarantees
protection here. This change is for lockless schedulers.

towards #359
  • Loading branch information
allada committed Sep 1, 2024
1 parent b774be5 commit 771e98b
Show file tree
Hide file tree
Showing 5 changed files with 391 additions and 39 deletions.
1 change: 1 addition & 0 deletions nativelink-scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ rust_test_suite(
"//nativelink-proto",
"//nativelink-store",
"//nativelink-util",
"@crates//:async-lock",
"@crates//:futures",
"@crates//:mock_instant",
"@crates//:pretty_assertions",
Expand Down
4 changes: 2 additions & 2 deletions nativelink-scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
// limitations under the License.

pub mod api_worker_scheduler;
mod awaited_action_db;
pub mod awaited_action_db;
pub mod cache_lookup_scheduler;
pub mod default_scheduler_factory;
pub mod grpc_scheduler;
mod memory_awaited_action_db;
pub mod memory_awaited_action_db;
pub mod platform_property_manager;
pub mod property_modifier_scheduler;
pub mod simple_scheduler;
Expand Down
19 changes: 16 additions & 3 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use futures::Future;
use nativelink_error::{Error, ResultExt};
use nativelink_error::{Code, Error, ResultExt};
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
use nativelink_util::action_messages::{
ActionInfo, ActionStage, ActionState, OperationId, WorkerId,
Expand Down Expand Up @@ -172,6 +172,10 @@ impl SimpleScheduler {
.err_tip(|| "In SimpleScheduler::get_queued_operations getting filter result")
}

/// Public wrapper around `do_try_match` that runs it once and returns its
/// result unchanged. As the name indicates, it exists so tests can trigger
/// a matching pass directly.
///
/// # Errors
///
/// Returns whatever error `do_try_match` returns.
pub async fn do_try_match_for_test(&self) -> Result<(), Error> {
self.do_try_match().await
}

// TODO(blaise.bruer) This is an O(n*m) (aka n^2) algorithm. In theory we
// can create a map of capabilities of each worker and then try and match
// the actions to the worker using the map lookup (ie. map reduce).
Expand Down Expand Up @@ -223,10 +227,19 @@ impl SimpleScheduler {
};

// Tell the matching engine that the operation is being assigned to a worker.
matching_engine_state_manager
let assign_result = matching_engine_state_manager
.assign_operation(&operation_id, Ok(&worker_id))
.await
.err_tip(|| "Failed to assign operation in do_try_match")?;
.err_tip(|| "Failed to assign operation in do_try_match");
if let Err(err) = assign_result {
if err.code == Code::Aborted {
// If the operation was aborted, it means that the operation was
// cancelled due to another operation being assigned to the worker.
return Ok(());
}
// Any other error is a real error.
return Err(err);
}

// Notify the worker to run the action.
{
Expand Down
183 changes: 152 additions & 31 deletions nativelink-scheduler/src/simple_scheduler_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
// limitations under the License.

use std::ops::Bound;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::time::{Duration, SystemTime};

use async_lock::Mutex;
use async_trait::async_trait;
use futures::{future, stream, StreamExt, TryStreamExt};
use nativelink_error::{make_err, Code, Error, ResultExt};
Expand All @@ -28,6 +30,7 @@ use nativelink_util::operation_state_manager::{
ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
OperationFilter, OperationStageFlags, OrderDirection, WorkerStateManager,
};
use tokio::time::timeout;
use tracing::{event, Level};

use super::awaited_action_db::{
Expand Down Expand Up @@ -121,20 +124,22 @@ fn apply_filter_predicate(awaited_action: &AwaitedAction, filter: &OperationFilt
true
}

struct ClientActionStateResult<T> {
inner: MatchingEngineActionStateResult<T>,
struct ClientActionStateResult<T, U: AwaitedActionDb> {
inner: MatchingEngineActionStateResult<T, U>,
}

impl<T: AwaitedActionSubscriber> ClientActionStateResult<T> {
fn new(sub: T) -> Self {
impl<T: AwaitedActionSubscriber, U: AwaitedActionDb> ClientActionStateResult<T, U> {
fn new(sub: T, simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<U>>) -> Self {
Self {
inner: MatchingEngineActionStateResult::new(sub),
inner: MatchingEngineActionStateResult::new(sub, simple_scheduler_state_manager),
}
}
}

#[async_trait]
impl<T: AwaitedActionSubscriber> ActionStateResult for ClientActionStateResult<T> {
impl<T: AwaitedActionSubscriber, U: AwaitedActionDb> ActionStateResult
for ClientActionStateResult<T, U>
{
/// Returns the current action state by delegating to the wrapped
/// `MatchingEngineActionStateResult`.
async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
self.inner.as_state().await
}
Expand All @@ -148,26 +153,88 @@ impl<T: AwaitedActionSubscriber> ActionStateResult for ClientActionStateResult<T
}
}

struct MatchingEngineActionStateResult<T> {
struct MatchingEngineActionStateResult<T, U: AwaitedActionDb> {
awaited_action_sub: T,
simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<U>>,
}
impl<T: AwaitedActionSubscriber> MatchingEngineActionStateResult<T> {
fn new(awaited_action_sub: T) -> Self {
Self { awaited_action_sub }
impl<T: AwaitedActionSubscriber, U: AwaitedActionDb> MatchingEngineActionStateResult<T, U> {
/// Builds a result wrapper from an awaited-action subscriber and a weak
/// handle to the owning `SimpleSchedulerStateManager`.
///
/// The manager is held as a `Weak`, so it must be upgraded before use
/// (see `changed`, which upgrades it to time out a stalled operation).
fn new(
awaited_action_sub: T,
simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<U>>,
) -> Self {
Self {
awaited_action_sub,
simple_scheduler_state_manager,
}
}
}

#[async_trait]
impl<T: AwaitedActionSubscriber> ActionStateResult for MatchingEngineActionStateResult<T> {
impl<T: AwaitedActionSubscriber, U: AwaitedActionDb> ActionStateResult
for MatchingEngineActionStateResult<T, U>
{
/// Returns a clone of the state of the awaited action currently held by
/// the subscriber. This path never produces an error itself; it always
/// returns `Ok`.
async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
Ok(self.awaited_action_sub.borrow().state().clone())
}

async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
self.awaited_action_sub
.changed()
.await
.map(|v| v.state().clone())
// How long to wait for the subscriber to report a change before checking
// whether the action has stalled. Hoisted out of the loop because it never
// changes between iterations.
// TODO: make this a constant or a configuration value.
let timeout_duration = Duration::from_secs(10);
// Number of times this call has asked the state manager to time out the
// operation.
let mut timeout_attempts = 0;
loop {
    // Fast path: the subscriber reported a change (or an error) in time.
    if let Ok(awaited_action_result) =
        timeout(timeout_duration, self.awaited_action_sub.changed()).await
    {
        return awaited_action_result
            .err_tip(|| "In MatchingEngineActionStateResult::changed")
            .map(|v| v.state().clone());
    }
    // AwaitedAction timed out. Check to see if the action is being executed
    // if it is then issue a retry.
    let awaited_action = self.awaited_action_sub.borrow();

    if !matches!(awaited_action.state().stage, ActionStage::Executing) {
        // We only timeout actions that are executing.
        // If they are queued or completed, we should not timeout.
        // Reported through `tracing` (not stdout) so it can be filtered.
        event!(
            Level::DEBUG,
            "operation not in executing state: {:?} - {}",
            awaited_action.state().stage,
            awaited_action.operation_id()
        );
        continue;
    }

    // The manager is only weakly referenced; if it has been dropped there
    // is nobody left to perform the timeout, so surface an error.
    let simple_scheduler_state_manager = self
        .simple_scheduler_state_manager
        .upgrade()
        .err_tip(|| format!("Failed to upgrade weak reference to SimpleSchedulerStateManager in MatchingEngineActionStateResult::changed at attempt: {timeout_attempts}"))?;

    event!(
        Level::ERROR,
        ?awaited_action,
        "OperationId {} timed out after {} seconds issuing a retry",
        awaited_action.operation_id(),
        timeout_duration.as_secs_f32(),
    );

    simple_scheduler_state_manager
        .timeout_operation_id(awaited_action.operation_id())
        .await
        .err_tip(|| "In MatchingEngineActionStateResult::changed")?;
    event!(
        Level::DEBUG,
        "done timing out operation id: {}",
        awaited_action.operation_id()
    );

    // Give up once the retry budget is exhausted so this call cannot loop
    // forever on an operation that never makes progress.
    if timeout_attempts >= MAX_UPDATE_RETRIES {
        return Err(make_err!(
            Code::Internal,
            "Failed to update action after {} retries with no error set in MatchingEngineActionStateResult::changed",
            MAX_UPDATE_RETRIES,
        ));
    }
    timeout_attempts += 1;
}
}

async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
Expand All @@ -190,16 +257,70 @@ pub struct SimpleSchedulerStateManager<T: AwaitedActionDb> {
// of always having it on every SimpleScheduler.
#[metric(help = "Maximum number of times a job can be retried")]
max_job_retries: usize,

timeout_operation_mux: Mutex<()>,

/// Weak reference to self.
weak_self: Weak<Self>,
}

impl<T: AwaitedActionDb> SimpleSchedulerStateManager<T> {
pub fn new(max_job_retries: usize, action_db: T) -> Arc<Self> {
Arc::new(Self {
Arc::new_cyclic(|weak_self| Self {
action_db,
max_job_retries,
timeout_operation_mux: Mutex::new(()),
weak_self: weak_self.clone(),
})
}

/// Let the scheduler know that an operation has timed out from
/// the client side (ie: worker has not updated in a while).
///
/// The call is a no-op (returns `Ok(())`) when the operation is not in the
/// `Executing` stage, or when its last worker update is more recent than
/// the timeout window. Otherwise the operation is handed to
/// `assign_operation` with a `DeadlineExceeded` error, and that call's
/// result is returned.
///
/// # Errors
///
/// Returns an error if the lookup in `action_db` fails, if no operation
/// with `operation_id` exists, if the cutoff time cannot be computed, or
/// if `assign_operation` fails.
async fn timeout_operation_id(&self, operation_id: &OperationId) -> Result<(), Error> {
// Ensure that only one timeout operation is running at a time.
// Failing to do this could result in the same operation being
// timed out multiple times at the same time.
// The guard is bound to `_lock`, so it is held until this function returns.
let _lock = self.timeout_operation_mux.lock().await;

// The first `err_tip` covers a failed lookup; the second covers a
// successful lookup that found no such operation.
let awaited_action_subscriber = self
.action_db
.get_by_operation_id(operation_id)
.await
.err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id")?
.err_tip(|| {
format!("Operation id {operation_id} does not exist in SimpleSchedulerStateManager::timeout_operation_id")
})?;

let awaited_action = awaited_action_subscriber.borrow();

// If the action is not executing, we should not timeout the action.
if !matches!(awaited_action.state().stage, ActionStage::Executing) {
return Ok(());
}

// TODO: make this configurable. The same 10-second value is also
// hard-coded in `MatchingEngineActionStateResult::changed`.
let timeout_duration = Duration::from_secs(10);
// Cutoff: only operations whose last worker update is at or before this
// instant are timed out.
let timeout_operation_older_than = SystemTime::now()
.checked_sub(timeout_duration)
.err_tip(|| "Date too big in SimpleSchedulerStateManager::timeout_operation_id")?;
if awaited_action.last_worker_updated_timestamp() > timeout_operation_older_than {
// The action was updated recently, we should not timeout the action.
// This is to prevent timing out actions that have recently been updated
// (like multiple clients timeout the same action at the same time).
return Ok(());
}

// Report the timeout by passing an error in place of a worker id.
self.assign_operation(
operation_id,
Err(make_err!(
Code::DeadlineExceeded,
"Operation timed out after {} seconds",
timeout_duration.as_secs_f32(),
)),
)
.await
}

async fn inner_update_operation(
&self,
operation_id: &OperationId,
Expand All @@ -213,26 +334,18 @@ impl<T: AwaitedActionDb> SimpleSchedulerStateManager<T> {
.get_by_operation_id(operation_id)
.await
.err_tip(|| "In SimpleSchedulerStateManager::update_operation")?;
println!("HERE2");
let awaited_action_subscriber = match maybe_awaited_action_subscriber {
Some(sub) => sub,
// No action found. It is ok if the action was not found. It probably
// means that the action was dropped, but worker was still processing
// it.
None => return Ok(()),
};
println!("HERE");

let mut awaited_action = awaited_action_subscriber.borrow();

// Make sure we don't update an action that is already completed.
if awaited_action.state().stage.is_finished() {
return Err(make_err!(
Code::Internal,
"Action {operation_id:?} is already completed with state {:?} - maybe_worker_id: {:?}",
awaited_action.state().stage,
maybe_worker_id,
));
}

// Make sure the worker id matches the awaited action worker id.
// This might happen if the worker sending the update is not the
// worker that was assigned.
Expand Down Expand Up @@ -461,15 +574,20 @@ impl<T: AwaitedActionDb> ClientStateManager for SimpleSchedulerStateManager<T> {
.inner_add_operation(client_operation_id.clone(), action_info.clone())
.await?;

Ok(Box::new(ClientActionStateResult::new(sub)))
Ok(Box::new(ClientActionStateResult::new(
sub,
self.weak_self.clone(),
)))
}

async fn filter_operations<'a>(
&'a self,
filter: OperationFilter,
) -> Result<ActionStateResultStream<'a>, Error> {
self.inner_filter_operations(filter, move |rx| Box::new(ClientActionStateResult::new(rx)))
.await
self.inner_filter_operations(filter, move |rx| {
Box::new(ClientActionStateResult::new(rx, self.weak_self.clone()))
})
.await
}

fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
Expand Down Expand Up @@ -497,7 +615,10 @@ impl<T: AwaitedActionDb> MatchingEngineStateManager for SimpleSchedulerStateMana
filter: OperationFilter,
) -> Result<ActionStateResultStream<'a>, Error> {
self.inner_filter_operations(filter, |rx| {
Box::new(MatchingEngineActionStateResult::new(rx))
Box::new(MatchingEngineActionStateResult::new(
rx,
self.weak_self.clone(),
))
})
.await
}
Expand Down
Loading

0 comments on commit 771e98b

Please sign in to comment.