Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more metrics & event messages #1303

Merged
merged 1 commit into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl LenEntry for ClientAwaitedAction {
}

/// Actions the AwaitedActionsDb needs to process.
#[derive(Debug)]
pub(crate) enum ActionEvent {
/// A client has sent a keep alive message.
ClientKeepAlive(OperationId),
Expand Down Expand Up @@ -357,6 +358,7 @@ pub struct AwaitedActionDbImpl<I: InstantWrapper, NowFn: Fn() -> I> {
operation_id_to_awaited_action: BTreeMap<OperationId, watch::Sender<AwaitedAction>>,

/// A lookup table to lookup the state of an action by its unique qualifier.
#[metric(group = "action_info_hash_key_to_awaited_action")]
action_info_hash_key_to_awaited_action: HashMap<ActionUniqueKey, OperationId>,

/// A sorted set of [`AwaitedAction`]s. A wrapper is used to perform sorting
Expand Down Expand Up @@ -415,6 +417,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
action_events: impl IntoIterator<Item = ActionEvent>,
) -> NoEarlyReturn {
for action in action_events.into_iter() {
event!(Level::DEBUG, ?action, "Handling action");
match action {
ActionEvent::ClientDroppedOperation(operation_id) => {
// Cleanup operation_id_to_awaited_action.
Expand Down Expand Up @@ -452,6 +455,11 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
.insert(operation_id, connected_clients);
continue;
}
event!(
Level::DEBUG,
?operation_id,
"Clearing operation from state manager"
);
let awaited_action = tx.borrow().clone();
// Cleanup action_info_hash_key_to_awaited_action if it was marked cached.
match &awaited_action.action_info().unique_qualifier {
Expand Down Expand Up @@ -738,6 +746,14 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
let (client_awaited_action, rx) =
self.make_client_awaited_action(operation_id.clone(), awaited_action);

event!(
Level::DEBUG,
?client_operation_id,
?operation_id,
?client_awaited_action,
"Adding action"
);

self.client_operation_to_awaited_action
.insert(client_operation_id.clone(), client_awaited_action)
.await;
Expand Down
2 changes: 2 additions & 0 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl ActionStateResult for SimpleSchedulerActionStateResult {
#[derive(MetricsComponent)]
pub struct SimpleScheduler {
/// Manager for matching engine side of the state manager.
#[metric(group = "matching_engine_state_manager")]
matching_engine_state_manager: Arc<dyn MatchingEngineStateManager>,

/// Manager for client state of this scheduler.
Expand All @@ -124,6 +125,7 @@ pub struct SimpleScheduler {

/// A `Workers` pool that contains all workers that are available to execute actions in a priority
/// order based on the allocation strategy.
#[metric(group = "worker_scheduler")]
worker_scheduler: Arc<ApiWorkerScheduler>,

/// Background task that tries to match actions to workers. If this struct
Expand Down
8 changes: 6 additions & 2 deletions nativelink-scheduler/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ pub type WorkerTimestamp = u64;
/// These platform properties have the type of the properties as well as
/// the value of the properties, unlike ActionInfo, which only has the
/// string value of the properties.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, MetricsComponent)]
pub struct ActionInfoWithProps {
/// The action info of the action.
#[metric(group = "action_info")]
pub inner: Arc<ActionInfo>,
/// The platform properties of the action.
#[metric(group = "platform_properties")]
pub platform_properties: PlatformProperties,
}

Expand All @@ -59,12 +61,14 @@ pub struct Worker {
pub id: WorkerId,

/// Properties that describe the capabilities of this worker.
#[metric(group = "platform_properties")]
pub platform_properties: PlatformProperties,

/// Channel to send commands from scheduler to worker.
pub tx: UnboundedSender<UpdateForWorker>,

/// The action info of the running actions on the worker
/// The action info of the running actions on the worker.
#[metric(group = "running_action_infos")]
pub running_action_infos: HashMap<OperationId, ActionInfoWithProps>,

/// Timestamp of last time this worker had been communicated with.
Expand Down
12 changes: 12 additions & 0 deletions nativelink-util/src/action_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,18 @@ pub struct ActionUniqueKey {
pub digest: DigestInfo,
}

impl std::fmt::Display for ActionUniqueKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"{}/{}/{}-{}",
self.instance_name,
self.digest_function,
self.digest.hash_str(),
self.digest.size_bytes
))
}
}

/// Information needed to execute an action. This struct is used over bazel's proto `Action`
/// for simplicity and offers a `salt`, which is useful to ensure during hashing (for dicts)
/// to ensure we never match against another `ActionInfo` (when a task should never be cached).
Expand Down
Loading