diff --git a/nativelink-scheduler/src/scheduler_state/workers.rs b/nativelink-scheduler/src/scheduler_state/workers.rs index d69005d94..25e78e2bb 100644 --- a/nativelink-scheduler/src/scheduler_state/workers.rs +++ b/nativelink-scheduler/src/scheduler_state/workers.rs @@ -15,10 +15,10 @@ use lru::LruCache; use nativelink_config::schedulers::WorkerAllocationStrategy; use nativelink_error::{error_if, make_input_err, Error, ResultExt}; -use nativelink_util::action_messages::{ActionStage, WorkerId}; +use nativelink_util::action_messages::WorkerId; +use nativelink_util::platform_properties::PlatformProperties; use tracing::{event, Level}; -use crate::scheduler_state::awaited_action::AwaitedAction; use crate::worker::{Worker, WorkerTimestamp}; /// A collection of workers that are available to run tasks. @@ -96,22 +96,17 @@ impl Workers { // simulation of worst cases in a single threaded environment. pub(crate) fn find_worker_for_action( &self, - awaited_action: &AwaitedAction, + platform_properties: &PlatformProperties, ) -> Option { - assert!(matches!( - awaited_action.current_state.stage, - ActionStage::Queued - )); - let action_properties = &awaited_action.action_info.platform_properties; let mut workers_iter = self.workers.iter(); let workers_iter = match self.allocation_strategy { // Use rfind to get the least recently used that satisfies the properties. WorkerAllocationStrategy::least_recently_used => workers_iter.rfind(|(_, w)| { - w.can_accept_work() && action_properties.is_satisfied_by(&w.platform_properties) + w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties) }), // Use find to get the most recently used that satisfies the properties. WorkerAllocationStrategy::most_recently_used => workers_iter.find(|(_, w)| { - w.can_accept_work() && action_properties.is_satisfied_by(&w.platform_properties) + w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties) }), }; workers_iter.map(|(_, w)| &w.id).copied() diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs index 6894f5c5c..4a651fe5a 100644 --- a/nativelink-scheduler/src/simple_scheduler.rs +++ b/nativelink-scheduler/src/simple_scheduler.rs @@ -45,7 +45,6 @@ use crate::operation_state_manager::{ OperationStageFlags, WorkerStateManager, }; use crate::platform_property_manager::PlatformPropertyManager; -use crate::scheduler_state::awaited_action::AwaitedAction; use crate::scheduler_state::metrics::Metrics as SchedulerMetrics; use crate::scheduler_state::state_manager::StateManager; use crate::scheduler_state::workers::Workers; @@ -274,6 +273,17 @@ impl SimpleSchedulerImpl { match action_state_results { Ok(mut stream) => { while let Some(action_state_result) = stream.next().await { + let as_state_result = action_state_result.as_state().await; + let Ok(state) = as_state_result else { + let _ = as_state_result.inspect_err(|err| { + event!( + Level::ERROR, + ?err, + "Failed to get action_info from as_state_result stream" + ); + }); + continue; + }; let action_state_result = action_state_result.as_action_info().await; let Ok(action_info) = action_state_result else { let _ = action_state_result.inspect_err(|err| { @@ -286,28 +296,14 @@ impl SimpleSchedulerImpl { continue; }; - let Some(awaited_action): Option<&AwaitedAction> = self - .state_manager - .inner - .queued_actions - .get(action_info.as_ref()) - else { - event!( - Level::ERROR, - ?action_info, - "queued_actions out of sync with itself" - ); - continue; - }; - let maybe_worker_id: Option = { self.state_manager .inner .workers - .find_worker_for_action(awaited_action) + .find_worker_for_action(&action_info.platform_properties) }; - let operation_id = awaited_action.current_state.id.clone(); + let operation_id = state.id.clone(); let ret = ::update_operation( &mut self.state_manager, operation_id.clone(),