Skip to content

Commit

Permalink
Clean up in task management & #26
Browse files Browse the repository at this point in the history
  * Move the actual parsing of the command down into Task
  * Try `:huh` if the parsing failed
  * Clean up the error returns from the scheduler for the command path
  * Clean up the task dispatching logic to use tokio select!, and a condition variable for the suspension state of the VMHost so we don't busy-loop there.

 $do_command is still initiated from the daemon side, but the next step will be to fold that into the Task as well.
  • Loading branch information
rdaum committed Sep 25, 2023
1 parent 6923a6a commit 4808f72
Show file tree
Hide file tree
Showing 18 changed files with 622 additions and 475 deletions.
26 changes: 7 additions & 19 deletions crates/daemon/src/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ impl RpcServer {
increment_counter!("rpc_server.perform_command");

// Try to submit to do_command as a verb call first and only parse_command after that fails.
// TODO: fold this functionality into Task.
increment_counter!("rpc_server.submit_sys_do_command_task");
let arguments = parse_into_words(command.as_str());
if let Ok(task_id) = self
Expand Down Expand Up @@ -525,13 +526,9 @@ impl RpcServer {
.await
{
Ok(t) => t,
Err(SchedulerError::CouldNotParseCommand(_)) => {
Err(SchedulerError::CommandExecutionError(e)) => {
increment_counter!("rpc_server.perform_command.could_not_parse_command");
return Err(RpcError::CouldNotParseCommand);
}
Err(SchedulerError::NoCommandMatch(s, _)) => {
increment_counter!("rpc_server.perform_command.no_command_match");
return Err(RpcError::NoCommandMatch(s));
return Err(RpcError::CommandError(e));
}
Err(e) => {
increment_counter!("rpc_server.perform_command.submit_command_task_failed");
Expand All @@ -558,17 +555,8 @@ impl RpcServer {

match receiver.await {
Ok(TaskWaiterResult::Success(value)) => Ok(value),
Ok(TaskWaiterResult::Error(SchedulerError::PermissionDenied)) => {
Err(RpcError::PermissionDenied)
}
Ok(TaskWaiterResult::Error(SchedulerError::CouldNotParseCommand(_))) => {
Err(RpcError::CouldNotParseCommand)
}
Ok(TaskWaiterResult::Error(SchedulerError::NoCommandMatch(s, _))) => {
Err(RpcError::NoCommandMatch(s))
}
Ok(TaskWaiterResult::Error(SchedulerError::DatabaseError(e))) => {
Err(RpcError::DatabaseError(e))
Ok(TaskWaiterResult::Error(SchedulerError::CommandExecutionError(e))) => {
Err(RpcError::CommandError(e))
}
Ok(TaskWaiterResult::Error(e)) => Err(RpcError::InternalError(e.to_string())),
Err(e) => {
Expand Down Expand Up @@ -652,9 +640,9 @@ impl RpcServer {

match receiver.await {
Ok(TaskWaiterResult::Success(v)) => Ok(RpcResponse::EvalResult(v)),
Ok(TaskWaiterResult::Error(SchedulerError::DatabaseError(e))) => {
Ok(TaskWaiterResult::Error(SchedulerError::CommandExecutionError(e))) => {
increment_counter!("rpc_server.eval.database_error");
Err(RpcError::DatabaseError(e))
Err(RpcError::CommandError(e))
}
Ok(TaskWaiterResult::Error(e)) => {
error!(error = ?e, "Error processing eval");
Expand Down
14 changes: 7 additions & 7 deletions crates/kernel/src/db/match_env.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use anyhow::anyhow;
use async_trait::async_trait;
use moor_values::model::objset::ObjSet;

use moor_values::model::world_state::WorldState;
use moor_values::model::WorldStateError;
use moor_values::var::objid::Objid;

use crate::db::matching::MatchEnvironment;
Expand All @@ -14,18 +14,18 @@ pub struct DBMatchEnvironment<'a> {

#[async_trait]
impl<'a> MatchEnvironment for DBMatchEnvironment<'a> {
async fn obj_valid(&mut self, oid: Objid) -> Result<bool, anyhow::Error> {
self.ws.valid(oid).await.map_err(|e| anyhow!(e))
async fn obj_valid(&mut self, oid: Objid) -> Result<bool, WorldStateError> {
self.ws.valid(oid).await
}

async fn get_names(&mut self, oid: Objid) -> Result<Vec<String>, anyhow::Error> {
async fn get_names(&mut self, oid: Objid) -> Result<Vec<String>, WorldStateError> {
let mut names = self.ws.names_of(self.perms, oid).await?;
let mut object_names = vec![names.0];
object_names.append(&mut names.1);
Ok(object_names)
}

async fn get_surroundings(&mut self, player: Objid) -> Result<ObjSet, anyhow::Error> {
async fn get_surroundings(&mut self, player: Objid) -> Result<ObjSet, WorldStateError> {
let location = self.ws.location_of(self.perms, player).await?;
let surroundings = self
.ws
Expand All @@ -36,7 +36,7 @@ impl<'a> MatchEnvironment for DBMatchEnvironment<'a> {
Ok(surroundings)
}

async fn location_of(&mut self, player: Objid) -> Result<Objid, anyhow::Error> {
Ok(self.ws.location_of(self.perms, player).await?)
async fn location_of(&mut self, player: Objid) -> Result<Objid, WorldStateError> {
self.ws.location_of(self.perms, player).await
}
}
21 changes: 10 additions & 11 deletions crates/kernel/src/db/matching.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use anyhow::anyhow;
use async_trait::async_trait;
use moor_values::model::objset::ObjSet;
use moor_values::{AMBIGUOUS, FAILED_MATCH, NOTHING};
Expand All @@ -13,16 +12,16 @@ use crate::tasks::command_parse::ParseMatcher;
#[async_trait]
pub trait MatchEnvironment {
// Test whether a given object is valid in this environment.
async fn obj_valid(&mut self, oid: Objid) -> Result<bool, anyhow::Error>;
async fn obj_valid(&mut self, oid: Objid) -> Result<bool, WorldStateError>;

// Return all match names & aliases for an object.
async fn get_names(&mut self, oid: Objid) -> Result<Vec<String>, anyhow::Error>;
async fn get_names(&mut self, oid: Objid) -> Result<Vec<String>, WorldStateError>;

// Returns location, contents, and player, all the things we'd search for matches on.
async fn get_surroundings(&mut self, player: Objid) -> Result<ObjSet, anyhow::Error>;
async fn get_surroundings(&mut self, player: Objid) -> Result<ObjSet, WorldStateError>;

// Return the location of a given object.
async fn location_of(&mut self, player: Objid) -> Result<Objid, anyhow::Error>;
async fn location_of(&mut self, player: Objid) -> Result<Objid, WorldStateError>;
}

#[derive(Clone, Eq, PartialEq, Debug)]
Expand All @@ -36,7 +35,7 @@ fn do_match_object_names(
match_data: &mut MatchData,
names: Vec<String>,
match_name: &str,
) -> Result<Objid, anyhow::Error> {
) -> Result<Objid, WorldStateError> {
let match_name = match_name.to_lowercase();

for object_name in names {
Expand Down Expand Up @@ -71,7 +70,7 @@ pub async fn match_contents<M: MatchEnvironment + Send + Sync>(
env: &mut M,
player: Objid,
object_name: &str,
) -> Result<Option<Objid>, anyhow::Error> {
) -> Result<Option<Objid>, WorldStateError> {
let mut match_data = MatchData {
exact: NOTHING,
partial: FAILED_MATCH,
Expand Down Expand Up @@ -103,7 +102,7 @@ pub struct MatchEnvironmentParseMatcher<M: MatchEnvironment + Send + Sync> {

#[async_trait]
impl<M: MatchEnvironment + Send + Sync> ParseMatcher for MatchEnvironmentParseMatcher<M> {
async fn match_object(&mut self, object_name: &str) -> Result<Option<Objid>, anyhow::Error> {
async fn match_object(&mut self, object_name: &str) -> Result<Option<Objid>, WorldStateError> {
if object_name.is_empty() {
return Ok(None);
}
Expand All @@ -119,9 +118,9 @@ impl<M: MatchEnvironment + Send + Sync> ParseMatcher for MatchEnvironmentParseMa

// Check if the player is valid.
if !self.env.obj_valid(self.player).await? {
return Err(anyhow!(WorldStateError::FailedMatch(
"Invalid current player when performing object match".to_string()
)));
return Err(WorldStateError::FailedMatch(
"Invalid current player when performing object match".to_string(),
));
}

// Check 'me' and 'here' first.
Expand Down
13 changes: 7 additions & 6 deletions crates/kernel/src/db/mock/mock_matching_env.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::collections::{HashMap, HashSet};

use anyhow::{anyhow, Result};
use anyhow::Result;
use async_trait::async_trait;
use moor_values::model::objset::ObjSet;
use moor_values::model::WorldStateError;
use moor_values::NOTHING;

use moor_values::var::objid::Objid;
Expand Down Expand Up @@ -35,18 +36,18 @@ impl MockMatchEnvironment {

#[async_trait]
impl MatchEnvironment for MockMatchEnvironment {
async fn obj_valid(&mut self, oid: Objid) -> Result<bool, anyhow::Error> {
async fn obj_valid(&mut self, oid: Objid) -> Result<bool, WorldStateError> {
Ok(self.objects.contains_key(&oid))
}

async fn get_names(&mut self, oid: Objid) -> Result<Vec<String>, anyhow::Error> {
async fn get_names(&mut self, oid: Objid) -> Result<Vec<String>, WorldStateError> {
Ok(self
.objects
.get(&oid)
.map_or_else(Vec::new, |o| o.names.clone()))
}

async fn get_surroundings(&mut self, player: Objid) -> Result<ObjSet, anyhow::Error> {
async fn get_surroundings(&mut self, player: Objid) -> Result<ObjSet, WorldStateError> {
let mut result = Vec::new();
if let Some(player_obj) = self.objects.get(&player) {
result.push(MOCK_PLAYER);
Expand All @@ -60,11 +61,11 @@ impl MatchEnvironment for MockMatchEnvironment {
Ok(ObjSet::from(&result))
}

async fn location_of(&mut self, oid: Objid) -> Result<Objid, anyhow::Error> {
async fn location_of(&mut self, oid: Objid) -> Result<Objid, WorldStateError> {
self.objects
.get(&oid)
.map(|o| o.location)
.ok_or_else(|| anyhow!("Object not found: {:?}", oid))
.ok_or_else(|| WorldStateError::ObjectNotFound(oid))
}
}

Expand Down
13 changes: 8 additions & 5 deletions crates/kernel/src/tasks/command_parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use bincode::{Decode, Encode};
use lazy_static::lazy_static;

use moor_values::model::r#match::{PrepSpec, Preposition, PREP_LIST};
use moor_values::model::WorldStateError;
use moor_values::var::objid::Objid;
use moor_values::var::{v_str, Var};

Expand Down Expand Up @@ -137,7 +138,7 @@ pub fn parse_into_words(input: &str) -> Vec<String> {

#[async_trait]
pub trait ParseMatcher {
async fn match_object(&mut self, name: &str) -> Result<Option<Objid>, anyhow::Error>;
async fn match_object(&mut self, name: &str) -> Result<Option<Objid>, WorldStateError>;
}

#[derive(thiserror::Error, Debug, Clone, Decode, Encode)]
Expand All @@ -147,7 +148,9 @@ pub enum ParseCommandError {
#[error("Unimplemented built-in command")]
UnimplementedBuiltInCommand,
#[error("Error occurred during object matching")]
ErrorDuringMatch,
ErrorDuringMatch(WorldStateError),
#[error("Permission denied")]
PermissionDenied,
}

#[tracing::instrument(skip(command_environment))]
Expand Down Expand Up @@ -232,15 +235,15 @@ where
iobj = command_environment
.match_object(&iobjstr)
.await
.map_err(|_| ParseCommandError::ErrorDuringMatch)?;
.map_err(|wse| ParseCommandError::ErrorDuringMatch(wse))?;
}

// Get direct object object
if !dobjstr.is_empty() {
dobj = command_environment
.match_object(&dobjstr)
.await
.map_err(|_| ParseCommandError::ErrorDuringMatch)?;
.map_err(|wse| ParseCommandError::ErrorDuringMatch(wse))?;
}

// Build and return ParsedCommand
Expand Down Expand Up @@ -303,7 +306,7 @@ mod tests {
struct SimpleParseMatcher {}
#[async_trait]
impl ParseMatcher for SimpleParseMatcher {
async fn match_object(&mut self, name: &str) -> Result<Option<Objid>, anyhow::Error> {
async fn match_object(&mut self, name: &str) -> Result<Option<Objid>, WorldStateError> {
Ok(match name {
"obj" => Some(Objid(1)),
"player" => Some(Objid(2)),
Expand Down
4 changes: 2 additions & 2 deletions crates/kernel/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ pub mod sessions;

mod moo_vm_host;
mod task;
mod vm_host;
pub mod task_messages;
pub mod vm_host;

pub type TaskId = usize;

Expand Down Expand Up @@ -46,7 +47,6 @@ pub mod vm_test_utils {
let (scs_tx, _scs_rx) = tokio::sync::mpsc::unbounded_channel();
let mut vm_host = MooVmHost::new(
vm,
false,
20,
90_000,
Duration::from_secs(5),
Expand Down
Loading

0 comments on commit 4808f72

Please sign in to comment.