Skip to content

Commit

Permalink
Prepare scheduler config & move owner of notify task change owner
Browse files Browse the repository at this point in the history
It is easier to do this in one PR than in two because they are dependent
on each other.

1. Moves where notifications happen in the scheduler to happen instead
   in the underlying AwaitedActionDb.
2. Prepare the config changes (non-breaking).

towards #359
  • Loading branch information
allada committed Sep 5, 2024
1 parent 4b45662 commit 627d6c9
Show file tree
Hide file tree
Showing 6 changed files with 275 additions and 118 deletions.
11 changes: 11 additions & 0 deletions nativelink-config/src/schedulers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ pub struct SimpleScheduler {
/// The strategy used to assign workers jobs.
#[serde(default)]
pub allocation_strategy: WorkerAllocationStrategy,

/// The storage backend to use for the scheduler.
/// Default: memory
pub experimental_backend: Option<ExperimentalSimpleSchedulerBackend>,
}

#[allow(non_camel_case_types)]
#[derive(Deserialize, Debug)]
pub enum ExperimentalSimpleSchedulerBackend {
/// Use an in-memory store for the scheduler.
memory,
}

/// A scheduler that simply forwards requests to an upstream scheduler. This
Expand Down
60 changes: 55 additions & 5 deletions nativelink-scheduler/src/default_scheduler_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,27 @@
// limitations under the License.

use std::sync::Arc;
use std::time::SystemTime;

use nativelink_config::schedulers::SchedulerConfig;
use nativelink_config::schedulers::{ExperimentalSimpleSchedulerBackend, SchedulerConfig};
use nativelink_config::stores::EvictionPolicy;
use nativelink_error::{Error, ResultExt};
use nativelink_store::store_manager::StoreManager;
use nativelink_util::instant_wrapper::InstantWrapper;
use nativelink_util::operation_state_manager::ClientStateManager;
use tokio::sync::Notify;

use crate::cache_lookup_scheduler::CacheLookupScheduler;
use crate::grpc_scheduler::GrpcScheduler;
use crate::memory_awaited_action_db::MemoryAwaitedActionDb;
use crate::property_modifier_scheduler::PropertyModifierScheduler;
use crate::simple_scheduler::SimpleScheduler;
use crate::worker_scheduler::WorkerScheduler;

/// Default timeout for recently completed actions in seconds.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_RETAIN_COMPLETED_FOR_S: u32 = 60;

