Skip to content

Commit

Permalink
Migrate much of the ActionScheduler API to ClientStateManager API (Tr…
Browse files Browse the repository at this point in the history
…aceMachina#1241)

Mostly a cosmetic change to move the compatible parts of
ActionSchedulers to use ClientStateManager API instead and implement all
related requirements to existing schedulers.

towards TraceMachina#1213
  • Loading branch information
allada authored Aug 7, 2024
1 parent d57ee8d commit 2b8f1ee
Show file tree
Hide file tree
Showing 13 changed files with 287 additions and 166 deletions.
1 change: 1 addition & 0 deletions nativelink-scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ rust_test_suite(
"@crates//:pretty_assertions",
"@crates//:prost",
"@crates//:tokio",
"@crates//:tokio-stream",
"@crates//:uuid",
],
)
Expand Down
20 changes: 4 additions & 16 deletions nativelink-scheduler/src/action_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,19 @@ use std::sync::Arc;
use async_trait::async_trait;
use nativelink_error::Error;
use nativelink_metric::RootMetricsComponent;
use nativelink_util::action_messages::{ActionInfo, OperationId};
use nativelink_util::operation_state_manager::ActionStateResult;
use nativelink_util::operation_state_manager::ClientStateManager;

use crate::platform_property_manager::PlatformPropertyManager;

/// ActionScheduler interface is responsible for interactions between the scheduler
/// and action related operations.
#[async_trait]
pub trait ActionScheduler: Sync + Send + Unpin + RootMetricsComponent + 'static {
pub trait ActionScheduler:
ClientStateManager + Sync + Send + Unpin + RootMetricsComponent + 'static
{
/// Returns the platform property manager.
async fn get_platform_property_manager(
&self,
instance_name: &str,
) -> Result<Arc<PlatformPropertyManager>, Error>;

/// Adds an action to the scheduler for remote execution.
async fn add_action(
&self,
client_operation_id: OperationId,
action_info: ActionInfo,
) -> Result<Box<dyn ActionStateResult>, Error>;

/// Find an existing action by its name.
async fn find_by_client_operation_id(
&self,
client_operation_id: &OperationId,
) -> Result<Option<Box<dyn ActionStateResult>>, Error>;
}
60 changes: 41 additions & 19 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use nativelink_util::action_messages::{
use nativelink_util::background_spawn;
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::DigestHasherFunc;
use nativelink_util::operation_state_manager::ActionStateResult;
use nativelink_util::operation_state_manager::{
ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
};
use nativelink_util::store_trait::Store;
use parking_lot::{Mutex, MutexGuard};
use scopeguard::guard;
Expand Down Expand Up @@ -147,23 +149,11 @@ impl CacheLookupScheduler {
inflight_cache_checks: Default::default(),
})
}
}

