Skip to content

Commit

Permalink
Create scheduler state module
Browse files Browse the repository at this point in the history
Moving enough code out of `simple_scheduler` for decoupling stateful
structures to support trait based interfaces in the future. There is
no logical flow change in this refactor, all tests pass. Some
structures will have `pub(crate)` visable modules and fields to help
with the transition of code reordering.
  • Loading branch information
Adam Singer committed Jun 7, 2024
1 parent f59a1d7 commit c17bf9b
Show file tree
Hide file tree
Showing 8 changed files with 469 additions and 265 deletions.
5 changes: 5 additions & 0 deletions nativelink-scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ rust_library(
"src/operation_state_manager.rs",
"src/platform_property_manager.rs",
"src/property_modifier_scheduler.rs",
"src/scheduler_state/awaited_action.rs",
"src/scheduler_state/completed_action.rs",
"src/scheduler_state/mod.rs",
"src/scheduler_state/state_manager.rs",
"src/scheduler_state/workers.rs",
"src/simple_scheduler.rs",
"src/worker.rs",
"src/worker_scheduler.rs",
Expand Down
1 change: 1 addition & 0 deletions nativelink-scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod grpc_scheduler;
pub mod operation_state_manager;
pub mod platform_property_manager;
pub mod property_modifier_scheduler;
pub mod scheduler_state;
pub mod simple_scheduler;
pub mod worker;
pub mod worker_scheduler;
69 changes: 69 additions & 0 deletions nativelink-scheduler/src/scheduler_state/awaited_action.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use nativelink_error::Error;
use nativelink_util::action_messages::{ActionInfo, ActionState};
use nativelink_util::metrics_utils::{CollectorState, MetricsComponent};
use tokio::sync::watch;

use crate::worker::WorkerId;

/// An action that is being awaited on and last known state.
pub struct AwaitedAction {
/// The action that is being awaited on.
pub(crate) action_info: Arc<ActionInfo>,

/// The current state of the action.
pub(crate) current_state: Arc<ActionState>,

/// The channel to notify subscribers of state changes when updated, completed or retrying.
pub(crate) notify_channel: watch::Sender<Arc<ActionState>>,

/// Number of attempts the job has been tried.
pub(crate) attempts: usize,

/// Possible last error set by the worker. If empty and attempts is set, it may be due to
/// something like a worker timeout.
pub(crate) last_error: Option<Error>,

/// Worker that is currently running this action, None if unassigned.
pub(crate) worker_id: Option<WorkerId>,
}

impl MetricsComponent for AwaitedAction {
fn gather_metrics(&self, c: &mut CollectorState) {
c.publish(
"action_digest",
&self.action_info.unique_qualifier.action_name(),
"The digest of the action.",
);
c.publish(
"current_state",
self.current_state.as_ref(),
"The current stage of the action.",
);
c.publish(
"attempts",
&self.attempts,
"The number of attempts this action has tried.",
);
c.publish(
"last_error",
&format!("{:?}", self.last_error),
"The last error this action caused from a retry (if any).",
);
}
}
72 changes: 72 additions & 0 deletions nativelink-scheduler/src/scheduler_state/completed_action.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Borrow;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::SystemTime;

use nativelink_util::action_messages::{ActionInfoHashKey, ActionState, OperationId};
use nativelink_util::metrics_utils::{CollectorState, MetricsComponent};

/// A completed action that has no listeners.
pub struct CompletedAction {
/// The time the action was completed.
pub(crate) completed_time: SystemTime,
/// The current state of the action when it was completed.
pub(crate) state: Arc<ActionState>,
}

impl Hash for CompletedAction {
fn hash<H: Hasher>(&self, state: &mut H) {
OperationId::hash(&self.state.id, state);
}
}

impl PartialEq for CompletedAction {
fn eq(&self, other: &Self) -> bool {
OperationId::eq(&self.state.id, &other.state.id)
}
}

impl Eq for CompletedAction {}

impl Borrow<OperationId> for CompletedAction {
#[inline]
fn borrow(&self) -> &OperationId {
&self.state.id
}
}

impl Borrow<ActionInfoHashKey> for CompletedAction {
#[inline]
fn borrow(&self) -> &ActionInfoHashKey {
&self.state.id.unique_qualifier
}
}

impl MetricsComponent for CompletedAction {
fn gather_metrics(&self, c: &mut CollectorState) {
c.publish(
"completed_timestamp",
&self.completed_time,
"The timestamp this action was completed",
);
c.publish(
"current_state",
self.state.as_ref(),
"The current stage of the action.",
);
}
}
18 changes: 18 additions & 0 deletions nativelink-scheduler/src/scheduler_state/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) mod awaited_action;
pub(crate) mod completed_action;
pub(crate) mod state_manager;
pub(crate) mod workers;
66 changes: 66 additions & 0 deletions nativelink-scheduler/src/scheduler_state/state_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use hashbrown::{HashMap, HashSet};
use nativelink_util::action_messages::ActionInfo;

