Skip to content

Commit

Permalink
SimpleScheduler version matching uses Aborted to signal a retryable failure
Browse files Browse the repository at this point in the history
When version matching fails for the scheduler that owns an operation, we
now use the Aborted error code to signal that the version check failed and
that the operation can be retried.

This is not a bug fix: the current in-memory scheduler already guarantees
protection here. This change is for lockless schedulers.

towards #359
  • Loading branch information
allada committed Sep 5, 2024
1 parent 17acce2 commit f5129de
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 8 deletions.
1 change: 1 addition & 0 deletions nativelink-scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ rust_test_suite(
"//nativelink-proto",
"//nativelink-store",
"//nativelink-util",
"@crates//:async-lock",
"@crates//:futures",
"@crates//:mock_instant",
"@crates//:pretty_assertions",
Expand Down
4 changes: 2 additions & 2 deletions nativelink-scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
// limitations under the License.

pub mod api_worker_scheduler;
mod awaited_action_db;
pub mod awaited_action_db;
pub mod cache_lookup_scheduler;
pub mod default_scheduler_factory;
pub mod grpc_scheduler;
mod memory_awaited_action_db;
pub mod memory_awaited_action_db;
pub mod platform_property_manager;
pub mod property_modifier_scheduler;
pub mod simple_scheduler;
Expand Down
19 changes: 16 additions & 3 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use futures::Future;
use nativelink_error::{Error, ResultExt};
use nativelink_error::{Code, Error, ResultExt};
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
use nativelink_util::action_messages::{
ActionInfo, ActionStage, ActionState, OperationId, WorkerId,
Expand Down Expand Up @@ -172,6 +172,10 @@ impl SimpleScheduler {
.err_tip(|| "In SimpleScheduler::get_queued_operations getting filter result")
}

/// Runs one `do_try_match` pass and returns its result unchanged.
///
/// This is a public forwarding wrapper; the `_for_test` suffix marks it as
/// intended to be driven from tests.
///
/// # Errors
///
/// Returns whatever error `do_try_match` returns.
pub async fn do_try_match_for_test(&self) -> Result<(), Error> {
self.do_try_match().await
}

// TODO(blaise.bruer) This is an O(n*m) (aka n^2) algorithm. In theory we
// can create a map of capabilities of each worker and then try and match
// the actions to the worker using the map lookup (ie. map reduce).
Expand Down Expand Up @@ -223,10 +227,19 @@ impl SimpleScheduler {
};

// Tell the matching engine that the operation is being assigned to a worker.
matching_engine_state_manager
let assign_result = matching_engine_state_manager
.assign_operation(&operation_id, Ok(&worker_id))
.await
.err_tip(|| "Failed to assign operation in do_try_match")?;
.err_tip(|| "Failed to assign operation in do_try_match");
if let Err(err) = assign_result {
if err.code == Code::Aborted {
// If the operation was aborted, it means that the operation was
// cancelled due to another operation being assigned to the worker.
return Ok(());
}
// Any other error is a real error.
return Err(err);
}

// Notify the worker to run the action.
{
Expand Down
223 changes: 220 additions & 3 deletions nativelink-scheduler/tests/simple_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,34 @@
// limitations under the License.

use std::collections::HashMap;
use std::ops::Bound;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use async_lock::Mutex;
use futures::task::Poll;
use futures::{poll, StreamExt};
use futures::{poll, Stream, StreamExt};
use mock_instant::MockClock;
use nativelink_config::schedulers::PropertyType;
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_macro::nativelink_test;
use nativelink_metric::MetricsComponent;
use nativelink_proto::build::bazel::remote::execution::v2::{digest_function, ExecuteRequest};
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
update_for_worker, ConnectionResult, StartExecute, UpdateForWorker,
};
use nativelink_scheduler::awaited_action_db::{
AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, SortedAwaitedAction,
SortedAwaitedActionState,
};
use nativelink_scheduler::default_scheduler_factory::memory_awaited_action_db_factory;
use nativelink_scheduler::simple_scheduler::SimpleScheduler;
use nativelink_scheduler::worker::Worker;
use nativelink_scheduler::worker_scheduler::WorkerScheduler;
use nativelink_util::action_messages::{
ActionResult, ActionStage, ActionState, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath,
OperationId, SymlinkInfo, WorkerId, INTERNAL_ERROR_EXIT_CODE,
ActionInfo, ActionResult, ActionStage, ActionState, DirectoryInfo, ExecutionMetadata, FileInfo,
NameOrPath, OperationId, SymlinkInfo, WorkerId, INTERNAL_ERROR_EXIT_CODE,
};
use nativelink_util::common::DigestInfo;
use nativelink_util::instant_wrapper::MockInstantWrapped;
Expand Down Expand Up @@ -771,6 +778,216 @@ async fn worker_disconnects_does_not_schedule_for_execution_test() -> Result<(),
Ok(())
}

// TODO(allada) These should be generalized and expanded for more tests.
/// Stateless stand-in for a real awaited-action subscriber.
///
/// It carries no data: every call to `borrow` fabricates a fresh
/// `AwaitedAction`, and `changed` must never be called.
pub struct MockAwaitedActionSubscriber {}

impl AwaitedActionSubscriber for MockAwaitedActionSubscriber {
    /// Not supported by this mock; panics via `unreachable!` if invoked.
    async fn changed(&mut self) -> Result<AwaitedAction, Error> {
        unreachable!()
    }

    /// Returns a newly built `AwaitedAction` using the default operation id
    /// and a base action info created at the UNIX epoch with the zero digest.
    fn borrow(&self) -> AwaitedAction {
        let operation_id = OperationId::default();
        let action_info =
            make_base_action_info(SystemTime::UNIX_EPOCH, DigestInfo::zero_digest());
        AwaitedAction::new(operation_id, action_info)
    }
}

/// Sender halves of the channels that script a `MockAwaitedAction`.
///
/// `MockAwaitedAction::new` creates each sender together with the receiver
/// stored in the mock. Every value sent here is consumed by one call to the
/// mock method named in the field.
struct MockSenders {
// Replies handed out by `get_awaited_action_by_id`.
tx_get_awaited_action_by_id:
mpsc::UnboundedSender<Result<Option<MockAwaitedActionSubscriber>, Error>>,
// Replies handed out by `get_by_operation_id`.
tx_get_by_operation_id:
mpsc::UnboundedSender<Result<Option<MockAwaitedActionSubscriber>, Error>>,
// Item lists that `get_range_of_actions` turns into its returned stream.
tx_get_range_of_actions: mpsc::UnboundedSender<Vec<Result<MockAwaitedActionSubscriber, Error>>>,
// Results returned by `update_awaited_action`.
tx_update_awaited_action: mpsc::UnboundedSender<Result<(), Error>>,
}

/// Channel-driven mock of an awaited-action database.
///
/// Each field is the receiving half of a channel whose sender lives in
/// `MockSenders`. The `AwaitedActionDb` implementation answers a call by
/// popping the next queued value from the matching receiver.
#[derive(MetricsComponent)]
struct MockAwaitedAction {
// Queued replies for `get_awaited_action_by_id`.
rx_get_awaited_action_by_id:
Mutex<mpsc::UnboundedReceiver<Result<Option<MockAwaitedActionSubscriber>, Error>>>,
// Queued replies for `get_by_operation_id`.
rx_get_by_operation_id:
Mutex<mpsc::UnboundedReceiver<Result<Option<MockAwaitedActionSubscriber>, Error>>>,
// Queued item lists for `get_range_of_actions`.
rx_get_range_of_actions:
Mutex<mpsc::UnboundedReceiver<Vec<Result<MockAwaitedActionSubscriber, Error>>>>,
// Queued results for `update_awaited_action`.
rx_update_awaited_action: Mutex<mpsc::UnboundedReceiver<Result<(), Error>>>,
}
impl MockAwaitedAction {
    /// Creates a mock database plus the senders used to script its replies.
    ///
    /// One unbounded channel is opened per mocked method. The sending halves
    /// are returned in `MockSenders`; the receiving halves are wrapped in a
    /// `Mutex` and stored in the mock itself.
    fn new() -> (MockSenders, Self) {
        let (tx_by_id, rx_by_id) = mpsc::unbounded_channel();
        let (tx_by_operation, rx_by_operation) = mpsc::unbounded_channel();
        let (tx_range, rx_range) = mpsc::unbounded_channel();
        let (tx_update, rx_update) = mpsc::unbounded_channel();

        let senders = MockSenders {
            tx_get_awaited_action_by_id: tx_by_id,
            tx_get_by_operation_id: tx_by_operation,
            tx_get_range_of_actions: tx_range,
            tx_update_awaited_action: tx_update,
        };
        let mock = Self {
            rx_get_awaited_action_by_id: Mutex::new(rx_by_id),
            rx_get_by_operation_id: Mutex::new(rx_by_operation),
            rx_get_range_of_actions: Mutex::new(rx_range),
            rx_update_awaited_action: Mutex::new(rx_update),
        };
        (senders, mock)
    }
}
/// Scripted `AwaitedActionDb`: most methods answer with the next value queued
/// on their channel and panic if nothing has been queued.
impl AwaitedActionDb for MockAwaitedAction {
    type Subscriber = MockAwaitedActionSubscriber;

    /// Returns the next queued reply; the requested id is ignored.
    ///
    /// Panics if no reply is waiting on the channel.
    async fn get_awaited_action_by_id(
        &self,
        _client_operation_id: &OperationId,
    ) -> Result<Option<Self::Subscriber>, Error> {
        self.rx_get_awaited_action_by_id
            .lock()
            .await
            .try_recv()
            .expect("Could not receive msg in mpsc")
    }

    /// Always yields an empty stream.
    async fn get_all_awaited_actions(
        &self,
    ) -> impl Stream<Item = Result<Self::Subscriber, Error>> + Send {
        futures::stream::empty()
    }

    /// Returns the next queued reply; the requested id is ignored.
    ///
    /// Panics if no reply is waiting on the channel.
    async fn get_by_operation_id(
        &self,
        _operation_id: &OperationId,
    ) -> Result<Option<Self::Subscriber>, Error> {
        self.rx_get_by_operation_id
            .lock()
            .await
            .try_recv()
            .expect("Could not receive msg in mpsc")
    }

    /// Streams the next queued list of items; all range arguments are ignored.
    ///
    /// Panics if no list is waiting on the channel.
    async fn get_range_of_actions(
        &self,
        _state: SortedAwaitedActionState,
        _start: Bound<SortedAwaitedAction>,
        _end: Bound<SortedAwaitedAction>,
        _desc: bool,
    ) -> impl Stream<Item = Result<Self::Subscriber, Error>> + Send {
        // Pop the scripted batch in its own statement so the lock guard is
        // released before the stream is handed back to the caller.
        let scripted_items = self
            .rx_get_range_of_actions
            .lock()
            .await
            .try_recv()
            .expect("Could not receive msg in mpsc");
        futures::stream::iter(scripted_items)
    }

    /// Returns the next queued result; the supplied action is ignored.
    ///
    /// Panics if no result is waiting on the channel.
    async fn update_awaited_action(&self, _new_awaited_action: AwaitedAction) -> Result<(), Error> {
        self.rx_update_awaited_action
            .lock()
            .await
            .try_recv()
            .expect("Could not receive msg in mpsc")
    }

    /// Not supported by this mock; panics via `unreachable!` if invoked.
    async fn add_action(
        &self,
        _client_operation_id: OperationId,
        _action_info: Arc<ActionInfo>,
    ) -> Result<Self::Subscriber, Error> {
        unreachable!()
    }
}

#[nativelink_test]
async fn matching_engine_fails_sends_abort() -> Result<(), Error> {
{
let task_change_notify = Arc::new(Notify::new());
let (senders, awaited_action) = MockAwaitedAction::new();

let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback(
&nativelink_config::schedulers::SimpleScheduler::default(),
awaited_action,
|| async move {},
task_change_notify,
);
// Initial worker calls do_try_match, so send it no items.
senders.tx_get_range_of_actions.send(vec![]).unwrap();
let _worker_rx = setup_new_worker(
&scheduler,
WorkerId(Uuid::new_v4()),
PlatformProperties::default(),
)
.await
.unwrap();

senders
.tx_get_awaited_action_by_id
.send(Ok(Some(MockAwaitedActionSubscriber {})))
.unwrap();
senders
.tx_get_by_operation_id
.send(Ok(Some(MockAwaitedActionSubscriber {})))
.unwrap();
// This one gets called twice because of Abort triggers retry, just return item not exist on retry.
senders.tx_get_by_operation_id.send(Ok(None)).unwrap();
senders
.tx_get_range_of_actions
.send(vec![Ok(MockAwaitedActionSubscriber {})])
.unwrap();
senders
.tx_update_awaited_action
.send(Err(make_err!(
Code::Aborted,
"This means data version did not match."
)))
.unwrap();

assert_eq!(scheduler.do_try_match_for_test().await, Ok(()));
}
{
let task_change_notify = Arc::new(Notify::new());
let (senders, awaited_action) = MockAwaitedAction::new();

let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback(
&nativelink_config::schedulers::SimpleScheduler::default(),
awaited_action,
|| async move {},
task_change_notify,
);
// senders.tx_get_awaited_action_by_id.send(Ok(None)).unwrap();
senders.tx_get_range_of_actions.send(vec![]).unwrap();
let _worker_rx = setup_new_worker(
&scheduler,
WorkerId(Uuid::new_v4()),
PlatformProperties::default(),
)
.await
.unwrap();

senders
.tx_get_awaited_action_by_id
.send(Ok(Some(MockAwaitedActionSubscriber {})))
.unwrap();
senders
.tx_get_by_operation_id
.send(Ok(Some(MockAwaitedActionSubscriber {})))
.unwrap();
senders
.tx_get_range_of_actions
.send(vec![Ok(MockAwaitedActionSubscriber {})])
.unwrap();
senders
.tx_update_awaited_action
.send(Err(make_err!(
Code::Internal,
"This means an internal error happened."
)))
.unwrap();

assert_eq!(
scheduler.do_try_match_for_test().await.unwrap_err().code,
Code::Internal
);
}

Ok(())
}

#[nativelink_test]
async fn worker_timesout_reschedules_running_job_test() -> Result<(), Error> {
let worker_id1: WorkerId = WorkerId(Uuid::new_v4());
Expand Down

0 comments on commit f5129de

Please sign in to comment.