Skip to content

Commit

Permalink
Move where increment_version() is triggered for scheduler code (#1307)
Browse files Browse the repository at this point in the history
This is mostly cosmetic to support other schedulers easier. Instead of
doing a version change in the parent state manager we now do it in the
underlying manager that actually owns the data.

towards #359
  • Loading branch information
allada authored Sep 5, 2024
1 parent 4b45662 commit 7736a6f
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
12 changes: 8 additions & 4 deletions nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,10 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
}
}

fn update_awaited_action(&mut self, new_awaited_action: AwaitedAction) -> Result<(), Error> {
fn update_awaited_action(
&mut self,
mut new_awaited_action: AwaitedAction,
) -> Result<(), Error> {
let tx = self
.operation_id_to_awaited_action
.get(new_awaited_action.operation_id())
Expand All @@ -581,22 +584,23 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI

// Do not process changes if the action version is not in sync with
// what the sender based the update on.
if old_awaited_action.version() + 1 != new_awaited_action.version() {
if old_awaited_action.version() != new_awaited_action.version() {
return Err(make_err!(
// From: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
// Use ABORTED if the client should retry at a higher level
// (e.g., when a client-specified test-and-set fails,
// indicating the client should restart a read-modify-write
// sequence)
Code::Aborted,
"{} Expected {:?} but got {:?} for operation_id {:?} - {:?}",
"{} Expected {} but got {} for operation_id {:?} - {:?}",
"Tried to update an awaited action with an incorrect version.",
old_awaited_action.version() + 1,
old_awaited_action.version(),
new_awaited_action.version(),
old_awaited_action,
new_awaited_action,
));
}
new_awaited_action.increment_version();

error_if!(
old_awaited_action.action_info().unique_qualifier
Expand Down
1 change: 0 additions & 1 deletion nativelink-scheduler/src/simple_scheduler_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@ impl<T: AwaitedActionDb> SimpleSchedulerStateManager<T> {
operation_id: operation_id.clone(),
action_digest: awaited_action.action_info().digest(),
}));
awaited_action.increment_version();

let update_action_result = self
.action_db
Expand Down

0 comments on commit 7736a6f

Please sign in to comment.