use crate::scheduler_state::awaited_action::AwaitedAction;
use crate::scheduler_state::completed_action::CompletedAction;
use crate::scheduler_state::workers::Workers;

/// StateManager is responsible for maintaining the state of the scheduler. Scheduler state
/// includes the actions that are queued, active, and recently completed. It also includes the
/// workers that are available to execute actions based on allocation strategy.
pub(crate) struct StateManager {
// TODO(adams): Move `queued_actions_set` and `queued_actions` into a single struct that
// provides a unified interface for interacting with the two containers.

// Important: `queued_actions_set` and `queued_actions` are two containers that provide
// different search and sort capabilities. We are using the two different containers to
// optimize different use cases. `HashSet` is used to look up actions in O(1) time. The
// `BTreeMap` is used to sort actions in O(log n) time based on priority and timestamp.
// These two fields must be kept in-sync, so if you modify one, you likely need to modify the
// other.
/// A `HashSet` of all actions that are queued. A hashset is used to find actions that are queued
/// in O(1) time. This set allows us to find and join on new actions onto already existing
/// (or queued) actions where insert timestamp of queued actions is not known. Using an
/// additional `HashSet` will prevent us from having to iterate the `BTreeMap` to find actions.
///
/// Important: `queued_actions_set` and `queued_actions` must be kept in sync.
pub(crate) queued_actions_set: HashSet<Arc<ActionInfo>>,

/// A BTreeMap of sorted actions that are primarily based on priority and insert timestamp.
/// `ActionInfo` implements `Ord` that defines the `cmp` function for order. Using a BTreeMap
/// gives us to sorted actions that are queued in O(log n) time.
///
/// Important: `queued_actions_set` and `queued_actions` must be kept in sync.
pub(crate) queued_actions: BTreeMap<Arc<ActionInfo>, AwaitedAction>,

/// A `Workers` pool that contains all workers that are available to execute actions in a priority
/// order based on the allocation strategy.
pub(crate) workers: Workers,

/// A map of all actions that are active. A hashmap is used to find actions that are active in
/// O(1) time. The key is the `ActionInfo` struct. The value is the `AwaitedAction` struct.
pub(crate) active_actions: HashMap<Arc<ActionInfo>, AwaitedAction>,

/// These actions completed recently but had no listener, they might have
/// completed while the caller was thinking about calling wait_execution, so
/// keep their completion state around for a while to send back.
/// TODO(#192) Revisit if this is the best way to handle recently completed actions.
pub(crate) recently_completed_actions: HashSet<CompletedAction>,
}
123 changes: 123 additions & 0 deletions nativelink-scheduler/src/scheduler_state/workers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use lru::LruCache;
use nativelink_config::schedulers::WorkerAllocationStrategy;
use nativelink_error::{error_if, make_input_err, Error, ResultExt};
use nativelink_util::action_messages::ActionStage;
use tracing::{event, Level};

