From 627d6c9ab408439a7a4a8220fd9e7f59baf38a71 Mon Sep 17 00:00:00 2001 From: Blaise Bruer Date: Sat, 31 Aug 2024 23:30:14 -0500 Subject: [PATCH] Prepare scheduler config & move owner of notify task change owner It is easier to do this in one PR than in two because they are dependent on each other. 1. Moves where notifications happen in the scheduler to happen instead in the underlying AwaitedActionDb. 2. Prepare the config changes (non-breaking). towards #359 --- nativelink-config/src/schedulers.rs | 11 ++ .../src/default_scheduler_factory.rs | 60 ++++++- .../src/memory_awaited_action_db.rs | 26 ++- nativelink-scheduler/src/simple_scheduler.rs | 50 ++---- .../src/simple_scheduler_state_manager.rs | 91 +++++----- .../tests/simple_scheduler_test.rs | 155 +++++++++++++++--- 6 files changed, 275 insertions(+), 118 deletions(-) diff --git a/nativelink-config/src/schedulers.rs b/nativelink-config/src/schedulers.rs index 062fb9c98..2e027359e 100644 --- a/nativelink-config/src/schedulers.rs +++ b/nativelink-config/src/schedulers.rs @@ -119,6 +119,17 @@ pub struct SimpleScheduler { /// The strategy used to assign workers jobs. #[serde(default)] pub allocation_strategy: WorkerAllocationStrategy, + + /// The storage backend to use for the scheduler. + /// Default: memory + pub experimental_backend: Option, +} + +#[allow(non_camel_case_types)] +#[derive(Deserialize, Debug)] +pub enum ExperimentalSimpleSchedulerBackend { + /// Use an in-memory store for the scheduler. + memory, } /// A scheduler that simply forwards requests to an upstream scheduler. This diff --git a/nativelink-scheduler/src/default_scheduler_factory.rs b/nativelink-scheduler/src/default_scheduler_factory.rs index a01c4dccd..963a79f5f 100644 --- a/nativelink-scheduler/src/default_scheduler_factory.rs +++ b/nativelink-scheduler/src/default_scheduler_factory.rs @@ -13,18 +13,27 @@ // limitations under the License. use std::sync::Arc; +use std::time::SystemTime; -use nativelink_config::schedulers::SchedulerConfig; +use nativelink_config::schedulers::{ExperimentalSimpleSchedulerBackend, SchedulerConfig}; +use nativelink_config::stores::EvictionPolicy; use nativelink_error::{Error, ResultExt}; use nativelink_store::store_manager::StoreManager; +use nativelink_util::instant_wrapper::InstantWrapper; use nativelink_util::operation_state_manager::ClientStateManager; +use tokio::sync::Notify; use crate::cache_lookup_scheduler::CacheLookupScheduler; use crate::grpc_scheduler::GrpcScheduler; +use crate::memory_awaited_action_db::MemoryAwaitedActionDb; use crate::property_modifier_scheduler::PropertyModifierScheduler; use crate::simple_scheduler::SimpleScheduler; use crate::worker_scheduler::WorkerScheduler; +/// Default timeout for recently completed actions in seconds. +/// If this changes, remember to change the documentation in the config. +const DEFAULT_RETAIN_COMPLETED_FOR_S: u32 = 60; + pub type SchedulerFactoryResults = ( Option>, Option>, @@ -42,10 +51,7 @@ fn inner_scheduler_factory( store_manager: &StoreManager, ) -> Result { let scheduler: SchedulerFactoryResults = match scheduler_type_cfg { - SchedulerConfig::simple(config) => { - let (action_scheduler, worker_scheduler) = SimpleScheduler::new(config); - (Some(action_scheduler), Some(worker_scheduler)) - } + SchedulerConfig::simple(config) => simple_scheduler_factory(config)?, SchedulerConfig::grpc(config) => (Some(Arc::new(GrpcScheduler::new(config)?)), None), SchedulerConfig::cache_lookup(config) => { let ac_store = store_manager @@ -74,3 +80,47 @@ fn inner_scheduler_factory( Ok(scheduler) } + +fn simple_scheduler_factory( + config: &nativelink_config::schedulers::SimpleScheduler, +) -> Result { + match config + .experimental_backend + .as_ref() + .unwrap_or(&ExperimentalSimpleSchedulerBackend::memory) + { + ExperimentalSimpleSchedulerBackend::memory => { + let task_change_notify = Arc::new(Notify::new()); + let awaited_action_db = memory_awaited_action_db_factory( + config.retain_completed_for_s, + task_change_notify.clone(), + SystemTime::now, + ); + let (action_scheduler, worker_scheduler) = + SimpleScheduler::new(config, awaited_action_db, task_change_notify); + Ok((Some(action_scheduler), Some(worker_scheduler))) + } + } +} + +pub fn memory_awaited_action_db_factory( + mut retain_completed_for_s: u32, + task_change_notify: Arc, + now_fn: NowFn, +) -> MemoryAwaitedActionDb +where + I: InstantWrapper, + NowFn: Fn() -> I + Clone + Send + Sync + 'static, +{ + if retain_completed_for_s == 0 { + retain_completed_for_s = DEFAULT_RETAIN_COMPLETED_FOR_S; + } + MemoryAwaitedActionDb::new( + &EvictionPolicy { + max_seconds: retain_completed_for_s, + ..Default::default() + }, + task_change_notify.clone(), + now_fn, + ) +} diff --git a/nativelink-scheduler/src/memory_awaited_action_db.rs b/nativelink-scheduler/src/memory_awaited_action_db.rs index e50864cfb..f9bd9c906 100644 --- a/nativelink-scheduler/src/memory_awaited_action_db.rs +++ b/nativelink-scheduler/src/memory_awaited_action_db.rs @@ -30,7 +30,7 @@ use nativelink_util::evicting_map::{EvictingMap, LenEntry}; use nativelink_util::instant_wrapper::InstantWrapper; use nativelink_util::spawn; use nativelink_util::task::JoinHandleDropGuard; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, watch, Notify}; use tracing::{event, Level}; use crate::awaited_action_db::{ @@ -125,14 +125,15 @@ pub struct MemoryAwaitedActionSubscriber I> { } impl I> MemoryAwaitedActionSubscriber { - pub fn new(mut awaited_action_rx: watch::Receiver) -> Self { + fn new(mut awaited_action_rx: watch::Receiver) -> Self { awaited_action_rx.mark_changed(); Self { awaited_action_rx, client_info: None, } } - pub fn new_with_client( + + fn new_with_client( mut awaited_action_rx: watch::Receiver, client_operation_id: OperationId, event_tx: mpsc::UnboundedSender, @@ -799,13 +800,18 @@ impl I + Clone + Send + Sync> AwaitedActionDbI pub struct MemoryAwaitedActionDb I> { #[metric] inner: Arc>>, + tasks_change_notify: Arc, _handle_awaited_action_events: JoinHandleDropGuard<()>, } impl I + Clone + Send + Sync + 'static> MemoryAwaitedActionDb { - pub fn new(eviction_config: &EvictionPolicy, now_fn: NowFn) -> Self { + pub fn new( + eviction_config: &EvictionPolicy, + tasks_change_notify: Arc, + now_fn: NowFn, + ) -> Self { let (action_event_tx, mut action_event_rx) = mpsc::unbounded_channel(); let inner = Arc::new(Mutex::new(AwaitedActionDbImpl { client_operation_to_awaited_action: EvictingMap::new(eviction_config, (now_fn)()), @@ -819,6 +825,7 @@ impl I + Clone + Send + Sync + 'static> let weak_inner = Arc::downgrade(&inner); Self { inner, + tasks_change_notify, _handle_awaited_action_events: spawn!("handle_awaited_action_events", async move { let mut dropped_operation_ids = Vec::with_capacity(MAX_ACTION_EVENTS_RX_PER_CYCLE); loop { @@ -927,7 +934,9 @@ impl I + Clone + Send + Sync + 'static> Awaite self.inner .lock() .await - .update_awaited_action(new_awaited_action) + .update_awaited_action(new_awaited_action)?; + self.tasks_change_notify.notify_one(); + Ok(()) } async fn add_action( @@ -935,10 +944,13 @@ impl I + Clone + Send + Sync + 'static> Awaite client_operation_id: OperationId, action_info: Arc, ) -> Result { - self.inner + let subscriber = self + .inner .lock() .await .add_action(client_operation_id, action_info) - .await + .await?; + self.tasks_change_notify.notify_one(); + Ok(subscriber) } } diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs index bc6ae059d..d5d183813 100644 --- a/nativelink-scheduler/src/simple_scheduler.rs +++ b/nativelink-scheduler/src/simple_scheduler.rs @@ -13,17 +13,14 @@ // limitations under the License. use std::sync::Arc; -use std::time::SystemTime; use async_trait::async_trait; use futures::Future; -use nativelink_config::stores::EvictionPolicy; use nativelink_error::{Error, ResultExt}; use nativelink_metric::{MetricsComponent, RootMetricsComponent}; use nativelink_util::action_messages::{ ActionInfo, ActionStage, ActionState, OperationId, WorkerId, }; -use nativelink_util::instant_wrapper::InstantWrapper; use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider; use nativelink_util::operation_state_manager::{ ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager, @@ -37,7 +34,7 @@ use tokio_stream::StreamExt; use tracing::{event, Level}; use crate::api_worker_scheduler::ApiWorkerScheduler; -use crate::memory_awaited_action_db::MemoryAwaitedActionDb; +use crate::awaited_action_db::AwaitedActionDb; use crate::platform_property_manager::PlatformPropertyManager; use crate::simple_scheduler_state_manager::SimpleSchedulerStateManager; use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp}; @@ -47,10 +44,6 @@ use crate::worker_scheduler::WorkerScheduler; /// If this changes, remember to change the documentation in the config. const DEFAULT_WORKER_TIMEOUT_S: u64 = 5; -/// Default timeout for recently completed actions in seconds. -/// If this changes, remember to change the documentation in the config. -const DEFAULT_RETAIN_COMPLETED_FOR_S: u32 = 60; - /// Default times a job can retry before failing. /// If this changes, remember to change the documentation in the config. const DEFAULT_MAX_JOB_RETRIES: usize = 3; @@ -269,11 +262,14 @@ impl SimpleScheduler { } impl SimpleScheduler { - pub fn new( + pub fn new( scheduler_cfg: &nativelink_config::schedulers::SimpleScheduler, + awaited_action_db: A, + task_change_notify: Arc, ) -> (Arc, Arc) { Self::new_with_callback( scheduler_cfg, + awaited_action_db, || { // The cost of running `do_try_match()` is very high, but constant // in relation to the number of changes that have happened. This @@ -285,19 +281,19 @@ impl SimpleScheduler { // scheduled within a future. tokio::time::sleep(Duration::from_millis(1)) }, - SystemTime::now, + task_change_notify, ) } pub fn new_with_callback< Fut: Future + Send, F: Fn() -> Fut + Send + Sync + 'static, - I: InstantWrapper, - NowFn: Fn() -> I + Clone + Send + Sync + 'static, + A: AwaitedActionDb, >( scheduler_cfg: &nativelink_config::schedulers::SimpleScheduler, + awaited_action_db: A, on_matching_engine_run: F, - now_fn: NowFn, + task_change_notify: Arc, ) -> (Arc, Arc) { let platform_property_manager = Arc::new(PlatformPropertyManager::new( scheduler_cfg @@ -311,34 +307,19 @@ impl SimpleScheduler { worker_timeout_s = DEFAULT_WORKER_TIMEOUT_S; } - let mut retain_completed_for_s = scheduler_cfg.retain_completed_for_s; - if retain_completed_for_s == 0 { - retain_completed_for_s = DEFAULT_RETAIN_COMPLETED_FOR_S; - } - let mut max_job_retries = scheduler_cfg.max_job_retries; if max_job_retries == 0 { max_job_retries = DEFAULT_MAX_JOB_RETRIES; } - let tasks_or_worker_change_notify = Arc::new(Notify::new()); - let state_manager = SimpleSchedulerStateManager::new( - tasks_or_worker_change_notify.clone(), - max_job_retries, - MemoryAwaitedActionDb::new( - &EvictionPolicy { - max_seconds: retain_completed_for_s, - ..Default::default() - }, - now_fn, - ), - ); + let worker_change_notify = Arc::new(Notify::new()); + let state_manager = SimpleSchedulerStateManager::new(max_job_retries, awaited_action_db); let worker_scheduler = ApiWorkerScheduler::new( state_manager.clone(), platform_property_manager.clone(), scheduler_cfg.allocation_strategy, - tasks_or_worker_change_notify.clone(), + worker_change_notify.clone(), worker_timeout_s, ); @@ -350,7 +331,12 @@ impl SimpleScheduler { spawn!("simple_scheduler_task_worker_matching", async move { // Break out of the loop only when the inner is dropped. loop { - tasks_or_worker_change_notify.notified().await; + let task_change_fut = task_change_notify.notified(); + let worker_change_fut = worker_change_notify.notified(); + tokio::pin!(task_change_fut); + tokio::pin!(worker_change_fut); + // Wait for either of these futures to be ready. + let _ = futures::future::select(task_change_fut, worker_change_fut).await; let result = match weak_inner.upgrade() { Some(scheduler) => scheduler.do_try_match().await, // If the inner went away it means the scheduler is shutting diff --git a/nativelink-scheduler/src/simple_scheduler_state_manager.rs b/nativelink-scheduler/src/simple_scheduler_state_manager.rs index b8c0c1069..255630f53 100644 --- a/nativelink-scheduler/src/simple_scheduler_state_manager.rs +++ b/nativelink-scheduler/src/simple_scheduler_state_manager.rs @@ -28,7 +28,6 @@ use nativelink_util::operation_state_manager::{ ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager, OperationFilter, OperationStageFlags, OrderDirection, WorkerStateManager, }; -use tokio::sync::Notify; use tracing::{event, Level}; use super::awaited_action_db::{ @@ -122,60 +121,57 @@ fn apply_filter_predicate(awaited_action: &AwaitedAction, filter: &OperationFilt true } -pub struct MatchingEngineActionStateResult { - awaited_action_sub: T, +struct ClientActionStateResult { + inner: MatchingEngineActionStateResult, } -impl MatchingEngineActionStateResult { - pub fn new(awaited_action_sub: T) -> Self { - Self { awaited_action_sub } + +impl ClientActionStateResult { + fn new(sub: T) -> Self { + Self { + inner: MatchingEngineActionStateResult::new(sub), + } } } #[async_trait] -impl ActionStateResult for MatchingEngineActionStateResult { +impl ActionStateResult for ClientActionStateResult { async fn as_state(&self) -> Result, Error> { - Ok(self.awaited_action_sub.borrow().state().clone()) + self.inner.as_state().await } async fn changed(&mut self) -> Result, Error> { - let awaited_action = self.awaited_action_sub.changed().await.map_err(|e| { - make_err!( - Code::Internal, - "Failed to wait for awaited action to change {e:?}" - ) - })?; - Ok(awaited_action.state().clone()) + self.inner.changed().await } async fn as_action_info(&self) -> Result, Error> { - Ok(self.awaited_action_sub.borrow().action_info().clone()) + self.inner.as_action_info().await } } -pub(crate) struct ClientActionStateResult { - inner: MatchingEngineActionStateResult, +struct MatchingEngineActionStateResult { + awaited_action_sub: T, } - -impl ClientActionStateResult { - pub fn new(sub: T) -> Self { - Self { - inner: MatchingEngineActionStateResult::new(sub), - } +impl MatchingEngineActionStateResult { + fn new(awaited_action_sub: T) -> Self { + Self { awaited_action_sub } } } #[async_trait] -impl ActionStateResult for ClientActionStateResult { +impl ActionStateResult for MatchingEngineActionStateResult { async fn as_state(&self) -> Result, Error> { - self.inner.as_state().await + Ok(self.awaited_action_sub.borrow().state().clone()) } async fn changed(&mut self) -> Result, Error> { - self.inner.changed().await + self.awaited_action_sub + .changed() + .await + .map(|v| v.state().clone()) } async fn as_action_info(&self) -> Result, Error> { - self.inner.as_action_info().await + Ok(self.awaited_action_sub.borrow().action_info().clone()) } } @@ -189,9 +185,6 @@ pub struct SimpleSchedulerStateManager { #[metric(group = "action_db")] action_db: T, - /// Notify matching engine that work needs to be done. - tasks_change_notify: Arc, - /// Maximum number of times a job can be retried. // TODO(allada) This should be a scheduler decorator instead // of always having it on every SimpleScheduler. @@ -200,14 +193,9 @@ pub struct SimpleSchedulerStateManager { } impl SimpleSchedulerStateManager { - pub fn new( - tasks_change_notify: Arc, - max_job_retries: usize, - action_db: T, - ) -> Arc { + pub fn new(max_job_retries: usize, action_db: T) -> Arc { Arc::new(Self { action_db, - tasks_change_notify, max_job_retries, }) } @@ -224,7 +212,7 @@ impl SimpleSchedulerStateManager { .action_db .get_by_operation_id(operation_id) .await - .err_tip(|| "In MemorySchedulerStateManager::update_operation")?; + .err_tip(|| "In SimpleSchedulerStateManager::update_operation")?; let awaited_action_subscriber = match maybe_awaited_action_subscriber { Some(sub) => sub, // No action found. It is ok if the action was not found. It probably @@ -252,20 +240,22 @@ impl SimpleSchedulerStateManager { && maybe_worker_id.is_some() && maybe_worker_id != awaited_action.worker_id().as_ref() { + // If another worker is already assigned to the action, another + // worker probably picked up the action. We should not update the + // action in this case and abort this operation. let err = make_err!( - Code::Internal, + Code::Aborted, "Worker ids do not match - {:?} != {:?} for {:?}", maybe_worker_id, awaited_action.worker_id(), awaited_action, ); event!( - Level::ERROR, - ?operation_id, - ?maybe_worker_id, - ?awaited_action, - "{}", - err.to_string(), + Level::INFO, + "Worker ids do not match - {:?} != {:?} for {:?}. This is probably due to another worker picking up the action.", + maybe_worker_id, + awaited_action.worker_id(), + awaited_action, ); return Err(err); } @@ -315,7 +305,7 @@ impl SimpleSchedulerStateManager { .action_db .update_awaited_action(awaited_action) .await - .err_tip(|| "In MemorySchedulerStateManager::update_operation"); + .err_tip(|| "In SimpleSchedulerStateManager::update_operation"); if let Err(err) = update_action_result { // We use Aborted to signal that the action was not // updated due to the data being set was not the latest @@ -327,8 +317,6 @@ impl SimpleSchedulerStateManager { return Err(err); } } - - self.tasks_change_notify.notify_one(); return Ok(()); } match last_err { @@ -346,13 +334,10 @@ impl SimpleSchedulerStateManager { new_client_operation_id: OperationId, action_info: Arc, ) -> Result { - let rx = self - .action_db + self.action_db .add_action(new_client_operation_id, action_info) .await - .err_tip(|| "In MemorySchedulerStateManager::add_operation")?; - self.tasks_change_notify.notify_one(); - Ok(rx) + .err_tip(|| "In SimpleSchedulerStateManager::add_operation") } async fn inner_filter_operations<'a, F>( diff --git a/nativelink-scheduler/tests/simple_scheduler_test.rs b/nativelink-scheduler/tests/simple_scheduler_test.rs index fdc56b070..d73b5513e 100644 --- a/nativelink-scheduler/tests/simple_scheduler_test.rs +++ b/nativelink-scheduler/tests/simple_scheduler_test.rs @@ -27,6 +27,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{digest_function, Exe use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{ update_for_worker, ConnectionResult, StartExecute, UpdateForWorker, }; +use nativelink_scheduler::default_scheduler_factory::memory_awaited_action_db_factory; use nativelink_scheduler::simple_scheduler::SimpleScheduler; use nativelink_scheduler::worker::Worker; use nativelink_scheduler::worker_scheduler::WorkerScheduler; @@ -41,7 +42,7 @@ use nativelink_util::operation_state_manager::{ }; use nativelink_util::platform_properties::{PlatformProperties, PlatformPropertyValue}; use pretty_assertions::assert_eq; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, Notify}; use utils::scheduler_utils::{make_base_action_info, INSTANCE_NAME}; use uuid::Uuid; @@ -148,10 +149,16 @@ const WORKER_TIMEOUT_S: u64 = 100; async fn basic_add_action_with_one_worker_test() -> Result<(), Error> { let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), || async move {}, - MockInstantWrapped::default, + task_change_notify, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -200,10 +207,16 @@ async fn basic_add_action_with_one_worker_test() -> Result<(), Error> { async fn find_executing_action() -> Result<(), Error> { let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), || async move {}, - MockInstantWrapped::default, + task_change_notify, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -270,13 +283,19 @@ async fn find_executing_action() -> Result<(), Error> { async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Error> { let worker_id1: WorkerId = WorkerId(Uuid::new_v4()); let worker_id2: WorkerId = WorkerId(Uuid::new_v4()); + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler { worker_timeout_s: WORKER_TIMEOUT_S, ..Default::default() }, + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), || async move {}, - MockInstantWrapped::default, + task_change_notify, ); let action_digest1 = DigestInfo::new([99u8; 32], 512); let action_digest2 = DigestInfo::new([88u8; 32], 512); @@ -441,10 +460,16 @@ async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Err async fn set_drain_worker_pauses_and_resumes_worker_test() -> Result<(), Error> { let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), || async move {}, - MockInstantWrapped::default, + task_change_notify, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -517,13 +542,20 @@ async fn worker_should_not_queue_if_properties_dont_match_test() -> Result<(), E let mut prop_defs = HashMap::new(); prop_defs.insert("prop".to_string(), PropertyType::exact); + + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler { supported_platform_properties: Some(prop_defs), ..Default::default() }, + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), || async move {}, - MockInstantWrapped::default, + task_change_notify, ); let action_digest = DigestInfo::new([99u8; 32], 512); let mut platform_properties = HashMap::new(); @@ -604,10 +636,16 @@ async fn worker_should_not_queue_if_properties_dont_match_test() -> Result<(), E async fn cacheable_items_join_same_action_queued_test() -> Result<(), Error> { let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), || async move {}, - MockInstantWrapped::default, + task_change_notify, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -695,12 +733,18 @@ async fn cacheable_items_join_same_action_queued_test() -> Result<(), Error> { #[nativelink_test] async fn worker_disconnects_does_not_schedule_for_execution_test() -> Result<(), Error> { - let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), || async move {}, - MockInstantWrapped::default, + task_change_notify, ); + let worker_id: WorkerId = WorkerId(Uuid::new_v4()); let action_digest = DigestInfo::new([99u8; 32], 512); let rx_from_worker = @@ -731,13 +775,19 @@ async fn worker_disconnects_does_not_schedule_for_execution_test() -> Result<(), async fn worker_timesout_reschedules_running_job_test() -> Result<(), Error> { let worker_id1: WorkerId = WorkerId(Uuid::new_v4()); let worker_id2: WorkerId = WorkerId(Uuid::new_v4()); + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler { worker_timeout_s: WORKER_TIMEOUT_S, ..Default::default() }, + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), || async move {}, - MockInstantWrapped::default, + task_change_notify, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -849,10 +899,16 @@ async fn worker_timesout_reschedules_running_job_test() -> Result<(), Error> { async fn update_action_sends_completed_result_to_client_test() -> Result<(), Error> { let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), || async move {}, - MockInstantWrapped::default, + task_change_notify, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -941,10 +997,16 @@ async fn update_action_sends_completed_result_to_client_test() -> Result<(), Err async fn update_action_sends_completed_result_after_disconnect() -> Result<(), Error> { let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), || async move {}, - MockInstantWrapped::default, + task_change_notify, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -1050,10 +1112,16 @@ async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> { let good_worker_id: WorkerId = WorkerId(Uuid::new_v4()); let rogue_worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), || async move {}, - MockInstantWrapped::default, + task_change_notify, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -1139,10 +1207,16 @@ async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> { async fn does_not_crash_if_operation_joined_then_relaunched() -> Result<(), Error> { let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), || async move {}, - MockInstantWrapped::default, + task_change_notify, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -1270,13 +1344,19 @@ async fn run_two_jobs_on_same_worker_with_platform_properties_restrictions() -> let mut supported_props = HashMap::new(); supported_props.insert("prop1".to_string(), PropertyType::minimum); + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler { supported_platform_properties: Some(supported_props), ..Default::default() }, + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), || async move {}, - MockInstantWrapped::default, + task_change_notify, ); let action_digest1 = DigestInfo::new([11u8; 32], 512); let action_digest2 = DigestInfo::new([99u8; 32], 512); @@ -1421,13 +1501,19 @@ async fn run_jobs_in_the_order_they_were_queued() -> Result<(), Error> { let mut supported_props = HashMap::new(); supported_props.insert("prop1".to_string(), PropertyType::minimum); + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler { supported_platform_properties: Some(supported_props), ..Default::default() }, + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), || async move {}, - MockInstantWrapped::default, + task_change_notify, ); let action_digest1 = DigestInfo::new([11u8; 32], 512); let action_digest2 = DigestInfo::new([99u8; 32], 512); @@ -1481,13 +1567,19 @@ async fn run_jobs_in_the_order_they_were_queued() -> Result<(), Error> { async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> { let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler { max_job_retries: 1, ..Default::default() }, + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), || async move {}, - MockInstantWrapped::default, + task_change_notify, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -1622,14 +1714,20 @@ async fn ensure_scheduler_drops_inner_spawn() -> Result<(), Error> { // Since the inner spawn owns this callback, we can use the callback to know if the // inner spawn was dropped because our callback would be dropped, which dropps our // DropChecker. + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), move || { // This will ensure dropping happens if this function is ever dropped. let _drop_checker = drop_checker.clone(); async move {} }, - MockInstantWrapped::default, + task_change_notify, ); assert_eq!(dropped.load(Ordering::Relaxed), false); @@ -1648,10 +1746,16 @@ async fn ensure_task_or_worker_change_notification_received_test() -> Result<(), let worker_id1: WorkerId = WorkerId(Uuid::new_v4()); let worker_id2: WorkerId = WorkerId(Uuid::new_v4()); + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &nativelink_config::schedulers::SimpleScheduler::default(), + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), || async move {}, - MockInstantWrapped::default, + task_change_notify, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -1713,10 +1817,19 @@ async fn ensure_task_or_worker_change_notification_received_test() -> Result<(), // https://github.com/TraceMachina/nativelink/issues/1197 #[nativelink_test] async fn client_reconnect_keeps_action_alive() -> Result<(), Error> { + let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler::default(), + &nativelink_config::schedulers::SimpleScheduler { + worker_timeout_s: WORKER_TIMEOUT_S, + ..Default::default() + }, + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), || async move {}, - MockInstantWrapped::default, + task_change_notify, ); let action_digest = DigestInfo::new([99u8; 32], 512);