Skip to content

Commit

Permalink
Add more metrics & event messages (#1303)
Browse files Browse the repository at this point in the history
Adds a few more metrics and event messages to aid in debugging.
  • Loading branch information
allada authored Aug 31, 2024
1 parent 0ecf5b4 commit 9f0e809
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 2 deletions.
16 changes: 16 additions & 0 deletions nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl LenEntry for ClientAwaitedAction {
}

/// Actions the AwaitedActionsDb needs to process.
#[derive(Debug)]
pub(crate) enum ActionEvent {
/// A client has sent a keep alive message.
ClientKeepAlive(OperationId),
Expand Down Expand Up @@ -357,6 +358,7 @@ pub struct AwaitedActionDbImpl<I: InstantWrapper, NowFn: Fn() -> I> {
operation_id_to_awaited_action: BTreeMap<OperationId, watch::Sender<AwaitedAction>>,

/// A lookup table to lookup the state of an action by its unique qualifier.
#[metric(group = "action_info_hash_key_to_awaited_action")]
action_info_hash_key_to_awaited_action: HashMap<ActionUniqueKey, OperationId>,

/// A sorted set of [`AwaitedAction`]s. A wrapper is used to perform sorting
Expand Down Expand Up @@ -415,6 +417,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
action_events: impl IntoIterator<Item = ActionEvent>,
) -> NoEarlyReturn {
for action in action_events.into_iter() {
event!(Level::DEBUG, ?action, "Handling action");
match action {
ActionEvent::ClientDroppedOperation(operation_id) => {
// Cleanup operation_id_to_awaited_action.
Expand Down Expand Up @@ -452,6 +455,11 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
.insert(operation_id, connected_clients);
continue;
}
event!(
Level::DEBUG,
?operation_id,
"Clearing operation from state manager"
);
let awaited_action = tx.borrow().clone();
// Cleanup action_info_hash_key_to_awaited_action if it was marked cached.
match &awaited_action.action_info().unique_qualifier {
Expand Down Expand Up @@ -738,6 +746,14 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
let (client_awaited_action, rx) =
self.make_client_awaited_action(operation_id.clone(), awaited_action);

event!(
Level::DEBUG,
?client_operation_id,
?operation_id,
?client_awaited_action,
"Adding action"
);

self.client_operation_to_awaited_action
.insert(client_operation_id.clone(), client_awaited_action)
.await;
Expand Down
2 changes: 2 additions & 0 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl ActionStateResult for SimpleSchedulerActionStateResult {
#[derive(MetricsComponent)]
pub struct SimpleScheduler {
/// Manager for matching engine side of the state manager.
#[metric(group = "matching_engine_state_manager")]
matching_engine_state_manager: Arc<dyn MatchingEngineStateManager>,

/// Manager for client state of this scheduler.
Expand All @@ -124,6 +125,7 @@ pub struct SimpleScheduler {

/// A `Workers` pool that contains all workers that are available to execute actions in a priority
/// order based on the allocation strategy.
#[metric(group = "worker_scheduler")]
worker_scheduler: Arc<ApiWorkerScheduler>,

/// Background task that tries to match actions to workers. If this struct
Expand Down
8 changes: 6 additions & 2 deletions nativelink-scheduler/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ pub type WorkerTimestamp = u64;
/// These platform properties have the type of the properties as well as
/// the value of the properties, unlike ActionInfo, which only has the
/// string value of the properties.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, MetricsComponent)]
pub struct ActionInfoWithProps {
/// The action info of the action.
#[metric(group = "action_info")]
pub inner: Arc<ActionInfo>,
/// The platform properties of the action.
#[metric(group = "platform_properties")]
pub platform_properties: PlatformProperties,
}

Expand All @@ -59,12 +61,14 @@ pub struct Worker {
pub id: WorkerId,

/// Properties that describe the capabilities of this worker.
#[metric(group = "platform_properties")]
pub platform_properties: PlatformProperties,

/// Channel to send commands from scheduler to worker.
pub tx: UnboundedSender<UpdateForWorker>,

/// The action info of the running actions on the worker
/// The action info of the running actions on the worker.
#[metric(group = "running_action_infos")]
pub running_action_infos: HashMap<OperationId, ActionInfoWithProps>,

/// Timestamp of last time this worker had been communicated with.
Expand Down
12 changes: 12 additions & 0 deletions nativelink-util/src/action_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,18 @@ pub struct ActionUniqueKey {
pub digest: DigestInfo,
}

impl std::fmt::Display for ActionUniqueKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"{}/{}/{}-{}",
self.instance_name,
self.digest_function,
self.digest.hash_str(),
self.digest.size_bytes
))
}
}

/// Information needed to execute an action. This struct is used over bazel's proto `Action`
/// for simplicity and offers a `salt`, which is useful to ensure during hashing (for dicts)
/// to ensure we never match against another `ActionInfo` (when a task should never be cached).
Expand Down

0 comments on commit 9f0e809

Please sign in to comment.