pub type SchedulerFactoryResults = (
Option<Arc<dyn ClientStateManager>>,
Option<Arc<dyn WorkerScheduler>>,
Expand All @@ -42,10 +51,7 @@ fn inner_scheduler_factory(
store_manager: &StoreManager,
) -> Result<SchedulerFactoryResults, Error> {
let scheduler: SchedulerFactoryResults = match scheduler_type_cfg {
SchedulerConfig::simple(config) => {
let (action_scheduler, worker_scheduler) = SimpleScheduler::new(config);
(Some(action_scheduler), Some(worker_scheduler))
}
SchedulerConfig::simple(config) => simple_scheduler_factory(config)?,
SchedulerConfig::grpc(config) => (Some(Arc::new(GrpcScheduler::new(config)?)), None),
SchedulerConfig::cache_lookup(config) => {
let ac_store = store_manager
Expand Down Expand Up @@ -74,3 +80,47 @@ fn inner_scheduler_factory(

Ok(scheduler)
}

fn simple_scheduler_factory(
config: &nativelink_config::schedulers::SimpleScheduler,
) -> Result<SchedulerFactoryResults, Error> {
match config
.experimental_backend
.as_ref()
.unwrap_or(&ExperimentalSimpleSchedulerBackend::memory)
{
ExperimentalSimpleSchedulerBackend::memory => {
let task_change_notify = Arc::new(Notify::new());
let awaited_action_db = memory_awaited_action_db_factory(
config.retain_completed_for_s,
task_change_notify.clone(),
SystemTime::now,
);
let (action_scheduler, worker_scheduler) =
SimpleScheduler::new(config, awaited_action_db, task_change_notify);
Ok((Some(action_scheduler), Some(worker_scheduler)))
}
}
}

pub fn memory_awaited_action_db_factory<I, NowFn>(
mut retain_completed_for_s: u32,
task_change_notify: Arc<Notify>,
now_fn: NowFn,
) -> MemoryAwaitedActionDb<I, NowFn>
where
I: InstantWrapper,
NowFn: Fn() -> I + Clone + Send + Sync + 'static,
{
if retain_completed_for_s == 0 {
retain_completed_for_s = DEFAULT_RETAIN_COMPLETED_FOR_S;
}
MemoryAwaitedActionDb::new(
&EvictionPolicy {
max_seconds: retain_completed_for_s,
..Default::default()
},
task_change_notify.clone(),
now_fn,
)
}
26 changes: 19 additions & 7 deletions nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use nativelink_util::evicting_map::{EvictingMap, LenEntry};
use nativelink_util::instant_wrapper::InstantWrapper;
use nativelink_util::spawn;
use nativelink_util::task::JoinHandleDropGuard;
use tokio::sync::{mpsc, watch};
use tokio::sync::{mpsc, watch, Notify};
use tracing::{event, Level};

use crate::awaited_action_db::{
Expand Down Expand Up @@ -125,14 +125,15 @@ pub struct MemoryAwaitedActionSubscriber<I: InstantWrapper, NowFn: Fn() -> I> {
}

impl<I: InstantWrapper, NowFn: Fn() -> I> MemoryAwaitedActionSubscriber<I, NowFn> {
pub fn new(mut awaited_action_rx: watch::Receiver<AwaitedAction>) -> Self {
fn new(mut awaited_action_rx: watch::Receiver<AwaitedAction>) -> Self {
awaited_action_rx.mark_changed();
Self {
awaited_action_rx,
client_info: None,
}
}
pub fn new_with_client(

fn new_with_client(
mut awaited_action_rx: watch::Receiver<AwaitedAction>,
client_operation_id: OperationId,
event_tx: mpsc::UnboundedSender<ActionEvent>,
Expand Down Expand Up @@ -799,13 +800,18 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
pub struct MemoryAwaitedActionDb<I: InstantWrapper, NowFn: Fn() -> I> {
#[metric]
inner: Arc<Mutex<AwaitedActionDbImpl<I, NowFn>>>,
tasks_change_notify: Arc<Notify>,
_handle_awaited_action_events: JoinHandleDropGuard<()>,
}

impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static>
MemoryAwaitedActionDb<I, NowFn>
{
pub fn new(eviction_config: &EvictionPolicy, now_fn: NowFn) -> Self {
pub fn new(
eviction_config: &EvictionPolicy,
tasks_change_notify: Arc<Notify>,
now_fn: NowFn,
) -> Self {
let (action_event_tx, mut action_event_rx) = mpsc::unbounded_channel();
let inner = Arc::new(Mutex::new(AwaitedActionDbImpl {
client_operation_to_awaited_action: EvictingMap::new(eviction_config, (now_fn)()),
Expand All @@ -819,6 +825,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static>
let weak_inner = Arc::downgrade(&inner);
Self {
inner,
tasks_change_notify,
_handle_awaited_action_events: spawn!("handle_awaited_action_events", async move {
let mut dropped_operation_ids = Vec::with_capacity(MAX_ACTION_EVENTS_RX_PER_CYCLE);
loop {
Expand Down Expand Up @@ -927,18 +934,23 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> Awaite
self.inner
.lock()
.await
.update_awaited_action(new_awaited_action)
.update_awaited_action(new_awaited_action)?;
self.tasks_change_notify.notify_one();
Ok(())
}

async fn add_action(
&self,
client_operation_id: OperationId,
action_info: Arc<ActionInfo>,
) -> Result<Self::Subscriber, Error> {
self.inner
let subscriber = self
.inner
.lock()
.await
.add_action(client_operation_id, action_info)
.await
.await?;
self.tasks_change_notify.notify_one();
Ok(subscriber)
}
}
50 changes: 18 additions & 32 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,14 @@
// limitations under the License.

use std::sync::Arc;
use std::time::SystemTime;

use async_trait::async_trait;
use futures::Future;
use nativelink_config::stores::EvictionPolicy;
use nativelink_error::{Error, ResultExt};
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
use nativelink_util::action_messages::{
ActionInfo, ActionStage, ActionState, OperationId, WorkerId,
};
use nativelink_util::instant_wrapper::InstantWrapper;
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
use nativelink_util::operation_state_manager::{
ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
Expand All @@ -37,7 +34,7 @@ use tokio_stream::StreamExt;
use tracing::{event, Level};

use crate::api_worker_scheduler::ApiWorkerScheduler;
use crate::memory_awaited_action_db::MemoryAwaitedActionDb;
use crate::awaited_action_db::AwaitedActionDb;
use crate::platform_property_manager::PlatformPropertyManager;
use crate::simple_scheduler_state_manager::SimpleSchedulerStateManager;
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp};
Expand All @@ -47,10 +44,6 @@ use crate::worker_scheduler::WorkerScheduler;
/// If this changes, remember to change the documentation in the config.
const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;

/// Default timeout for recently completed actions in seconds.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_RETAIN_COMPLETED_FOR_S: u32 = 60;

/// Default times a job can retry before failing.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_MAX_JOB_RETRIES: usize = 3;
Expand Down Expand Up @@ -269,11 +262,14 @@ impl SimpleScheduler {
}

impl SimpleScheduler {
pub fn new(
pub fn new<A: AwaitedActionDb>(
scheduler_cfg: &nativelink_config::schedulers::SimpleScheduler,
awaited_action_db: A,
task_change_notify: Arc<Notify>,
) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
Self::new_with_callback(
scheduler_cfg,
awaited_action_db,
|| {
// The cost of running `do_try_match()` is very high, but constant
// in relation to the number of changes that have happened. This
Expand All @@ -285,19 +281,19 @@ impl SimpleScheduler {
// scheduled within a future.
tokio::time::sleep(Duration::from_millis(1))
},
SystemTime::now,
task_change_notify,
)
}

pub fn new_with_callback<
Fut: Future<Output = ()> + Send,
F: Fn() -> Fut + Send + Sync + 'static,
I: InstantWrapper,
NowFn: Fn() -> I + Clone + Send + Sync + 'static,
A: AwaitedActionDb,
>(
scheduler_cfg: &nativelink_config::schedulers::SimpleScheduler,
awaited_action_db: A,
on_matching_engine_run: F,
now_fn: NowFn,
task_change_notify: Arc<Notify>,
) -> (Arc<Self>, Arc<dyn WorkerScheduler>) {
let platform_property_manager = Arc::new(PlatformPropertyManager::new(
scheduler_cfg
Expand All @@ -311,34 +307,19 @@ impl SimpleScheduler {
worker_timeout_s = DEFAULT_WORKER_TIMEOUT_S;
}

let mut retain_completed_for_s = scheduler_cfg.retain_completed_for_s;
if retain_completed_for_s == 0 {
retain_completed_for_s = DEFAULT_RETAIN_COMPLETED_FOR_S;
}

let mut max_job_retries = scheduler_cfg.max_job_retries;
if max_job_retries == 0 {
max_job_retries = DEFAULT_MAX_JOB_RETRIES;
}

let tasks_or_worker_change_notify = Arc::new(Notify::new());
let state_manager = SimpleSchedulerStateManager::new(
tasks_or_worker_change_notify.clone(),
max_job_retries,
MemoryAwaitedActionDb::new(
&EvictionPolicy {
max_seconds: retain_completed_for_s,
..Default::default()
},
now_fn,
),
);
let worker_change_notify = Arc::new(Notify::new());
let state_manager = SimpleSchedulerStateManager::new(max_job_retries, awaited_action_db);

let worker_scheduler = ApiWorkerScheduler::new(
state_manager.clone(),
platform_property_manager.clone(),
scheduler_cfg.allocation_strategy,
tasks_or_worker_change_notify.clone(),
worker_change_notify.clone(),
worker_timeout_s,
);

Expand All @@ -350,7 +331,12 @@ impl SimpleScheduler {
spawn!("simple_scheduler_task_worker_matching", async move {
// Break out of the loop only when the inner is dropped.
loop {
tasks_or_worker_change_notify.notified().await;
let task_change_fut = task_change_notify.notified();
let worker_change_fut = worker_change_notify.notified();
tokio::pin!(task_change_fut);
tokio::pin!(worker_change_fut);
// Wait for either of these futures to be ready.
let _ = futures::future::select(task_change_fut, worker_change_fut).await;
let result = match weak_inner.upgrade() {
Some(scheduler) => scheduler.do_try_match().await,
// If the inner went away it means the scheduler is shutting
Expand Down
Loading

0 comments on commit 627d6c9

Please sign in to comment.