Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Add Redis Scheduler #1343

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions nativelink-config/src/schedulers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ pub struct SimpleScheduler {
pub enum ExperimentalSimpleSchedulerBackend {
/// Use an in-memory store for the scheduler.
memory,
/// Use a redis store for the scheduler.
redis(ExperimentalRedisSchedulerBackend),
}

#[derive(Deserialize, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ExperimentalRedisSchedulerBackend {
/// A reference to the redis store to use for the scheduler.
/// Note: This MUST resolve to a RedisStore.
pub redis_store: StoreRefName,
}

/// A scheduler that simply forwards requests to an upstream scheduler. This
Expand Down
5 changes: 5 additions & 0 deletions nativelink-scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ rust_test_suite(
"tests/action_messages_test.rs",
"tests/cache_lookup_scheduler_test.rs",
"tests/property_modifier_scheduler_test.rs",
"tests/redis_store_awaited_action_db_test.rs",
"tests/simple_scheduler_test.rs",
],
compile_data = [
Expand All @@ -79,10 +80,14 @@ rust_test_suite(
"//nativelink-store",
"//nativelink-util",
"@crates//:async-lock",
"@crates//:bytes",
"@crates//:fred",
"@crates//:futures",
"@crates//:mock_instant",
"@crates//:parking_lot",
"@crates//:pretty_assertions",
"@crates//:prost",
"@crates//:serde_json",
"@crates//:tokio",
"@crates//:tokio-stream",
"@crates//:uuid",
Expand Down
3 changes: 3 additions & 0 deletions nativelink-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ static_assertions = "1.1.0"
[dev-dependencies]
nativelink-macro = { path = "../nativelink-macro" }
pretty_assertions = { version = "1.4.0", features = ["std"] }
fred = { version = "9.1.2", default-features = false, features = [
"mocks",
] }
14 changes: 7 additions & 7 deletions nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use static_assertions::{assert_eq_size, const_assert, const_assert_eq};
/// The version of the awaited action.
/// This number will always increment by one each time
/// the action is updated.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
struct AwaitedActionVersion(u64);

impl MetricsComponent for AwaitedActionVersion {
Expand Down Expand Up @@ -80,7 +80,7 @@ pub struct AwaitedAction {
}

impl AwaitedAction {
pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>) -> Self {
pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>, now: SystemTime) -> Self {
let stage = ActionStage::Queued;
let sort_key = AwaitedActionSortKey::new_with_unique_key(
action_info.priority,
Expand All @@ -102,7 +102,7 @@ impl AwaitedAction {
operation_id,
sort_key,
attempts: 0,
last_worker_updated_timestamp: SystemTime::now(),
last_worker_updated_timestamp: now,
worker_id: None,
state,
}
Expand All @@ -120,19 +120,19 @@ impl AwaitedAction {
self.version = AwaitedActionVersion(self.version.0 + 1);
}

pub(crate) fn action_info(&self) -> &Arc<ActionInfo> {
pub fn action_info(&self) -> &Arc<ActionInfo> {
&self.action_info
}

pub(crate) fn operation_id(&self) -> &OperationId {
pub fn operation_id(&self) -> &OperationId {
&self.operation_id
}

pub(crate) fn sort_key(&self) -> AwaitedActionSortKey {
self.sort_key
}

pub(crate) fn state(&self) -> &Arc<ActionState> {
pub fn state(&self) -> &Arc<ActionState> {
&self.state
}

Expand All @@ -158,7 +158,7 @@ impl AwaitedAction {

/// Sets the current state of the action and notifies subscribers.
/// Returns true if the state was set, false if there are no subscribers.
pub(crate) fn set_state(&mut self, mut state: Arc<ActionState>, now: Option<SystemTime>) {
pub fn set_state(&mut self, mut state: Arc<ActionState>, now: Option<SystemTime>) {
std::mem::swap(&mut self.state, &mut state);
if let Some(now) = now {
self.keep_alive(now);
Expand Down
40 changes: 38 additions & 2 deletions nativelink-scheduler/src/default_scheduler_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use std::time::SystemTime;

use nativelink_config::schedulers::{ExperimentalSimpleSchedulerBackend, SchedulerConfig};
use nativelink_config::stores::EvictionPolicy;
use nativelink_error::{Error, ResultExt};
use nativelink_error::{make_input_err, Error, ResultExt};
use nativelink_store::redis_store::RedisStore;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::instant_wrapper::InstantWrapper;
use nativelink_util::operation_state_manager::ClientStateManager;
Expand All @@ -28,6 +29,7 @@ use crate::grpc_scheduler::GrpcScheduler;
use crate::memory_awaited_action_db::MemoryAwaitedActionDb;
use crate::property_modifier_scheduler::PropertyModifierScheduler;
use crate::simple_scheduler::SimpleScheduler;
use crate::store_awaited_action_db::StoreAwaitedActionDb;
use crate::worker_scheduler::WorkerScheduler;

/// Default timeout for recently completed actions in seconds.
Expand All @@ -51,7 +53,9 @@ fn inner_scheduler_factory(
store_manager: &StoreManager,
) -> Result<SchedulerFactoryResults, Error> {
let scheduler: SchedulerFactoryResults = match scheduler_type_cfg {
SchedulerConfig::simple(config) => simple_scheduler_factory(config)?,
SchedulerConfig::simple(config) => {
simple_scheduler_factory(config, store_manager, SystemTime::now)?
}
SchedulerConfig::grpc(config) => (Some(Arc::new(GrpcScheduler::new(config)?)), None),
SchedulerConfig::cache_lookup(config) => {
let ac_store = store_manager
Expand Down Expand Up @@ -83,6 +87,8 @@ fn inner_scheduler_factory(

fn simple_scheduler_factory(
config: &nativelink_config::schedulers::SimpleScheduler,
store_manager: &StoreManager,
now_fn: fn() -> SystemTime,
) -> Result<SchedulerFactoryResults, Error> {
match config
.experimental_backend
Expand All @@ -100,6 +106,36 @@ fn simple_scheduler_factory(
SimpleScheduler::new(config, awaited_action_db, task_change_notify);
Ok((Some(action_scheduler), Some(worker_scheduler)))
}
ExperimentalSimpleSchedulerBackend::redis(redis_config) => {
let store = store_manager
.get_store(redis_config.redis_store.as_ref())
.err_tip(|| {
format!(
"'redis_store': '{}' does not exist",
redis_config.redis_store
)
})?;
let task_change_notify = Arc::new(Notify::new());
let store = store
.into_inner()
.as_any_arc()
.downcast::<RedisStore>()
.map_err(|_| {
make_input_err!(
"Could not downcast to redis store in RedisAwaitedActionDb::new"
)
})?;
let awaited_action_db = StoreAwaitedActionDb::new(
store,
task_change_notify.clone(),
now_fn,
Default::default,
)
.err_tip(|| "In state_manager_factory::redis_state_manager")?;
let (action_scheduler, worker_scheduler) =
SimpleScheduler::new(config, awaited_action_db, task_change_notify);
Ok((Some(action_scheduler), Some(worker_scheduler)))
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,8 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
ActionUniqueQualifier::Uncachable(_unique_key) => None,
};
let operation_id = OperationId::default();
let awaited_action = AwaitedAction::new(operation_id.clone(), action_info);
let awaited_action =
AwaitedAction::new(operation_id.clone(), action_info, (self.now_fn)().now());
debug_assert!(
ActionStage::Queued == awaited_action.state().stage,
"Expected action to be queued"
Expand Down
1 change: 0 additions & 1 deletion nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ impl SimpleScheduler {
let worker_change_notify = Arc::new(Notify::new());
let state_manager = SimpleSchedulerStateManager::new(
max_job_retries,
// TODO(allada) This should probably have its own config.
Duration::from_secs(worker_timeout_s),
awaited_action_db,
now_fn,
Expand Down
16 changes: 11 additions & 5 deletions nativelink-scheduler/src/store_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,17 +296,19 @@ impl SchedulerStoreDataProvider for UpdateClientIdToOperationId {
}

#[derive(MetricsComponent)]
pub struct StoreAwaitedActionDb<S: SchedulerStore> {
pub struct StoreAwaitedActionDb<S: SchedulerStore, F: Fn() -> OperationId> {
store: Arc<S>,
now_fn: fn() -> SystemTime,
operation_id_creator: F,
_pull_task_change_subscriber_spawn: JoinHandleDropGuard<()>,
}

impl<S: SchedulerStore> StoreAwaitedActionDb<S> {
impl<S: SchedulerStore, F: Fn() -> OperationId> StoreAwaitedActionDb<S, F> {
pub fn new(
store: Arc<S>,
task_change_publisher: Arc<Notify>,
now_fn: fn() -> SystemTime,
operation_id_creator: F,
) -> Result<Self, Error> {
let mut subscription = store
.subscription_manager()
Expand Down Expand Up @@ -340,6 +342,7 @@ impl<S: SchedulerStore> StoreAwaitedActionDb<S> {
Ok(Self {
store,
now_fn,
operation_id_creator,
_pull_task_change_subscriber_spawn: pull_task_change_subscriber,
})
}
Expand Down Expand Up @@ -409,7 +412,9 @@ impl<S: SchedulerStore> StoreAwaitedActionDb<S> {
}
}

impl<S: SchedulerStore> AwaitedActionDb for StoreAwaitedActionDb<S> {
impl<S: SchedulerStore, F: Fn() -> OperationId + Send + Sync + Unpin + 'static> AwaitedActionDb
for StoreAwaitedActionDb<S, F>
{
type Subscriber = OperationSubscriber<S>;

async fn get_awaited_action_by_id(
Expand Down Expand Up @@ -466,8 +471,9 @@ impl<S: SchedulerStore> AwaitedActionDb for StoreAwaitedActionDb<S> {
return Ok(sub);
}

let new_operation_id = OperationId::default();
let awaited_action = AwaitedAction::new(new_operation_id.clone(), action_info);
let new_operation_id = (self.operation_id_creator)();
let awaited_action =
AwaitedAction::new(new_operation_id.clone(), action_info, (self.now_fn)());
debug_assert!(
ActionStage::Queued == awaited_action.state().stage,
"Expected action to be queued"
Expand Down
Loading