use crate::scheduler_state::awaited_action::AwaitedAction;
use crate::worker::{Worker, WorkerId, WorkerTimestamp};

pub struct Workers {
pub(crate) workers: LruCache<WorkerId, Worker>,
/// The allocation strategy for workers.
pub(crate) allocation_strategy: WorkerAllocationStrategy,
}

impl Workers {
pub(crate) fn new(allocation_strategy: WorkerAllocationStrategy) -> Self {
Self {
workers: LruCache::unbounded(),
allocation_strategy,
}
}

/// Refreshes the lifetime of the worker with the given timestamp.
pub(crate) fn refresh_lifetime(
&mut self,
worker_id: &WorkerId,
timestamp: WorkerTimestamp,
) -> Result<(), Error> {
let worker = self.workers.get_mut(worker_id).ok_or_else(|| {
make_input_err!(
"Worker not found in worker map in refresh_lifetime() {}",
worker_id
)
})?;
error_if!(
worker.last_update_timestamp > timestamp,
"Worker already had a timestamp of {}, but tried to update it with {}",
worker.last_update_timestamp,
timestamp
);
worker.last_update_timestamp = timestamp;
Ok(())
}

/// Adds a worker to the pool.
/// Note: This function will not do any task matching.
pub(crate) fn add_worker(&mut self, worker: Worker) -> Result<(), Error> {
let worker_id = worker.id;
self.workers.put(worker_id, worker);

// Worker is not cloneable, and we do not want to send the initial connection results until
// we have added it to the map, or we might get some strange race conditions due to the way
// the multi-threaded runtime works.
let worker = self.workers.peek_mut(&worker_id).unwrap();
let res = worker
.send_initial_connection_result()
.err_tip(|| "Failed to send initial connection result to worker");
if let Err(err) = &res {
event!(
Level::ERROR,
?worker_id,
?err,
"Worker connection appears to have been closed while adding to pool"
);
}
res
}

/// Removes worker from pool.
/// Note: The caller is responsible for any rescheduling of any tasks that might be
/// running.
pub(crate) fn remove_worker(&mut self, worker_id: &WorkerId) -> Option<Worker> {
self.workers.pop(worker_id)
}

/// Attempts to find a worker that is capable of running this action.
// TODO(blaise.bruer) This algorithm is not very efficient. Simple testing using a tree-like
// structure showed worse performance on a 10_000 worker * 7 properties * 1000 queued tasks
// simulation of worst cases in a single threaded environment.
pub(crate) fn find_worker_for_action_mut<'a>(
&'a mut self,
awaited_action: &AwaitedAction,
) -> Option<&'a mut Worker> {
assert!(matches!(
awaited_action.current_state.stage,
ActionStage::Queued
));
let action_properties = &awaited_action.action_info.platform_properties;
let mut workers_iter = self.workers.iter_mut();
let workers_iter = match self.allocation_strategy {
// Use rfind to get the least recently used that satisfies the properties.
WorkerAllocationStrategy::least_recently_used => workers_iter.rfind(|(_, w)| {
w.can_accept_work() && action_properties.is_satisfied_by(&w.platform_properties)
}),
// Use find to get the most recently used that satisfies the properties.
WorkerAllocationStrategy::most_recently_used => workers_iter.find(|(_, w)| {
w.can_accept_work() && action_properties.is_satisfied_by(&w.platform_properties)
}),
};
let worker_id = workers_iter.map(|(_, w)| &w.id);
// We need to "touch" the worker to ensure it gets re-ordered in the LRUCache, since it was selected.
if let Some(&worker_id) = worker_id {
self.workers.get_mut(&worker_id)
} else {
None
}
}
}
Loading

0 comments on commit c17bf9b

Please sign in to comment.