Skip to content

Commit

Permalink
Add WorkerStateManager for RedisStateManager
Browse files Browse the repository at this point in the history
Implements WorkerStateManager trait for RedisStateManager. Adds API
hooks for worker-driven updates to OperationState.
  • Loading branch information
zbirenbaum committed Jun 18, 2024
1 parent 69d345b commit 67864cb
Showing 1 changed file with 55 additions and 1 deletion.
56 changes: 55 additions & 1 deletion nativelink-scheduler/src/redis_operation_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use nativelink_util::background_spawn;
use nativelink_util::store_trait::{StoreDriver, StoreLike, StoreSubscription};

use crate::operation_state_manager::{ActionStateResult, OperationStageFlags};
use crate::operation_state_manager::{ActionStateResultStream, ClientStateManager, OperationFilter};
use crate::operation_state_manager::{ActionStateResultStream, ClientStateManager, WorkerStateManager, OperationFilter};

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
enum OperationStage {
Expand Down Expand Up @@ -330,6 +330,48 @@ impl<T: ConnectionLike + Unpin + Clone + Send + Sync + 'static> RedisStateManage
}
Ok(Box::pin(futures::stream::iter(v)))
}

async fn inner_update_operation(
&self,
operation_id: OperationId,
worker_id: Option<WorkerId>,
action_stage: Result<ActionStage, Error>,
) -> Result<(), Error> {
let store = self.store.as_store_driver_pin();
let key = format!("operations:{}", operation_id);
let operation_bytes_res = &store.get_part_unchunked(
key.clone().into(),
0,
None
).await;
let Ok(operation_bytes) = operation_bytes_res else {
return Err(make_input_err!("Received request to update operation {operation_id}, but operation does not exist."))
};
let mut operation: RedisOperation = RedisOperation::from_slice(&operation_bytes[..]);

match action_stage {
Ok(stage) => {
let (maybe_operation_stage, maybe_result) = match stage {
ActionStage::CompletedFromCache(_) => (None, None),
ActionStage::Completed(result) => (Some(OperationStage::Completed), Some(result)),
ActionStage::Queued => (Some(OperationStage::Completed), None),
ActionStage::Unknown => (Some(OperationStage::Unknown), None),
ActionStage::Executing => (Some(OperationStage::Executing), None),
ActionStage::CacheCheck => (Some(OperationStage::CacheCheck), None),
};
if let Some(operation_stage) = maybe_operation_stage {
operation.stage = operation_stage;
}
operation.result = maybe_result;
operation.worker_id = worker_id;
},
Err(e) => { operation.last_error = Some(e); }
}
store.update_oneshot(
key.into(),
operation.as_json().into()
).await
}
}

#[async_trait]
Expand All @@ -348,3 +390,15 @@ impl ClientStateManager for RedisStateManager {
self.inner_filter_operations(filter).await
}
}

#[async_trait]
impl WorkerStateManager for RedisStateManager {
async fn update_operation(
&self,
operation_id: OperationId,
worker_id: WorkerId,
action_stage: Result<ActionStage, Error>,
) -> Result<(), Error> {
self.inner_update_operation(operation_id, Some(worker_id), action_stage).await
}
}

0 comments on commit 67864cb

Please sign in to comment.