Skip to content

Commit

Permalink
Fixes out of code review
Browse files Browse the repository at this point in the history
  • Loading branch information
rdaum committed Jun 27, 2024
1 parent 80b77b8 commit 936c177
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 38 deletions.
2 changes: 1 addition & 1 deletion crates/kernel/src/builtins/bf_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ fn bf_notify(bf_args: &mut BfCallState<'_>) -> Result<BfRet, BfErr> {
event,
},
))
.ok();
.expect("Unable to contact scheduler for Notify");

// MOO docs say this should return none, but in reality it returns 1?
Ok(Ret(v_int(1)))
Expand Down
61 changes: 24 additions & 37 deletions crates/kernel/src/tasks/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use moor_values::model::VerbProgramError;
use moor_values::model::{BinaryType, CommandError, HasUuid, VerbAttrs};
use moor_values::model::{CommitResult, Perms};
use moor_values::var::Error::{E_INVARG, E_PERM};
use moor_values::var::{v_err, v_int, v_string, List, Var};
use moor_values::var::{v_err, v_int, v_none, v_string, List, Var};
use moor_values::var::{Objid, Variant};
use moor_values::{AsByteBuffer, SYSTEM_OBJECT};
use SchedulerError::{
Expand Down Expand Up @@ -733,7 +733,10 @@ impl Scheduler {
} => {
// Task is asking to kill another task.
let mut inner = self.task_q.lock().unwrap();
inner.kill_task(victim_task_id, sender_permissions, result_sender);
let kr = inner.kill_task(victim_task_id, sender_permissions);
if let Err(e) = result_sender.send(kr) {
error!(?e, "Could not send kill task result to requester");
}
}
SchedulerControlMsg::ResumeTask {
queued_task_id,
Expand All @@ -742,15 +745,17 @@ impl Scheduler {
result_sender,
} => {
let mut inner = self.task_q.lock().unwrap();
inner.resume_task(
let rr = inner.resume_task(
task_id,
queued_task_id,
sender_permissions,
return_value,
result_sender,
&self.control_sender,
self.database.clone(),
);
if let Err(e) = result_sender.send(rr) {
error!(?e, "Could not send resume task result to requester");
}
}
SchedulerControlMsg::BootPlayer {
player,
Expand Down Expand Up @@ -1112,7 +1117,7 @@ impl TaskQ {
database: Arc<dyn Database>,
) {
// Make sure the old thread is dead.
task.kill_switch.store(false, Ordering::SeqCst);
task.kill_switch.store(true, Ordering::SeqCst);

// Remove this from the running tasks.
// By definition we can't respond to a retry for a suspended task, so if it's not in the
Expand Down Expand Up @@ -1140,13 +1145,8 @@ impl TaskQ {
}
}

#[instrument(skip(self, result_sender))]
fn kill_task(
&mut self,
victim_task_id: TaskId,
sender_permissions: Perms,
result_sender: oneshot::Sender<Var>,
) {
#[instrument(skip(self))]
fn kill_task(&mut self, victim_task_id: TaskId, sender_permissions: Perms) -> Var {
// We need to do perms check first, which means checking both running and suspended tasks,
// and getting their permissions. And may as well remember whether it was in suspended or
// active at the same time.
Expand All @@ -1155,10 +1155,7 @@ impl TaskQ {
None => match self.tasks.get(&victim_task_id) {
Some(task) => (task.player, false),
None => {
result_sender
.send(v_err(E_INVARG))
.expect("Could not send kill result");
return;
return v_err(E_INVARG);
}
},
};
Expand All @@ -1175,10 +1172,7 @@ impl TaskQ {
.expect("Could not check wizard status for kill request")
&& sender_permissions.who != perms
{
result_sender
.send(v_err(E_PERM))
.expect("Could not send kill result");
return;
return v_err(E_PERM);
}

// If suspended we can just remove completely and move on.
Expand All @@ -1189,44 +1183,40 @@ impl TaskQ {
"Task not found in suspended list for kill request"
);
}
return;
return v_none();
}

// Otherwise we have to check if the task is running, remove its control record, and flip
// its kill switch.
let victim_task = match self.tasks.remove(&victim_task_id) {
Some(victim_task) => victim_task,
None => {
result_sender
.send(v_err(E_INVARG))
.expect("Could not send kill result");
return;
return v_err(E_INVARG);
}
};
victim_task.kill_switch.store(true, Ordering::SeqCst);
v_none()
}

#[allow(clippy::too_many_arguments)]
#[instrument(skip(self, result_sender, control_sender, database))]
#[instrument(skip(self, control_sender, database))]
fn resume_task(
&mut self,
requesting_task_id: TaskId,
queued_task_id: TaskId,
sender_permissions: Perms,
return_value: Var,
result_sender: oneshot::Sender<Var>,
control_sender: &Sender<(TaskId, SchedulerControlMsg)>,
database: Arc<dyn Database>,
) {
) -> Var {
// Task can't resume itself, it couldn't be queued. Builtin should not have sent this
// request.
if requesting_task_id == queued_task_id {
error!(
task = requesting_task_id,
"Task requested to resume itself. Ignoring"
);
result_sender.send(v_err(E_INVARG)).ok();
return;
return v_err(E_INVARG);
}

let perms = match self.suspended.get(&queued_task_id) {
Expand All @@ -1241,8 +1231,7 @@ impl TaskQ {
..
}) => task.perms,
_ => {
result_sender.send(v_err(E_INVARG)).ok();
return;
return v_err(E_INVARG);
}
};
// No permissions.
Expand All @@ -1251,10 +1240,7 @@ impl TaskQ {
.expect("Could not check wizard status for resume request")
&& sender_permissions.who != perms
{
result_sender
.send(v_err(E_PERM))
.expect("Could not send resume result");
return;
return v_err(E_PERM);
}

let sr = self.suspended.remove(&queued_task_id).unwrap();
Expand All @@ -1271,8 +1257,9 @@ impl TaskQ {
.is_err()
{
error!(task = queued_task_id, "Could not resume task");
result_sender.send(v_err(E_INVARG)).ok();
return v_err(E_INVARG);
}
v_none()
}

#[instrument(skip(self))]
Expand Down

0 comments on commit 936c177

Please sign in to comment.