Skip to content

Commit

Permalink
Move ClientActionStateResult to SimpleSchedulerStateManager
Browse files Browse the repository at this point in the history
To help with pre-work needed for redis scheduler we make this struct
usable in mods outside of MemoryAwaitedActionDb.

towards #359
  • Loading branch information
allada committed Sep 1, 2024
1 parent 8f26eb7 commit 1a7df05
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 62 deletions.
61 changes: 1 addition & 60 deletions nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,16 @@ use std::sync::Arc;
use std::time::Duration;

use async_lock::Mutex;
use async_trait::async_trait;
use futures::{FutureExt, Stream};
use nativelink_config::stores::EvictionPolicy;
use nativelink_error::{error_if, make_err, Code, Error, ResultExt};
use nativelink_metric::MetricsComponent;
use nativelink_util::action_messages::{
ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, OperationId,
ActionInfo, ActionStage, ActionUniqueKey, ActionUniqueQualifier, OperationId,
};
use nativelink_util::chunked_stream::ChunkedStream;
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
use nativelink_util::instant_wrapper::InstantWrapper;
use nativelink_util::operation_state_manager::ActionStateResult;
use nativelink_util::spawn;
use nativelink_util::task::JoinHandleDropGuard;
use tokio::sync::{mpsc, watch};
Expand Down Expand Up @@ -207,63 +205,6 @@ where
}
}

pub struct MatchingEngineActionStateResult<T: AwaitedActionSubscriber> {
awaited_action_sub: T,
}
impl<T: AwaitedActionSubscriber> MatchingEngineActionStateResult<T> {
pub fn new(awaited_action_sub: T) -> Self {
Self { awaited_action_sub }
}
}

#[async_trait]
impl<T: AwaitedActionSubscriber> ActionStateResult for MatchingEngineActionStateResult<T> {
async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
Ok(self.awaited_action_sub.borrow().state().clone())
}

async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
let awaited_action = self.awaited_action_sub.changed().await.map_err(|e| {
make_err!(
Code::Internal,
"Failed to wait for awaited action to change {e:?}"
)
})?;
Ok(awaited_action.state().clone())
}

async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
Ok(self.awaited_action_sub.borrow().action_info().clone())
}
}

pub(crate) struct ClientActionStateResult<T: AwaitedActionSubscriber> {
inner: MatchingEngineActionStateResult<T>,
}

impl<T: AwaitedActionSubscriber> ClientActionStateResult<T> {
pub fn new(sub: T) -> Self {
Self {
inner: MatchingEngineActionStateResult::new(sub),
}
}
}

#[async_trait]
impl<T: AwaitedActionSubscriber> ActionStateResult for ClientActionStateResult<T> {
async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
self.inner.as_state().await
}

async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
self.inner.changed().await
}

async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
self.inner.as_action_info().await
}
}

/// A struct that is used to keep the devloper from trying to
/// return early from a function.
struct NoEarlyReturn;
Expand Down
60 changes: 58 additions & 2 deletions nativelink-scheduler/src/simple_scheduler_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use tracing::{event, Level};
use super::awaited_action_db::{
AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, SortedAwaitedActionState,
};
use crate::memory_awaited_action_db::{ClientActionStateResult, MatchingEngineActionStateResult};

/// Maximum number of times an update to the database
/// can fail before giving up.
Expand Down Expand Up @@ -123,7 +122,64 @@ fn apply_filter_predicate(awaited_action: &AwaitedAction, filter: &OperationFilt
true
}

/// MemorySchedulerStateManager is responsible for maintaining the state of the scheduler.
pub struct MatchingEngineActionStateResult<T: AwaitedActionSubscriber> {
awaited_action_sub: T,
}
impl<T: AwaitedActionSubscriber> MatchingEngineActionStateResult<T> {
pub fn new(awaited_action_sub: T) -> Self {
Self { awaited_action_sub }
}
}

#[async_trait]
impl<T: AwaitedActionSubscriber> ActionStateResult for MatchingEngineActionStateResult<T> {
async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
Ok(self.awaited_action_sub.borrow().state().clone())
}

async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
let awaited_action = self.awaited_action_sub.changed().await.map_err(|e| {
make_err!(
Code::Internal,
"Failed to wait for awaited action to change {e:?}"
)
})?;
Ok(awaited_action.state().clone())
}

async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
Ok(self.awaited_action_sub.borrow().action_info().clone())
}
}

pub(crate) struct ClientActionStateResult<T: AwaitedActionSubscriber> {
inner: MatchingEngineActionStateResult<T>,
}

impl<T: AwaitedActionSubscriber> ClientActionStateResult<T> {
pub fn new(sub: T) -> Self {
Self {
inner: MatchingEngineActionStateResult::new(sub),
}
}
}

#[async_trait]
impl<T: AwaitedActionSubscriber> ActionStateResult for ClientActionStateResult<T> {
async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
self.inner.as_state().await
}

async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
self.inner.changed().await
}

async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
self.inner.as_action_info().await
}
}

/// SimpleSchedulerStateManager is responsible for maintaining the state of the scheduler.
/// Scheduler state includes the actions that are queued, active, and recently completed.
/// It also includes the workers that are available to execute actions based on allocation
/// strategy.
Expand Down

0 comments on commit 1a7df05

Please sign in to comment.