Skip to content

Commit

Permalink
[Feature] Add Redis Scheduler
Browse files Browse the repository at this point in the history
Adds an experimental Redis scheduler that can be used as a distributed,
state-persistent scheduler backend. This scheduler is optimized for deployments
in which each worker is its own scheduler or in which there are many small schedulers.

closes: #359
  • Loading branch information
allada committed Sep 10, 2024
1 parent ac4ca57 commit 3917a90
Show file tree
Hide file tree
Showing 19 changed files with 1,362 additions and 33 deletions.
51 changes: 51 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions nativelink-config/src/schedulers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ pub struct SimpleScheduler {
pub enum ExperimentalSimpleSchedulerBackend {
/// Use an in-memory store for the scheduler.
memory,
/// Use a redis store for the scheduler.
redis(ExperimentalRedisSchedulerBackend),
}

/// Configuration for the experimental redis backend of the simple scheduler.
///
/// Deserialization rejects any field not declared here
/// (`deny_unknown_fields`).
#[derive(Deserialize, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ExperimentalRedisSchedulerBackend {
/// A reference to the redis store to use for the scheduler.
/// Note: This MUST resolve to a RedisStore.
pub redis_store: StoreRefName,
}

/// A scheduler that simply forwards requests to an upstream scheduler. This
Expand Down
5 changes: 5 additions & 0 deletions nativelink-scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ rust_test_suite(
"tests/action_messages_test.rs",
"tests/cache_lookup_scheduler_test.rs",
"tests/property_modifier_scheduler_test.rs",
"tests/redis_store_awaited_action_db_test.rs",
"tests/simple_scheduler_test.rs",
],
compile_data = [
Expand All @@ -79,10 +80,14 @@ rust_test_suite(
"//nativelink-store",
"//nativelink-util",
"@crates//:async-lock",
"@crates//:bytes",
"@crates//:fred",
"@crates//:futures",
"@crates//:mock_instant",
"@crates//:parking_lot",
"@crates//:pretty_assertions",
"@crates//:prost",
"@crates//:serde_json",
"@crates//:tokio",
"@crates//:tokio-stream",
"@crates//:uuid",
Expand Down
3 changes: 3 additions & 0 deletions nativelink-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ static_assertions = "1.1.0"
[dev-dependencies]
nativelink-macro = { path = "../nativelink-macro" }
pretty_assertions = { version = "1.4.0", features = ["std"] }
fred = { version = "9.1.2", default-features = false, features = [
"mocks",
] }
14 changes: 7 additions & 7 deletions nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use static_assertions::{assert_eq_size, const_assert, const_assert_eq};
/// The version of the awaited action.
/// This number will always increment by one each time
/// the action is updated.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
struct AwaitedActionVersion(u64);

impl MetricsComponent for AwaitedActionVersion {
Expand Down Expand Up @@ -80,7 +80,7 @@ pub struct AwaitedAction {
}

impl AwaitedAction {
pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>) -> Self {
pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>, now: SystemTime) -> Self {
let stage = ActionStage::Queued;
let sort_key = AwaitedActionSortKey::new_with_unique_key(
action_info.priority,
Expand All @@ -102,7 +102,7 @@ impl AwaitedAction {
operation_id,
sort_key,
attempts: 0,
last_worker_updated_timestamp: SystemTime::now(),
last_worker_updated_timestamp: now,
worker_id: None,
state,
}
Expand All @@ -120,19 +120,19 @@ impl AwaitedAction {
self.version = AwaitedActionVersion(self.version.0 + 1);
}

pub(crate) fn action_info(&self) -> &Arc<ActionInfo> {
pub fn action_info(&self) -> &Arc<ActionInfo> {
&self.action_info
}

pub(crate) fn operation_id(&self) -> &OperationId {
pub fn operation_id(&self) -> &OperationId {
&self.operation_id
}

pub(crate) fn sort_key(&self) -> AwaitedActionSortKey {
self.sort_key
}

pub(crate) fn state(&self) -> &Arc<ActionState> {
pub fn state(&self) -> &Arc<ActionState> {
&self.state
}

Expand All @@ -158,7 +158,7 @@ impl AwaitedAction {

/// Sets the current state of the action and notifies subscribers.
/// This method has no return value. When `now` is `Some`, `keep_alive(now)` is also called.
pub(crate) fn set_state(&mut self, mut state: Arc<ActionState>, now: Option<SystemTime>) {
pub fn set_state(&mut self, mut state: Arc<ActionState>, now: Option<SystemTime>) {
std::mem::swap(&mut self.state, &mut state);
if let Some(now) = now {
self.keep_alive(now);
Expand Down
40 changes: 38 additions & 2 deletions nativelink-scheduler/src/default_scheduler_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use std::time::SystemTime;

use nativelink_config::schedulers::{ExperimentalSimpleSchedulerBackend, SchedulerConfig};
use nativelink_config::stores::EvictionPolicy;
use nativelink_error::{Error, ResultExt};
use nativelink_error::{make_input_err, Error, ResultExt};
use nativelink_store::redis_store::RedisStore;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::instant_wrapper::InstantWrapper;
use nativelink_util::operation_state_manager::ClientStateManager;
Expand All @@ -28,6 +29,7 @@ use crate::grpc_scheduler::GrpcScheduler;
use crate::memory_awaited_action_db::MemoryAwaitedActionDb;
use crate::property_modifier_scheduler::PropertyModifierScheduler;
use crate::simple_scheduler::SimpleScheduler;
use crate::store_awaited_action_db::StoreAwaitedActionDb;
use crate::worker_scheduler::WorkerScheduler;

/// Default timeout for recently completed actions in seconds.
Expand All @@ -51,7 +53,9 @@ fn inner_scheduler_factory(
store_manager: &StoreManager,
) -> Result<SchedulerFactoryResults, Error> {
let scheduler: SchedulerFactoryResults = match scheduler_type_cfg {
SchedulerConfig::simple(config) => simple_scheduler_factory(config)?,
SchedulerConfig::simple(config) => {
simple_scheduler_factory(config, store_manager, SystemTime::now)?
}
SchedulerConfig::grpc(config) => (Some(Arc::new(GrpcScheduler::new(config)?)), None),
SchedulerConfig::cache_lookup(config) => {
let ac_store = store_manager
Expand Down Expand Up @@ -83,6 +87,8 @@ fn inner_scheduler_factory(

fn simple_scheduler_factory(
config: &nativelink_config::schedulers::SimpleScheduler,
store_manager: &StoreManager,
now_fn: fn() -> SystemTime,
) -> Result<SchedulerFactoryResults, Error> {
match config
.experimental_backend
Expand All @@ -100,6 +106,36 @@ fn simple_scheduler_factory(
SimpleScheduler::new(config, awaited_action_db, task_change_notify);
Ok((Some(action_scheduler), Some(worker_scheduler)))
}
ExperimentalSimpleSchedulerBackend::redis(redis_config) => {
let store = store_manager
.get_store(redis_config.redis_store.as_ref())
.err_tip(|| {
format!(
"'redis_store': '{}' does not exist",
redis_config.redis_store
)
})?;
let task_change_notify = Arc::new(Notify::new());
let store = store
.into_inner()
.as_any_arc()
.downcast::<RedisStore>()
.map_err(|_| {
make_input_err!(
"Could not downcast to redis store in RedisAwaitedActionDb::new"
)
})?;
let awaited_action_db = StoreAwaitedActionDb::new(
store,
task_change_notify.clone(),
now_fn,
Default::default,
)
.err_tip(|| "In state_manager_factory::redis_state_manager")?;
let (action_scheduler, worker_scheduler) =
SimpleScheduler::new(config, awaited_action_db, task_change_notify);
Ok((Some(action_scheduler), Some(worker_scheduler)))
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,8 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
ActionUniqueQualifier::Uncachable(_unique_key) => None,
};
let operation_id = OperationId::default();
let awaited_action = AwaitedAction::new(operation_id.clone(), action_info);
let awaited_action =
AwaitedAction::new(operation_id.clone(), action_info, (self.now_fn)().now());
debug_assert!(
ActionStage::Queued == awaited_action.state().stage,
"Expected action to be queued"
Expand Down
1 change: 0 additions & 1 deletion nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ impl SimpleScheduler {
let worker_change_notify = Arc::new(Notify::new());
let state_manager = SimpleSchedulerStateManager::new(
max_job_retries,
// TODO(allada) This should probably have its own config.
Duration::from_secs(worker_timeout_s),
awaited_action_db,
now_fn,
Expand Down
16 changes: 11 additions & 5 deletions nativelink-scheduler/src/store_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,17 +296,19 @@ impl SchedulerStoreDataProvider for UpdateClientIdToOperationId {
}

#[derive(MetricsComponent)]
pub struct StoreAwaitedActionDb<S: SchedulerStore> {
pub struct StoreAwaitedActionDb<S: SchedulerStore, F: Fn() -> OperationId> {
store: Arc<S>,
now_fn: fn() -> SystemTime,
operation_id_creator: F,
_pull_task_change_subscriber_spawn: JoinHandleDropGuard<()>,
}

impl<S: SchedulerStore> StoreAwaitedActionDb<S> {
impl<S: SchedulerStore, F: Fn() -> OperationId> StoreAwaitedActionDb<S, F> {
pub fn new(
store: Arc<S>,
task_change_publisher: Arc<Notify>,
now_fn: fn() -> SystemTime,
operation_id_creator: F,
) -> Result<Self, Error> {
let mut subscription = store
.subscription_manager()
Expand Down Expand Up @@ -340,6 +342,7 @@ impl<S: SchedulerStore> StoreAwaitedActionDb<S> {
Ok(Self {
store,
now_fn,
operation_id_creator,
_pull_task_change_subscriber_spawn: pull_task_change_subscriber,
})
}
Expand Down Expand Up @@ -409,7 +412,9 @@ impl<S: SchedulerStore> StoreAwaitedActionDb<S> {
}
}

impl<S: SchedulerStore> AwaitedActionDb for StoreAwaitedActionDb<S> {
impl<S: SchedulerStore, F: Fn() -> OperationId + Send + Sync + Unpin + 'static> AwaitedActionDb
for StoreAwaitedActionDb<S, F>
{
type Subscriber = OperationSubscriber<S>;

async fn get_awaited_action_by_id(
Expand Down Expand Up @@ -466,8 +471,9 @@ impl<S: SchedulerStore> AwaitedActionDb for StoreAwaitedActionDb<S> {
return Ok(sub);
}

let new_operation_id = OperationId::default();
let awaited_action = AwaitedAction::new(new_operation_id.clone(), action_info);
let new_operation_id = (self.operation_id_creator)();
let awaited_action =
AwaitedAction::new(new_operation_id.clone(), action_info, (self.now_fn)());
debug_assert!(
ActionStage::Queued == awaited_action.state().stage,
"Expected action to be queued"
Expand Down
Loading

0 comments on commit 3917a90

Please sign in to comment.