From 15366b615fa1b70e408b3be65e13604f84144ff0 Mon Sep 17 00:00:00 2001 From: Blaise Bruer Date: Sun, 1 Sep 2024 13:24:58 -0500 Subject: [PATCH] Move where increment_version() is triggered for scheduler code This is mostly cosmetic to support other schedulers easier. Instead of doing a version change in the parent state manager we now do it in the underlying manager that actually owns the data. towards #359 --- nativelink-scheduler/src/memory_awaited_action_db.rs | 12 ++++++++---- .../src/simple_scheduler_state_manager.rs | 1 - 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/nativelink-scheduler/src/memory_awaited_action_db.rs b/nativelink-scheduler/src/memory_awaited_action_db.rs index 397cc35b4..15568112c 100644 --- a/nativelink-scheduler/src/memory_awaited_action_db.rs +++ b/nativelink-scheduler/src/memory_awaited_action_db.rs @@ -623,7 +623,10 @@ impl I + Clone + Send + Sync> AwaitedActionDbI } } - fn update_awaited_action(&mut self, new_awaited_action: AwaitedAction) -> Result<(), Error> { + fn update_awaited_action( + &mut self, + mut new_awaited_action: AwaitedAction, + ) -> Result<(), Error> { let tx = self .operation_id_to_awaited_action .get(new_awaited_action.operation_id()) @@ -640,7 +643,7 @@ impl I + Clone + Send + Sync> AwaitedActionDbI // Do not process changes if the action version is not in sync with // what the sender based the update on. - if old_awaited_action.version() + 1 != new_awaited_action.version() { + if old_awaited_action.version() != new_awaited_action.version() { return Err(make_err!( // From: https://grpc.github.io/grpc/core/md_doc_statuscodes.html // Use ABORTED if the client should retry at a higher level @@ -648,14 +651,15 @@ impl I + Clone + Send + Sync> AwaitedActionDbI // indicating the client should restart a read-modify-write // sequence) Code::Aborted, - "{} Expected {:?} but got {:?} for operation_id {:?} - {:?}", + "{} Expected {} but got {} for operation_id {:?} - {:?}", "Tried to update an awaited action with an incorrect version.", - old_awaited_action.version() + 1, + old_awaited_action.version(), new_awaited_action.version(), old_awaited_action, new_awaited_action, )); } + new_awaited_action.increment_version(); error_if!( old_awaited_action.action_info().unique_qualifier diff --git a/nativelink-scheduler/src/simple_scheduler_state_manager.rs b/nativelink-scheduler/src/simple_scheduler_state_manager.rs index 7c0b3e28f..dcefe9c65 100644 --- a/nativelink-scheduler/src/simple_scheduler_state_manager.rs +++ b/nativelink-scheduler/src/simple_scheduler_state_manager.rs @@ -253,7 +253,6 @@ impl SimpleSchedulerStateManager { operation_id: operation_id.clone(), action_digest: awaited_action.action_info().digest(), })); - awaited_action.increment_version(); let update_action_result = self .action_db