Skip to content

Commit

Permalink
Client action listeners will now timeout actions
Browse files Browse the repository at this point in the history
Listeners of the actions will now flag actions as timedout instad of
just if the scheduler detects that the worker went offline.

This is to support distributed schedulers. Since any worker or scheduler
may now go offline we need to ensure the "owner" of the action is
actually the one who cares about it, in this case the client.

towards #359
  • Loading branch information
allada committed Sep 5, 2024
1 parent 753c1e7 commit 8dd54cc
Show file tree
Hide file tree
Showing 3 changed files with 375 additions and 39 deletions.
14 changes: 13 additions & 1 deletion nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::sync::Arc;
use std::time::SystemTime;

use async_trait::async_trait;
use futures::Future;
Expand All @@ -21,6 +22,7 @@ use nativelink_metric::{MetricsComponent, RootMetricsComponent};
use nativelink_util::action_messages::{
ActionInfo, ActionStage, ActionState, OperationId, WorkerId,
};
use nativelink_util::instant_wrapper::InstantWrapper;
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
use nativelink_util::operation_state_manager::{
ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
Expand Down Expand Up @@ -295,18 +297,22 @@ impl SimpleScheduler {
tokio::time::sleep(Duration::from_millis(1))
},
task_change_notify,
SystemTime::now,
)
}

pub fn new_with_callback<
Fut: Future<Output = ()> + Send,
F: Fn() -> Fut + Send + Sync + 'static,
A: AwaitedActionDb,
I: InstantWrapper,
NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
>(
scheduler_cfg: &nativelink_config::schedulers::SimpleScheduler,
awaited_action_db: A,
on_matching_engine_run: F,
task_change_notify: Arc<Notify>,
now_fn: NowFn,
) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
let platform_property_manager = Arc::new(PlatformPropertyManager::new(
scheduler_cfg
Expand All @@ -326,7 +332,13 @@ impl SimpleScheduler {
}

let worker_change_notify = Arc::new(Notify::new());
let state_manager = SimpleSchedulerStateManager::new(max_job_retries, awaited_action_db);
let state_manager = SimpleSchedulerStateManager::new(
max_job_retries,
// TODO(allada) This should probably have its own config.
Duration::from_secs(worker_timeout_s),
awaited_action_db,
now_fn,
);

let worker_scheduler = ApiWorkerScheduler::new(
state_manager.clone(),
Expand Down
Loading

0 comments on commit 8dd54cc

Please sign in to comment.