Implement read() builtin function. Allows for prompts.
#10

Like `suspend()`, this commits the current transaction and then resumes in a new one once the input comes back. It's therefore not really recommended if you care strongly about data consistency, and in general this kind of prompting breaks the "VR" feel.
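
To make that park/resume shape concrete, here is a minimal, hypothetical sketch — not the actual moor scheduler code — of a table of tasks parked on read() and woken when the client's line comes back. The `InputWaiters` type and its methods are invented for illustration; only the commit-before-park / fresh-transaction-on-resume semantics come from the commit itself.

use std::collections::HashMap;

use tokio::sync::oneshot;
use uuid::Uuid;

/// Hypothetical stand-in for a scheduler's table of tasks parked on read().
struct InputWaiters {
    waiting: HashMap<Uuid, oneshot::Sender<String>>,
}

impl InputWaiters {
    /// Park a task that has hit read(). In the real flow the task's current
    /// transaction has already been committed by this point, which is exactly
    /// why consistency across a read() is not guaranteed.
    fn park(&mut self) -> (Uuid, oneshot::Receiver<String>) {
        let request_id = Uuid::new_v4();
        let (tx, rx) = oneshot::channel();
        self.waiting.insert(request_id, tx);
        (request_id, rx)
    }

    /// Wake the parked task when the client's line arrives (the analogue of
    /// RpcRequest::RequestedInput being fed back into the scheduler). The
    /// task then resumes in a fresh transaction.
    fn resume(&mut self, request_id: Uuid, line: String) {
        if let Some(tx) = self.waiting.remove(&request_id) {
            let _ = tx.send(line);
        }
    }
}

#[tokio::main]
async fn main() {
    let mut waiters = InputWaiters { waiting: HashMap::new() };
    let (request_id, rx) = waiters.park(); // task suspends on read()
    waiters.resume(request_id, "hello".to_string()); // client answers the prompt
    println!("read() returned: {}", rx.await.unwrap());
}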

Along with this I did a fairly major reworking of the scheduler loop, which should be far more efficient now. I was able to rework the channels so that the loop drives a single tokio select! invocation, so there's no need for a polling interval anymore.
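
For illustration, here's a minimal sketch of the general shape of that rework — an event-driven loop that blocks on tokio::select! over its channels instead of waking on a fixed polling interval. This is not the moor scheduler itself; the message enum and channel wiring are hypothetical.

use tokio::sync::mpsc;
use tokio::time::{interval, Duration};

enum SchedulerMsg {
    SubmitTask(String),
    TaskCompleted(usize),
}

async fn scheduler_loop(mut msgs: mpsc::Receiver<SchedulerMsg>) {
    // A slow housekeeping tick (e.g. pruning dead tasks), not a poll for work.
    let mut housekeeping = interval(Duration::from_secs(5));
    loop {
        tokio::select! {
            msg = msgs.recv() => {
                match msg {
                    Some(SchedulerMsg::SubmitTask(desc)) => {
                        // Dispatch immediately; no polling delay before pickup.
                        println!("starting task: {desc}");
                    }
                    Some(SchedulerMsg::TaskCompleted(id)) => {
                        println!("task {id} finished");
                    }
                    // All senders dropped: shut the scheduler down.
                    None => break,
                }
            }
            _ = housekeeping.tick() => {
                // Periodic maintenance only; the work itself is event-driven.
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(32);
    tx.send(SchedulerMsg::SubmitTask("look".into())).await.unwrap();
    tx.send(SchedulerMsg::TaskCompleted(1)).await.unwrap();
    drop(tx);
    scheduler_loop(rx).await;
}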

(There are still some oddities preventing Session `fork` from working that I
need to get to the bottom of, but they are non-critical; and the experience
in the console host isn't ideal.)
rdaum committed Sep 28, 2023
1 parent 06862c6 commit 813e0cb
Showing 20 changed files with 788 additions and 358 deletions.
51 changes: 47 additions & 4 deletions crates/console-host/src/main.rs
@@ -1,5 +1,6 @@
use anyhow::Error;
use std::process::exit;
use std::sync::Arc;
use std::time::SystemTime;

use clap::Parser;
@@ -10,6 +11,7 @@ use rustyline::DefaultEditor;
use tmq::request;
use tokio::select;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::Mutex;
use tokio::task::block_in_place;
use tracing::{debug, error, info, trace, warn};
use uuid::Uuid;
@@ -123,7 +125,12 @@ async fn perform_auth(
}
}

async fn handle_console_line(client_id: Uuid, line: &str, rpc_client: &mut RpcSendClient) {
async fn handle_console_line(
client_id: Uuid,
line: &str,
rpc_client: &mut RpcSendClient,
input_request_id: Option<Uuid>,
) {
// Lines are either 'eval' or 'command', depending on the mode we're in.
// TODO: The intent here is to do something like Julia's repl interface where they have a pkg
// mode (initiated by initial ] keystroke) and default repl mode.
@@ -132,11 +139,35 @@ async fn handle_console_line(client_id: Uuid, line: &str, rpc_client: &mut RpcSe
// But For now, we'll just act as if we're a telnet connection. User can do eval with ; via
// the core.
let line = line.trim();
if let Some(input_request_id) = input_request_id {
match rpc_client
.make_rpc_call(
client_id,
RpcRequest::RequestedInput(input_request_id.as_u128(), line.to_string()),
)
.await
{
Ok(RpcResult::Success(RpcResponse::InputThanks)) => {
trace!("Input complete");
}
Ok(RpcResult::Success(other)) => {
warn!("Unexpected input response: {:?}", other);
}
Ok(RpcResult::Failure(e)) => {
error!("Failure executing input: {:?}", e);
}
Err(e) => {
error!("Error executing input: {:?}", e);
}
}
return;
}

match rpc_client
.make_rpc_call(client_id, RpcRequest::Command(line.to_string()))
.await
{
Ok(RpcResult::Success(RpcResponse::CommandComplete)) => {
Ok(RpcResult::Success(RpcResponse::CommandSubmitted(_))) => {
trace!("Command complete");
}
Ok(RpcResult::Success(other)) => {
@@ -186,6 +217,8 @@ async fn console_loop(
let mut narrative_subscriber = narrative_subscriber
.subscribe(client_id.as_bytes())
.expect("Unable to subscribe to narrative pubsub server");
let input_request_id = Arc::new(Mutex::new(None));
let output_input_request_id = input_request_id.clone();
let output_loop = tokio::spawn(async move {
loop {
match narrative_recv(client_id, &mut narrative_subscriber).await {
@@ -203,6 +236,10 @@
error!("Error receiving narrative event: {:?}; Session ending.", e);
return;
}
Ok(ConnectionEvent::RequestInput(requested_input_id)) => {
(*output_input_request_id.lock().await) =
Some(Uuid::from_u128(requested_input_id));
}
}
}
});
@@ -245,12 +282,18 @@
// TODO: unprovoked output from the narrative stream screws up the prompt midstream,
// but we have no real way to signal to this loop that it should newline for
// cleanliness. Need to figure out something for this.
let output = block_in_place(|| rl.readline("> "));
let input_request_id = input_request_id.lock().await.take();
let prompt = if let Some(input_request_id) = input_request_id {
format!("{} > ", input_request_id.to_string())
} else {
"> ".to_string()
};
let output = block_in_place(|| rl.readline(prompt.as_str()));
match output {
Ok(line) => {
rl.add_history_entry(line.clone())
.expect("Could not add history");
handle_console_line(client_id, &line, &mut rpc_client).await;
handle_console_line(client_id, &line, &mut rpc_client, input_request_id).await;
}
Err(ReadlineError::Eof) => {
println!("<EOF>");
144 changes: 120 additions & 24 deletions crates/daemon/src/rpc_server.rs
@@ -1,22 +1,19 @@
use anyhow::{Context, Error};
use futures_util::SinkExt;
use itertools::Itertools;
use metrics_macros::increment_counter;
use moor_kernel::tasks::command_parse::parse_into_words;
use moor_values::var::Var;
use std::path::PathBuf;
/// The core of the server logic for the RPC daemon
use std::sync::Arc;
use std::time::{Instant, SystemTime};

use anyhow::{Context, Error};
use futures_util::SinkExt;
use itertools::Itertools;
use metrics_macros::increment_counter;
use tmq::publish::Publish;
use tmq::{publish, reply, Multipart};
use tokio::sync::Mutex;
use tracing::{debug, error, info, trace, warn};
use uuid::Uuid;

use crate::connections::Connections;
use crate::make_response;
use crate::rpc_session::RpcSession;
use moor_kernel::tasks::command_parse::parse_into_words;
use moor_kernel::tasks::scheduler::{Scheduler, SchedulerError, TaskWaiterResult};
use moor_kernel::tasks::sessions::SessionError::DeliveryError;
use moor_kernel::tasks::sessions::{Session, SessionError};
@@ -25,6 +22,7 @@ use moor_values::model::world_state::WorldStateSource;
use moor_values::model::NarrativeEvent;
use moor_values::var::objid::Objid;
use moor_values::var::variant::Variant;
use moor_values::var::Var;
use moor_values::var::{v_bool, v_objid, v_str, v_string};
use moor_values::SYSTEM_OBJECT;
use rpc_common::RpcResponse::{LoginResult, NewConnection};
@@ -33,6 +31,10 @@ use rpc_common::{
BROADCAST_TOPIC,
};

use crate::connections::Connections;
use crate::make_response;
use crate::rpc_session::RpcSession;

pub struct RpcServer {
publish: Arc<Mutex<Publish>>,
world_state_source: Arc<dyn WorldStateSource>,
@@ -62,6 +64,7 @@ impl RpcServer {
zmq_context.get_io_threads().unwrap()
);
let publish = publish(&zmq_context.clone())
.set_sndtimeo(1)
.bind(narrative_endpoint)
.unwrap();
let connections = Connections::new(connections_file).await;
@@ -159,6 +162,23 @@ impl RpcServer {
.await,
)
}
RpcRequest::RequestedInput(request_id, input) => {
increment_counter!("rpc_server.requested_input");
let Some(connection) = self
.connections
.connection_object_for_client(client_id)
.await
else {
return make_response(Err(RpcRequestError::NoConnection));
};

let request_id = Uuid::from_u128(request_id);
make_response(
self.clone()
.respond_input(client_id, connection, request_id, input)
.await,
)
}
RpcRequest::OutOfBand(command) => {
increment_counter!("rpc_server.out_of_band_received");
let Some(connection) = self
@@ -507,7 +527,7 @@ impl RpcServer {
{
if let Ok(value) = self.clone().watch_command_task(task_id).await {
if value != v_bool(false) {
return Ok(RpcResponse::CommandComplete);
return Ok(RpcResponse::CommandSubmitted(task_id));
}
}
}
Expand Down Expand Up @@ -538,9 +558,39 @@ impl RpcServer {
}
};

self.watch_command_task(task_id).await?;
Ok(RpcResponse::CommandSubmitted(task_id))
}

Ok(RpcResponse::CommandComplete)
async fn respond_input(
self: Arc<Self>,
client_id: Uuid,
connection: Objid,
input_request_id: Uuid,
input: String,
) -> Result<RpcResponse, RpcRequestError> {
if let Err(e) = self
.connections
.activity_for_client(client_id, connection)
.await
{
warn!("Unable to update client connection activity: {}", e);
};
increment_counter!("rpc_server.respond_input");

// Pass this back over to the scheduler to handle.
if let Err(e) = self
.clone()
.scheduler
.submit_requested_input(connection, input_request_id, input)
.await
{
increment_counter!("rpc_server.respond_input.submit_requested_input_failed");
error!(error = ?e, "Error submitting requested input");
return Err(RpcRequestError::InternalError(e.to_string()));
}

// TODO: do we need a new response for this? Maybe just a "Thanks"?
Ok(RpcResponse::InputThanks)
}

async fn watch_command_task(self: Arc<Self>, task_id: TaskId) -> Result<Var, RpcRequestError> {
@@ -581,7 +631,7 @@ impl RpcServer {
increment_counter!("rpc_server.perform_out_of_band");

let command_components = parse_into_words(command.as_str());
let _ = match self
let task_id = match self
.clone()
.scheduler
.submit_out_of_band_task(connection, command_components, command, session)
@@ -601,7 +651,7 @@
// let the session run to completion on its own and output back to the client.
// Maybe we should be returning a value from this for the future, but the way clients are
// written right now, there's little point.
Ok(RpcResponse::CommandComplete)
Ok(RpcResponse::CommandSubmitted(task_id))
}

async fn eval(
@@ -687,29 +737,71 @@
message: String,
) -> Result<(), SessionError> {
increment_counter!("rpc_server.send_system_message");
let mut publish = self.publish.lock().await;
let event = ConnectionEvent::SystemMessage(player, message);
let event_bytes = bincode::encode_to_vec(&event, bincode::config::standard())
.expect("Unable to serialize system message");
let payload = vec![client_id.as_bytes().to_vec(), event_bytes];
publish.send(payload).await.map_err(|e| {
error!(error = ?e, "Unable to send system message");
DeliveryError
})?;
{
let mut publish = self.publish.lock().await;
publish.send(payload).await.map_err(|e| {
error!(error = ?e, "Unable to send system message");
DeliveryError
})?;
}
Ok(())
}

/// Request that the client dispatch its next input event through as an input event into the
/// scheduler submit_input, instead, with the attached input_request_id. So send a narrative
/// event to this *specific* client id letting it know that it should issue a prompt.
pub(crate) async fn request_client_input(
&self,
client_id: Uuid,
player: Objid,
input_request_id: Uuid,
) -> Result<(), SessionError> {
// Mark this client as in `input mode`, which means that instead of dispatching its next
// line to the scheduler as a command, it should instead dispatch it as an input event.

// Validate first.
let Some(connection) = self
.connections
.connection_object_for_client(client_id)
.await
else {
return Err(SessionError::NoConnectionForPlayer(player));
};
if connection != player {
return Err(SessionError::NoConnectionForPlayer(player));
}

let event = ConnectionEvent::RequestInput(input_request_id.as_u128());
let event_bytes = bincode::encode_to_vec(&event, bincode::config::standard())
.expect("Unable to serialize input request");
let payload = vec![client_id.as_bytes().to_vec(), event_bytes];
{
let mut publish = self.publish.lock().await;
publish.send(payload).await.map_err(|e| {
error!(error = ?e, "Unable to send input request");
DeliveryError
})?;
}
Ok(())
}

async fn ping_pong(&self) -> Result<(), SessionError> {
let mut publish = self.publish.lock().await;
let event = BroadcastEvent::PingPong(SystemTime::now());
let event_bytes = bincode::encode_to_vec(&event, bincode::config::standard()).unwrap();

// We want responses from all clients, so send on this broadcast "topic"
let payload = vec![BROADCAST_TOPIC.to_vec(), event_bytes];
publish.send(payload).await.map_err(|e| {
error!(error = ?e, "Unable to send PingPong to client");
DeliveryError
})?;
{
let mut publish = self.publish.lock().await;
publish.send(payload).await.map_err(|e| {
error!(error = ?e, "Unable to send PingPong to client");
DeliveryError
})?;
}
self.connections.ping_check().await;
Ok(())
}
@@ -723,6 +815,10 @@ pub(crate) async fn zmq_loop(
narrative_endpoint: &str,
) -> anyhow::Result<()> {
let zmq_ctx = tmq::Context::new();
zmq_ctx
.set_io_threads(8)
.expect("Unable to set ZMQ IO threads");

let rpc_server = Arc::new(
RpcServer::new(
connections_file,
12 changes: 12 additions & 0 deletions crates/daemon/src/rpc_session.rs
@@ -65,6 +65,18 @@ impl Session for RpcSession {
Ok(new_session)
}

async fn request_input(
&self,
player: Objid,
input_request_id: Uuid,
) -> Result<(), SessionError> {
self.rpc_server
.clone()
.request_client_input(self.client_id, player, input_request_id)
.await?;
Ok(())
}

async fn send_event(&self, player: Objid, event: NarrativeEvent) -> Result<(), SessionError> {
self.session_buffer.lock().await.push((player, event));
Ok(())
3 changes: 3 additions & 0 deletions crates/kernel/src/tasks/mod.rs
@@ -109,6 +109,9 @@ pub mod vm_test_utils {
VMHostResponse::Suspend(_) => {
panic!("Unexpected suspend");
}
VMHostResponse::SuspendNeedInput => {
panic!("Unexpected suspend need input");
}
}
}
}