From 67864cbdf601188b8b2c446f97fe5868b424c1a7 Mon Sep 17 00:00:00 2001 From: Zach Birenbaum Date: Tue, 18 Jun 2024 12:47:25 -0700 Subject: [PATCH] Add WorkerStateManager for RedisStateManager Implements WorkerStateManager trait for RedisStateManager. Adds API hooks for worker-driven updates to OperationState. --- .../src/redis_operation_state.rs | 56 ++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/nativelink-scheduler/src/redis_operation_state.rs b/nativelink-scheduler/src/redis_operation_state.rs index 660c12efa2..19f1c691f0 100644 --- a/nativelink-scheduler/src/redis_operation_state.rs +++ b/nativelink-scheduler/src/redis_operation_state.rs @@ -35,7 +35,7 @@ use nativelink_util::background_spawn; use nativelink_util::store_trait::{StoreDriver, StoreLike, StoreSubscription}; use crate::operation_state_manager::{ActionStateResult, OperationStageFlags}; -use crate::operation_state_manager::{ActionStateResultStream, ClientStateManager, OperationFilter}; +use crate::operation_state_manager::{ActionStateResultStream, ClientStateManager, WorkerStateManager, OperationFilter}; #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] enum OperationStage { @@ -330,6 +330,48 @@ impl RedisStateManage } Ok(Box::pin(futures::stream::iter(v))) } + + async fn inner_update_operation( + &self, + operation_id: OperationId, + worker_id: Option, + action_stage: Result, + ) -> Result<(), Error> { + let store = self.store.as_store_driver_pin(); + let key = format!("operations:{}", operation_id); + let operation_bytes_res = &store.get_part_unchunked( + key.clone().into(), + 0, + None + ).await; + let Ok(operation_bytes) = operation_bytes_res else { + return Err(make_input_err!("Received request to update operation {operation_id}, but operation does not exist.")) + }; + let mut operation: RedisOperation = RedisOperation::from_slice(&operation_bytes[..]); + + match action_stage { + Ok(stage) => { + let (maybe_operation_stage, maybe_result) = match stage { + ActionStage::CompletedFromCache(_) => (None, None), + ActionStage::Completed(result) => (Some(OperationStage::Completed), Some(result)), + ActionStage::Queued => (Some(OperationStage::Completed), None), + ActionStage::Unknown => (Some(OperationStage::Unknown), None), + ActionStage::Executing => (Some(OperationStage::Executing), None), + ActionStage::CacheCheck => (Some(OperationStage::CacheCheck), None), + }; + if let Some(operation_stage) = maybe_operation_stage { + operation.stage = operation_stage; + } + operation.result = maybe_result; + operation.worker_id = worker_id; + }, + Err(e) => { operation.last_error = Some(e); } + } + store.update_oneshot( + key.into(), + operation.as_json().into() + ).await + } } #[async_trait] @@ -348,3 +390,15 @@ impl ClientStateManager for RedisStateManager { self.inner_filter_operations(filter).await } } + +#[async_trait] +impl WorkerStateManager for RedisStateManager { + async fn update_operation( + &self, + operation_id: OperationId, + worker_id: WorkerId, + action_stage: Result, + ) -> Result<(), Error> { + self.inner_update_operation(operation_id, Some(worker_id), action_stage).await + } +}