Skip to content

Commit

Permalink
AwaitedAction's operation_id and client_operation_id now separated
Browse files Browse the repository at this point in the history
Gives more clear separation of client_operation_id and operation_id in
AwaitedAction.

towards #359
  • Loading branch information
allada committed Sep 2, 2024
1 parent 2bc4d78 commit 71d38d3
Show file tree
Hide file tree
Showing 13 changed files with 93 additions and 65 deletions.
15 changes: 12 additions & 3 deletions nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub struct AwaitedAction {
action_info: Arc<ActionInfo>,

/// The operation id of the action.
// If you need the client operation id, it may be set in
// ActionState::operation_id.
#[metric(help = "The operation id of the AwaitedAction")]
operation_id: OperationId,

Expand Down Expand Up @@ -86,7 +88,12 @@ impl AwaitedAction {
);
let state = Arc::new(ActionState {
stage,
operation_id: operation_id.clone(),
// Note: We don't use the real client_operation_id here because
// the only place AwaitedAction::new should ever be called is
// when the action is first created and this struct will be stored
// in the database, so we don't want to accidentally leak the
// client_operation_id to all clients.
client_operation_id: operation_id.clone(),
action_digest: action_info.unique_qualifier.digest(),
});
Self {
Expand Down Expand Up @@ -147,9 +154,11 @@ impl AwaitedAction {

/// Sets the current state of the action and notifies subscribers.
/// Returns true if the state was set, false if there are no subscribers.
pub(crate) fn set_state(&mut self, mut state: Arc<ActionState>, now: SystemTime) {
pub(crate) fn set_state(&mut self, mut state: Arc<ActionState>, now: Option<SystemTime>) {
std::mem::swap(&mut self.state, &mut state);
self.keep_alive(now);
if let Some(now) = now {
self.keep_alive(now);
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,13 @@ impl CacheLookupScheduler {
return; // Nobody is waiting for this action anymore.
};
let mut action_state = ActionState {
operation_id: OperationId::default(),
client_operation_id: OperationId::default(),
stage: ActionStage::CompletedFromCache(action_result),
action_digest: action_info.unique_qualifier.digest(),
};

for (client_operation_id, pending_tx) in pending_txs {
action_state.operation_id = client_operation_id;
action_state.client_operation_id = client_operation_id;
// Ignore errors here, as the other end may have hung up.
let _ = pending_tx.send(Ok(Box::new(CacheLookupActionStateResult {
action_state: Arc::new(action_state.clone()),
Expand Down
4 changes: 2 additions & 2 deletions nativelink-scheduler/src/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ struct GrpcActionStateResult {
impl ActionStateResult for GrpcActionStateResult {
async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
let mut action_state = self.rx.borrow().clone();
Arc::make_mut(&mut action_state).operation_id = self.client_operation_id.clone();
Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
Ok(action_state)
}

Expand All @@ -68,7 +68,7 @@ impl ActionStateResult for GrpcActionStateResult {
)
})?;
let mut action_state = self.rx.borrow().clone();
Arc::make_mut(&mut action_state).operation_id = self.client_operation_id.clone();
Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
Ok(action_state)
}

Expand Down
22 changes: 17 additions & 5 deletions nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ where
NowFn: Fn() -> I + Send + Sync + 'static,
{
async fn changed(&mut self) -> Result<AwaitedAction, Error> {
{
let client_operation_id = {
let changed_fut = self.awaited_action_rx.changed().map(|r| {
r.map_err(|e| {
make_err!(
Expand Down Expand Up @@ -194,15 +194,27 @@ where
// let the database know that we are still listening to prevent
// the action from being dropped.
}

}
}
}
Ok(self.awaited_action_rx.borrow().clone())
client_info.client_operation_id.clone()
};
// At this stage we know that this event is a client request, so we need
// to populate the client_operation_id.
let mut awaited_action = self.awaited_action_rx.borrow().clone();
let mut state = awaited_action.state().as_ref().clone();
state.client_operation_id = client_operation_id;
awaited_action.set_state(Arc::new(state), None);
Ok(awaited_action)
}

fn borrow(&self) -> AwaitedAction {
self.awaited_action_rx.borrow().clone()
let mut awaited_action = self.awaited_action_rx.borrow().clone();
if let Some(client_info) = self.client_info.as_ref() {
let mut state = awaited_action.state().as_ref().clone();
state.client_operation_id = client_info.client_operation_id.clone();
awaited_action.set_state(Arc::new(state), None);
}
awaited_action
}
}

Expand Down
6 changes: 3 additions & 3 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl ActionStateResult for SimpleSchedulerActionStateResult {
.err_tip(|| "In SimpleSchedulerActionStateResult")?;
// We need to ensure the client is not aware of the downstream
// operation id, so override it before it goes out.
Arc::make_mut(&mut action_state).operation_id = self.client_operation_id.clone();
Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
Ok(action_state)
}

Expand All @@ -87,7 +87,7 @@ impl ActionStateResult for SimpleSchedulerActionStateResult {
.err_tip(|| "In SimpleSchedulerActionStateResult")?;
// We need to ensure the client is not aware of the downstream
// operation id, so override it before it goes out.
Arc::make_mut(&mut action_state).operation_id = self.client_operation_id.clone();
Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
Ok(action_state)
}

Expand Down Expand Up @@ -223,7 +223,7 @@ impl SimpleScheduler {
.as_state()
.await
.err_tip(|| "Failed to get action_info from as_state_result stream")?;
action_state.operation_id.clone()
action_state.client_operation_id.clone()
};

// Tell the matching engine that the operation is being assigned to a worker.
Expand Down
10 changes: 7 additions & 3 deletions nativelink-scheduler/src/simple_scheduler_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,9 @@ where
event!(
Level::WARN,
?awaited_action,
"OperationId {} timed out after {} seconds issuing a retry",
"OperationId {} / {} timed out after {} seconds issuing a retry",
awaited_action.operation_id(),
awaited_action.state().client_operation_id,
self.no_event_action_timeout.as_secs_f32(),
);

Expand Down Expand Up @@ -504,10 +505,13 @@ where
awaited_action.set_state(
Arc::new(ActionState {
stage,
operation_id: operation_id.clone(),
// Client id is not known here, it is the responsibility of
// the the subscriber impl to replace this with the
// correct client id.
client_operation_id: operation_id.clone(),
action_digest: awaited_action.action_info().digest(),
}),
now,
Some(now),
);

let update_action_result = self
Expand Down
2 changes: 1 addition & 1 deletion nativelink-scheduler/tests/action_messages_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn action_state_any_url_test() -> Result<(), Error> {
let client_id = OperationId::default();
let operation_id = OperationId::default();
let action_state = ActionState {
operation_id: operation_id.clone(),
client_operation_id: operation_id.clone(),
// Result is only populated if has_action_result.
stage: ActionStage::Completed(ActionResult::default()),
action_digest,
Expand Down
2 changes: 1 addition & 1 deletion nativelink-scheduler/tests/cache_lookup_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn add_action_handles_skip_cache() -> Result<(), Error> {
.await?;
let (_forward_watch_channel_tx, forward_watch_channel_rx) =
watch::channel(Arc::new(ActionState {
operation_id: OperationId::default(),
client_operation_id: OperationId::default(),
stage: ActionStage::Queued,
action_digest: action_info.unique_qualifier.digest(),
}));
Expand Down
10 changes: 5 additions & 5 deletions nativelink-scheduler/tests/property_modifier_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async fn add_action_adds_property() -> Result<(), Error> {
let action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest());
let (_forward_watch_channel_tx, forward_watch_channel_rx) =
watch::channel(Arc::new(ActionState {
operation_id: OperationId::default(),
client_operation_id: OperationId::default(),
stage: ActionStage::Queued,
action_digest: action_info.unique_qualifier.digest(),
}));
Expand Down Expand Up @@ -111,7 +111,7 @@ async fn add_action_overwrites_property() -> Result<(), Error> {
let action_info = Arc::new(action_info);
let (_forward_watch_channel_tx, forward_watch_channel_rx) =
watch::channel(Arc::new(ActionState {
operation_id: OperationId::default(),
client_operation_id: OperationId::default(),
stage: ActionStage::Queued,
action_digest: action_info.unique_qualifier.digest(),
}));
Expand Down Expand Up @@ -150,7 +150,7 @@ async fn add_action_property_added_after_remove() -> Result<(), Error> {
let action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest());
let (_forward_watch_channel_tx, forward_watch_channel_rx) =
watch::channel(Arc::new(ActionState {
operation_id: OperationId::default(),
client_operation_id: OperationId::default(),
stage: ActionStage::Queued,
action_digest: action_info.unique_qualifier.digest(),
}));
Expand Down Expand Up @@ -189,7 +189,7 @@ async fn add_action_property_remove_after_add() -> Result<(), Error> {
let action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest());
let (_forward_watch_channel_tx, forward_watch_channel_rx) =
watch::channel(Arc::new(ActionState {
operation_id: OperationId::default(),
client_operation_id: OperationId::default(),
stage: ActionStage::Queued,
action_digest: action_info.unique_qualifier.digest(),
}));
Expand Down Expand Up @@ -227,7 +227,7 @@ async fn add_action_property_remove() -> Result<(), Error> {
let action_info = Arc::new(action_info);
let (_forward_watch_channel_tx, forward_watch_channel_rx) =
watch::channel(Arc::new(ActionState {
operation_id: OperationId::default(),
client_operation_id: OperationId::default(),
stage: ActionStage::Queued,
action_digest: action_info.unique_qualifier.digest(),
}));
Expand Down
Loading

0 comments on commit 71d38d3

Please sign in to comment.