diff --git a/nativelink-scheduler/src/memory_awaited_action_db.rs b/nativelink-scheduler/src/memory_awaited_action_db.rs index f2364dbf4..922664ad8 100644 --- a/nativelink-scheduler/src/memory_awaited_action_db.rs +++ b/nativelink-scheduler/src/memory_awaited_action_db.rs @@ -96,6 +96,7 @@ impl LenEntry for ClientAwaitedAction { } /// Actions the AwaitedActionsDb needs to process. +#[derive(Debug)] pub(crate) enum ActionEvent { /// A client has sent a keep alive message. ClientKeepAlive(OperationId), @@ -357,6 +358,7 @@ pub struct AwaitedActionDbImpl I> { operation_id_to_awaited_action: BTreeMap>, /// A lookup table to lookup the state of an action by its unique qualifier. + #[metric(group = "action_info_hash_key_to_awaited_action")] action_info_hash_key_to_awaited_action: HashMap, /// A sorted set of [`AwaitedAction`]s. A wrapper is used to perform sorting @@ -414,8 +416,9 @@ impl I + Clone + Send + Sync> AwaitedActionDbI &mut self, action_events: impl IntoIterator, ) -> NoEarlyReturn { - for drop_action in action_events.into_iter() { - match drop_action { + for action in action_events.into_iter() { + event!(Level::DEBUG, ?action, "Handling action"); + match action { ActionEvent::ClientDroppedOperation(operation_id) => { // Cleanup operation_id_to_awaited_action. let Some(tx) = self.operation_id_to_awaited_action.remove(&operation_id) else { @@ -452,6 +455,11 @@ impl I + Clone + Send + Sync> AwaitedActionDbI .insert(operation_id, connected_clients); continue; } + event!( + Level::DEBUG, + ?operation_id, + "Clearing operation from state manager" + ); let awaited_action = tx.borrow().clone(); // Cleanup action_info_hash_key_to_awaited_action if it was marked cached. match &awaited_action.action_info().unique_qualifier { @@ -742,6 +750,14 @@ impl I + Clone + Send + Sync> AwaitedActionDbI let (client_awaited_action, rx) = self.make_client_awaited_action(operation_id.clone(), awaited_action); + event!( + Level::DEBUG, + ?client_operation_id, + ?operation_id, + ?client_awaited_action, + "Adding action" + ); + self.client_operation_to_awaited_action .insert(client_operation_id.clone(), client_awaited_action) .await; diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs index ec36f47d4..bc6ae059d 100644 --- a/nativelink-scheduler/src/simple_scheduler.rs +++ b/nativelink-scheduler/src/simple_scheduler.rs @@ -112,6 +112,7 @@ impl ActionStateResult for SimpleSchedulerActionStateResult { #[derive(MetricsComponent)] pub struct SimpleScheduler { /// Manager for matching engine side of the state manager. + #[metric(group = "matching_engine_state_manager")] matching_engine_state_manager: Arc, /// Manager for client state of this scheduler. @@ -124,6 +125,7 @@ pub struct SimpleScheduler { /// A `Workers` pool that contains all workers that are available to execute actions in a priority /// order based on the allocation strategy. + #[metric(group = "worker_scheduler")] worker_scheduler: Arc, /// Background task that tries to match actions to workers. If this struct diff --git a/nativelink-scheduler/src/worker.rs b/nativelink-scheduler/src/worker.rs index 6a0813a22..1d9651665 100644 --- a/nativelink-scheduler/src/worker.rs +++ b/nativelink-scheduler/src/worker.rs @@ -33,11 +33,13 @@ pub type WorkerTimestamp = u64; /// These platform properties have the type of the properties as well as /// the value of the properties, unlike ActionInfo, which only has the /// string value of the properties. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, MetricsComponent)] pub struct ActionInfoWithProps { /// The action info of the action. + #[metric(group = "action_info")] pub inner: Arc, /// The platform properties of the action. + #[metric(group = "platform_properties")] pub platform_properties: PlatformProperties, } @@ -59,12 +61,14 @@ pub struct Worker { pub id: WorkerId, /// Properties that describe the capabilities of this worker. + #[metric(group = "platform_properties")] pub platform_properties: PlatformProperties, /// Channel to send commands from scheduler to worker. pub tx: UnboundedSender, - /// The action info of the running actions on the worker + /// The action info of the running actions on the worker. + #[metric(group = "running_action_infos")] pub running_action_infos: HashMap, /// Timestamp of last time this worker had been communicated with. diff --git a/nativelink-util/src/action_messages.rs b/nativelink-util/src/action_messages.rs index bad036a39..696a933cf 100644 --- a/nativelink-util/src/action_messages.rs +++ b/nativelink-util/src/action_messages.rs @@ -242,6 +242,18 @@ pub struct ActionUniqueKey { pub digest: DigestInfo, } +impl std::fmt::Display for ActionUniqueKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!( + "{}/{}/{}-{}", + self.instance_name, + self.digest_function, + self.digest.hash_str(), + self.digest.size_bytes + )) + } +} + /// Information needed to execute an action. This struct is used over bazel's proto `Action` /// for simplicity and offers a `salt`, which is useful to ensure during hashing (for dicts) /// to ensure we never match against another `ActionInfo` (when a task should never be cached).