#[async_trait]
impl ActionScheduler for CacheLookupScheduler {
async fn get_platform_property_manager(
&self,
instance_name: &str,
) -> Result<Arc<PlatformPropertyManager>, Error> {
self.action_scheduler
.get_platform_property_manager(instance_name)
.await
}

async fn add_action(
async fn inner_add_action(
&self,
client_operation_id: OperationId,
action_info: ActionInfo,
action_info: Arc<ActionInfo>,
) -> Result<Box<dyn ActionStateResult>, Error> {
let unique_key = match &action_info.unique_qualifier {
ActionUniqueQualifier::Cachable(unique_key) => unique_key.clone(),
Expand Down Expand Up @@ -320,14 +310,46 @@ impl ActionScheduler for CacheLookupScheduler {
.err_tip(|| "In CacheLookupScheduler::add_action")
}

async fn find_by_client_operation_id(
async fn inner_filter_operations(
&self,
filter: OperationFilter,
) -> Result<ActionStateResultStream, Error> {
self.action_scheduler
.filter_operations(filter)
.await
.err_tip(|| "In CacheLookupScheduler::filter_operations")
}
}

#[async_trait]
impl ActionScheduler for CacheLookupScheduler {
async fn get_platform_property_manager(
&self,
client_operation_id: &OperationId,
) -> Result<Option<Box<dyn ActionStateResult>>, Error> {
instance_name: &str,
) -> Result<Arc<PlatformPropertyManager>, Error> {
self.action_scheduler
.find_by_client_operation_id(client_operation_id)
.get_platform_property_manager(instance_name)
.await
}
}

#[async_trait]
impl ClientStateManager for CacheLookupScheduler {
async fn add_action(
&self,
client_operation_id: OperationId,
action_info: Arc<ActionInfo>,
) -> Result<Box<dyn ActionStateResult>, Error> {
self.inner_add_action(client_operation_id, action_info)
.await
}

async fn filter_operations(
&self,
filter: OperationFilter,
) -> Result<ActionStateResultStream, Error> {
self.inner_filter_operations(filter).await
}
}

impl RootMetricsComponent for CacheLookupScheduler {}
68 changes: 54 additions & 14 deletions nativelink-scheduler/src/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use std::time::Duration;

use async_trait::async_trait;
use futures::stream::unfold;
use futures::TryFutureExt;
use nativelink_error::{make_err, Code, Error, ResultExt};
use futures::{StreamExt, TryFutureExt};
use nativelink_error::{error_if, make_err, Code, Error, ResultExt};
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
use nativelink_proto::build::bazel::remote::execution::v2::capabilities_client::CapabilitiesClient;
use nativelink_proto::build::bazel::remote::execution::v2::execution_client::ExecutionClient;
Expand All @@ -32,7 +32,9 @@ use nativelink_util::action_messages::{
ActionInfo, ActionState, ActionUniqueQualifier, OperationId, DEFAULT_EXECUTION_PRIORITY,
};
use nativelink_util::connection_manager::ConnectionManager;
use nativelink_util::operation_state_manager::ActionStateResult;
use nativelink_util::operation_state_manager::{
ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
};
use nativelink_util::retry::{Retrier, RetryResult};
use nativelink_util::{background_spawn, tls_utils};
use parking_lot::Mutex;
Expand Down Expand Up @@ -217,11 +219,8 @@ impl GrpcScheduler {
"Upstream scheduler didn't accept action."
))
}
}

#[async_trait]
impl ActionScheduler for GrpcScheduler {
async fn get_platform_property_manager(
async fn inner_get_platform_property_manager(
&self,
instance_name: &str,
) -> Result<Arc<PlatformPropertyManager>, Error> {
Expand Down Expand Up @@ -268,10 +267,10 @@ impl ActionScheduler for GrpcScheduler {
.await
}

async fn add_action(
async fn inner_add_action(
&self,
_client_operation_id: OperationId,
action_info: ActionInfo,
action_info: Arc<ActionInfo>,
) -> Result<Box<dyn ActionStateResult>, Error> {
let execution_policy = if action_info.priority == DEFAULT_EXECUTION_PRIORITY {
None
Expand Down Expand Up @@ -314,10 +313,17 @@ impl ActionScheduler for GrpcScheduler {
Self::stream_state(result_stream).await
}

async fn find_by_client_operation_id(
async fn inner_filter_operations(
&self,
client_operation_id: &OperationId,
) -> Result<Option<Box<dyn ActionStateResult>>, Error> {
filter: OperationFilter,
) -> Result<ActionStateResultStream, Error> {
error_if!(filter != OperationFilter {
client_operation_id: filter.client_operation_id.clone(),
..Default::default()
}, "Unsupported filter in GrpcScheduler::filter_operations. Only client_operation_id is supported - {filter:?}");
let client_operation_id = filter.client_operation_id.ok_or_else(|| {
make_err!(Code::InvalidArgument, "`client_operation_id` is the only supported filter in GrpcScheduler::filter_operations")
})?;
let request = WaitExecutionRequest {
name: client_operation_id.to_string(),
};
Expand All @@ -336,17 +342,51 @@ impl ActionScheduler for GrpcScheduler {
.and_then(|result_stream| Self::stream_state(result_stream.into_inner()))
.await;
match result_stream {
Ok(result_stream) => Ok(Some(result_stream)),
Ok(result_stream) => Ok(unfold(
Some(result_stream),
|maybe_result_stream| async move { maybe_result_stream.map(|v| (v, None)) },
)
.boxed()),
Err(err) => {
event!(
Level::WARN,
?err,
"Error looking up action with upstream scheduler"
);
Ok(None)
Ok(futures::stream::empty().boxed())
}
}
}
}

#[async_trait]
impl ClientStateManager for GrpcScheduler {
async fn add_action(
&self,
client_operation_id: OperationId,
action_info: Arc<ActionInfo>,
) -> Result<Box<dyn ActionStateResult>, Error> {
self.inner_add_action(client_operation_id, action_info)
.await
}

async fn filter_operations<'a>(
&'a self,
filter: OperationFilter,
) -> Result<ActionStateResultStream<'a>, Error> {
self.inner_filter_operations(filter).await
}
}

#[async_trait]
impl ActionScheduler for GrpcScheduler {
async fn get_platform_property_manager(
&self,
instance_name: &str,
) -> Result<Arc<PlatformPropertyManager>, Error> {
self.inner_get_platform_property_manager(instance_name)
.await
}
}

impl RootMetricsComponent for GrpcScheduler {}
58 changes: 43 additions & 15 deletions nativelink-scheduler/src/property_modifier_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use nativelink_config::schedulers::{PropertyModification, PropertyType};
use nativelink_error::{Error, ResultExt};
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
use nativelink_util::action_messages::{ActionInfo, OperationId};
use nativelink_util::operation_state_manager::ActionStateResult;
use nativelink_util::operation_state_manager::{
ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
};
use parking_lot::Mutex;

use crate::action_scheduler::ActionScheduler;
Expand All @@ -47,11 +49,8 @@ impl PropertyModifierScheduler {
property_managers: Mutex::new(HashMap::new()),
}
}
}

#[async_trait]
impl ActionScheduler for PropertyModifierScheduler {
async fn get_platform_property_manager(
async fn inner_get_platform_property_manager(
&self,
instance_name: &str,
) -> Result<Arc<PlatformPropertyManager>, Error> {
Expand Down Expand Up @@ -91,27 +90,28 @@ impl ActionScheduler for PropertyModifierScheduler {
Ok(property_manager)
}

async fn add_action(
async fn inner_add_action(
&self,
client_operation_id: OperationId,
mut action_info: ActionInfo,
mut action_info: Arc<ActionInfo>,
) -> Result<Box<dyn ActionStateResult>, Error> {
let platform_property_manager = self
.get_platform_property_manager(action_info.unique_qualifier.instance_name())
.inner_get_platform_property_manager(action_info.unique_qualifier.instance_name())
.await
.err_tip(|| "In PropertyModifierScheduler::add_action")?;
let action_info_mut = Arc::make_mut(&mut action_info);
for modification in &self.modifications {
match modification {
PropertyModification::add(addition) => {
action_info.platform_properties.properties.insert(
action_info_mut.platform_properties.properties.insert(
addition.name.clone(),
platform_property_manager
.make_prop_value(&addition.name, &addition.value)
.err_tip(|| "In PropertyModifierScheduler::add_action")?,
)
}
PropertyModification::remove(name) => {
action_info.platform_properties.properties.remove(name)
action_info_mut.platform_properties.properties.remove(name)
}
};
}
Expand All @@ -120,14 +120,42 @@ impl ActionScheduler for PropertyModifierScheduler {
.await
}

async fn find_by_client_operation_id(
async fn inner_filter_operations(
&self,
client_operation_id: &OperationId,
) -> Result<Option<Box<dyn ActionStateResult>>, Error> {
self.scheduler
.find_by_client_operation_id(client_operation_id)
filter: OperationFilter,
) -> Result<ActionStateResultStream, Error> {
self.scheduler.filter_operations(filter).await
}
}

#[async_trait]
impl ActionScheduler for PropertyModifierScheduler {
async fn get_platform_property_manager(
&self,
instance_name: &str,
) -> Result<Arc<PlatformPropertyManager>, Error> {
self.inner_get_platform_property_manager(instance_name)
.await
}
}

#[async_trait]
impl ClientStateManager for PropertyModifierScheduler {
async fn add_action(
&self,
client_operation_id: OperationId,
action_info: Arc<ActionInfo>,
) -> Result<Box<dyn ActionStateResult>, Error> {
self.inner_add_action(client_operation_id, action_info)
.await
}

async fn filter_operations<'a>(
&'a self,
filter: OperationFilter,
) -> Result<ActionStateResultStream<'a>, Error> {
self.inner_filter_operations(filter).await
}
}

impl RootMetricsComponent for PropertyModifierScheduler {}
Loading

0 comments on commit 2b8f1ee

Please sign in to comment.