Skip to content

Commit

Permalink
Add more metrics & event messages
Browse files Browse the repository at this point in the history
Adds a few more metrics and event messages to aid in debugging.
  • Loading branch information
allada committed Aug 31, 2024
1 parent 47dfc20 commit 26ba264
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 4 deletions.
20 changes: 18 additions & 2 deletions nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl LenEntry for ClientAwaitedAction {
}

/// Actions the AwaitedActionsDb needs to process.
#[derive(Debug)]
pub(crate) enum ActionEvent {
/// A client has sent a keep alive message.
ClientKeepAlive(OperationId),
Expand Down Expand Up @@ -357,6 +358,7 @@ pub struct AwaitedActionDbImpl<I: InstantWrapper, NowFn: Fn() -> I> {
operation_id_to_awaited_action: BTreeMap<OperationId, watch::Sender<AwaitedAction>>,

/// A lookup table to look up the state of an action by its unique qualifier.
#[metric(group = "action_info_hash_key_to_awaited_action")]
action_info_hash_key_to_awaited_action: HashMap<ActionUniqueKey, OperationId>,

/// A sorted set of [`AwaitedAction`]s. A wrapper is used to perform sorting
Expand Down Expand Up @@ -414,8 +416,9 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
&mut self,
action_events: impl IntoIterator<Item = ActionEvent>,
) -> NoEarlyReturn {
for drop_action in action_events.into_iter() {
match drop_action {
for action in action_events.into_iter() {
event!(Level::DEBUG, ?action, "Handling action");
match action {
ActionEvent::ClientDroppedOperation(operation_id) => {
// Cleanup operation_id_to_awaited_action.
let Some(tx) = self.operation_id_to_awaited_action.remove(&operation_id) else {
Expand Down Expand Up @@ -452,6 +455,11 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
.insert(operation_id, connected_clients);
continue;
}
event!(
Level::DEBUG,
?operation_id,
"Clearing operation from state manager"
);
let awaited_action = tx.borrow().clone();
// Cleanup action_info_hash_key_to_awaited_action if it was marked cached.
match &awaited_action.action_info().unique_qualifier {
Expand Down Expand Up @@ -742,6 +750,14 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
let (client_awaited_action, rx) =
self.make_client_awaited_action(operation_id.clone(), awaited_action);

event!(
Level::DEBUG,
?client_operation_id,
?operation_id,
?client_awaited_action,
"Adding action"
);

self.client_operation_to_awaited_action
.insert(client_operation_id.clone(), client_awaited_action)
.await;
Expand Down
2 changes: 2 additions & 0 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl ActionStateResult for SimpleSchedulerActionStateResult {
#[derive(MetricsComponent)]
pub struct SimpleScheduler {
/// Manager for matching engine side of the state manager.
#[metric(group = "matching_engine_state_manager")]
matching_engine_state_manager: Arc<dyn MatchingEngineStateManager>,

/// Manager for client state of this scheduler.
Expand All @@ -124,6 +125,7 @@ pub struct SimpleScheduler {

/// A `Workers` pool that contains all workers that are available to execute actions in a priority
/// order based on the allocation strategy.
#[metric(group = "worker_scheduler")]
worker_scheduler: Arc<ApiWorkerScheduler>,

/// Background task that tries to match actions to workers. If this struct
Expand Down
8 changes: 6 additions & 2 deletions nativelink-scheduler/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ pub type WorkerTimestamp = u64;
/// These platform properties have the type of the properties as well as
/// the value of the properties, unlike ActionInfo, which only has the
/// string value of the properties.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, MetricsComponent)]
pub struct ActionInfoWithProps {
/// The action info of the action.
#[metric(group = "action_info")]
pub inner: Arc<ActionInfo>,
/// The platform properties of the action.
#[metric(group = "platform_properties")]
pub platform_properties: PlatformProperties,
}

Expand All @@ -59,12 +61,14 @@ pub struct Worker {
pub id: WorkerId,

/// Properties that describe the capabilities of this worker.
#[metric(group = "platform_properties")]
pub platform_properties: PlatformProperties,

/// Channel to send commands from scheduler to worker.
pub tx: UnboundedSender<UpdateForWorker>,

/// The action info of the running actions on the worker
/// The action info of the running actions on the worker.
#[metric(group = "running_action_infos")]
pub running_action_infos: HashMap<OperationId, ActionInfoWithProps>,

/// Timestamp of last time this worker had been communicated with.
Expand Down
12 changes: 12 additions & 0 deletions nativelink-util/src/action_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,18 @@ pub struct ActionUniqueKey {
pub digest: DigestInfo,
}

impl std::fmt::Display for ActionUniqueKey {
    /// Writes the key as `<instance_name>/<digest_function>/<hash>-<size_bytes>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Bind each component first so the format string reads like the output.
        let instance = &self.instance_name;
        let function = &self.digest_function;
        let hash = self.digest.hash_str();
        let size = &self.digest.size_bytes;
        write!(f, "{instance}/{function}/{hash}-{size}")
    }
}

/// Information needed to execute an action. This struct is used over bazel's proto `Action`
/// for simplicity and offers a `salt`, which is used during hashing (for dicts)
/// to ensure we never match against another `ActionInfo` (when a task should never be cached).
Expand Down

0 comments on commit 26ba264

Please sign in to comment.