diff --git a/Cargo.lock b/Cargo.lock index 554f246b..586219bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1691,17 +1691,15 @@ dependencies = [ "futures-util", "itertools 0.12.1", "moor-values", - "rpc-async-client", "rpc-common", + "rpc-sync-client", "rustyline", "strum", - "tmq", - "tokio", - "tokio-util", "tracing", "tracing-chrome", "tracing-subscriber", "uuid", + "zmq", ] [[package]] @@ -2458,6 +2456,19 @@ dependencies = [ "uuid", ] +[[package]] +name = "rpc-sync-client" +version = "0.1.0" +dependencies = [ + "bincode", + "futures-util", + "rpc-common", + "thiserror", + "tracing", + "uuid", + "zmq", +] + [[package]] name = "rustc-demangle" version = "0.1.23" diff --git a/Cargo.toml b/Cargo.toml index cf5e9461..7bb1160e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "crates/kernel", "crates/db", "crates/rpc-common", + "crates/rpc-sync-client", "crates/rpc-async-client", "crates/daemon", "crates/telnet-host", diff --git a/crates/console-host/Cargo.toml b/crates/console-host/Cargo.toml index c89525fb..c45386b3 100644 --- a/crates/console-host/Cargo.toml +++ b/crates/console-host/Cargo.toml @@ -10,7 +10,7 @@ rust-version.workspace = true [dependencies] moor-values = { path = "../values" } rpc-common = { path = "../rpc-common" } -rpc-async-client = { path = "../rpc-async-client" } +rpc-sync-client = { path = "../rpc-sync-client" } ## Command line arguments parsing. clap.workspace = true @@ -25,10 +25,6 @@ bincode.workspace = true futures-util.workspace = true futures.workspace = true -## Asynchronous transaction processing & networking -tokio-util.workspace = true -tokio.workspace = true - ## Logging & tracing tracing-chrome.workspace = true tracing-subscriber.workspace = true @@ -36,7 +32,7 @@ tracing.workspace = true ## ZMQ / RPC itertools.workspace = true -tmq.workspace = true +zmq.workspace = true uuid.workspace = true ## For console diff --git a/crates/console-host/src/main.rs b/crates/console-host/src/main.rs index 63145c8f..20646880 100644 --- a/crates/console-host/src/main.rs +++ b/crates/console-host/src/main.rs @@ -13,8 +13,7 @@ // use eyre::Error; -use std::process::exit; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::SystemTime; use clap::Parser; @@ -22,20 +21,15 @@ use clap_derive::Parser; use moor_values::var::Objid; use rustyline::error::ReadlineError; use rustyline::DefaultEditor; -use tmq::request; -use tokio::select; -use tokio::signal::unix::{signal, SignalKind}; -use tokio::sync::Mutex; -use tokio::task::block_in_place; use tracing::{debug, error, info, trace, warn}; use uuid::Uuid; -use rpc_async_client::pubsub_client::{broadcast_recv, narrative_recv}; -use rpc_async_client::rpc_client::RpcSendClient; use rpc_common::{ AuthToken, BroadcastEvent, ClientToken, ConnectionEvent, RpcRequest, RpcResponse, RpcResult, BROADCAST_TOPIC, }; +use rpc_sync_client::RpcSendClient; +use rpc_sync_client::{broadcast_recv, narrative_recv}; #[derive(Parser, Debug)] struct Args { @@ -72,17 +66,14 @@ struct Args { password: String, } -async fn establish_connection( +fn establish_connection( client_id: Uuid, rpc_client: &mut RpcSendClient, -) -> Result<(ClientToken, Objid), eyre::Error> { - match rpc_client - .make_rpc_call( - client_id, - RpcRequest::ConnectionEstablish("console".to_string()), - ) - .await - { +) -> Result<(ClientToken, Objid), Error> { + match rpc_client.make_rpc_call( + client_id, + RpcRequest::ConnectionEstablish("console".to_string()), + ) { Ok(RpcResult::Success(RpcResponse::NewConnection(token, conn_id))) => Ok((token, conn_id)), Ok(RpcResult::Success(other)) => { error!("Unexpected response: {:?}", other); @@ -99,7 +90,7 @@ async fn establish_connection( } } -async fn perform_auth( +fn perform_auth( token: ClientToken, client_id: Uuid, rpc_client: &mut RpcSendClient, @@ -107,21 +98,18 @@ async fn perform_auth( password: &str, ) -> Result<(AuthToken, Objid), Error> { // Need to first authenticate with the server. - match rpc_client - .make_rpc_call( - client_id, - RpcRequest::LoginCommand( - token, - vec![ - "connect".to_string(), - username.to_string(), - password.to_string(), - ], - true, - ), - ) - .await - { + match rpc_client.make_rpc_call( + client_id, + RpcRequest::LoginCommand( + token, + vec![ + "connect".to_string(), + username.to_string(), + password.to_string(), + ], + true, + ), + ) { Ok(RpcResult::Success(RpcResponse::LoginResult(Some(( auth_token, connect_type, @@ -149,7 +137,7 @@ async fn perform_auth( } } -async fn handle_console_line( +fn handle_console_line( client_token: ClientToken, auth_token: AuthToken, client_id: Uuid, @@ -157,27 +145,17 @@ async fn handle_console_line( rpc_client: &mut RpcSendClient, input_request_id: Option, ) { - // Lines are either 'eval' or 'command', depending on the mode we're in. - // TODO: The intent here is to do something like Julia's repl interface where they have a pkg - // mode (initiated by initial ] keystroke) and default repl mode. - // For us, our initial keystroke will provoke evaluation through `Eval` but default will be - // to send standard MOO commands. - // But For now, we'll just act as if we're a telnet connection. User can do eval with ; via - // the core. let line = line.trim(); if let Some(input_request_id) = input_request_id { - match rpc_client - .make_rpc_call( - client_id, - RpcRequest::RequestedInput( - client_token.clone(), - auth_token.clone(), - input_request_id.as_u128(), - line.to_string(), - ), - ) - .await - { + match rpc_client.make_rpc_call( + client_id, + RpcRequest::RequestedInput( + client_token.clone(), + auth_token.clone(), + input_request_id.as_u128(), + line.to_string(), + ), + ) { Ok(RpcResult::Success(RpcResponse::InputThanks)) => { trace!("Input complete"); } @@ -194,13 +172,10 @@ async fn handle_console_line( return; } - match rpc_client - .make_rpc_call( - client_id, - RpcRequest::Command(client_token.clone(), auth_token.clone(), line.to_string()), - ) - .await - { + match rpc_client.make_rpc_call( + client_id, + RpcRequest::Command(client_token.clone(), auth_token.clone(), line.to_string()), + ) { Ok(RpcResult::Success(RpcResponse::CommandSubmitted(_))) => { trace!("Command complete"); } @@ -216,26 +191,23 @@ async fn handle_console_line( } } -async fn console_loop( +fn console_loop( rpc_server: &str, narrative_server: &str, username: &str, password: &str, -) -> Result<(), eyre::Error> { - let zmq_ctx = tmq::Context::new(); +) -> Result<(), Error> { + let zmq_ctx = zmq::Context::new(); + + let rpc_socket = zmq_ctx.socket(zmq::REQ)?; + rpc_socket.connect(rpc_server)?; // Establish a connection to the RPC server let client_id = Uuid::new_v4(); - let rcp_request_sock = request(&zmq_ctx) - .set_rcvtimeo(100) - .set_sndtimeo(100) - .connect(rpc_server) - .expect("Unable to bind RPC server for connection"); - - let mut rpc_client = RpcSendClient::new(rcp_request_sock); + let mut rpc_client = RpcSendClient::new(rpc_socket); - let (client_token, conn_obj_id) = establish_connection(client_id, &mut rpc_client).await?; + let (client_token, conn_obj_id) = establish_connection(client_id, &mut rpc_client)?; debug!("Transitional connection ID before auth: {:?}", conn_obj_id); // Now authenticate with the server. @@ -245,24 +217,22 @@ async fn console_loop( &mut rpc_client, username, password, - ) - .await?; + )?; info!("Authenticated as {:?} / {}", username, player); // Spawn a thread to listen for events on the narrative pubsub channel, and send them to the // console. - let narrative_subscriber = tmq::subscribe(&zmq_ctx) - .connect(narrative_server) - .expect("Unable to connect to narrative pubsub server"); - let mut narrative_subscriber = narrative_subscriber - .subscribe(client_id.as_bytes()) - .expect("Unable to subscribe to narrative pubsub server"); + let narr_sub_socket = zmq_ctx.socket(zmq::SUB)?; + narr_sub_socket.connect(narrative_server)?; + narr_sub_socket.set_subscribe(client_id.as_bytes())?; let input_request_id = Arc::new(Mutex::new(None)); let output_input_request_id = input_request_id.clone(); - let output_loop = tokio::spawn(async move { - loop { - match narrative_recv(client_id, &mut narrative_subscriber).await { + + std::thread::Builder::new() + .name("output-loop".to_string()) + .spawn(move || loop { + match narrative_recv(client_id, &narr_sub_socket) { Ok(ConnectionEvent::Narrative(_, msg)) => { println!( "{}", @@ -283,111 +253,86 @@ async fn console_loop( return; } Ok(ConnectionEvent::RequestInput(requested_input_id)) => { - (*output_input_request_id.lock().await) = + (*output_input_request_id.lock().unwrap()) = Some(Uuid::from_u128(requested_input_id)); } } - } - }); + })?; + + let mut broadcast_subscriber = zmq_ctx.socket(zmq::SUB)?; + broadcast_subscriber.connect(narrative_server)?; + broadcast_subscriber.set_subscribe(BROADCAST_TOPIC)?; - let broadcast_subscriber = tmq::subscribe(&zmq_ctx) - .connect(narrative_server) - .expect("Unable to connect to narrative pubsub server"); - let mut broadcast_subscriber = broadcast_subscriber - .subscribe(BROADCAST_TOPIC) - .expect("Unable to subscribe to narrative pubsub server"); - let broadcast_rcp_request_sock = request(&zmq_ctx) - .set_rcvtimeo(100) - .set_sndtimeo(100) - .connect(rpc_server) - .expect("Unable to bind RPC server for connection"); - let mut broadcast_rpc_client = RpcSendClient::new(broadcast_rcp_request_sock); + let broadcast_rpc_socket = zmq_ctx.socket(zmq::REQ)?; + broadcast_rpc_socket.connect(rpc_server)?; + let mut broadcast_rpc_client = RpcSendClient::new(broadcast_rpc_socket); let broadcast_client_token = client_token.clone(); - let broadcast_loop = tokio::spawn(async move { - loop { - match broadcast_recv(&mut broadcast_subscriber).await { - Ok(BroadcastEvent::PingPong(_)) => { - if let Err(e) = broadcast_rpc_client - .make_rpc_call( - client_id, - RpcRequest::Pong(broadcast_client_token.clone(), SystemTime::now()), - ) - .await - { - error!("Error sending pong: {:?}", e); - return; - } - } - Err(e) => { - error!("Error receiving broadcast event: {:?}; Session ending.", e); + std::thread::spawn(move || loop { + match broadcast_recv(&mut broadcast_subscriber) { + Ok(BroadcastEvent::PingPong(_)) => { + if let Err(e) = broadcast_rpc_client.make_rpc_call( + client_id, + RpcRequest::Pong(broadcast_client_token.clone(), SystemTime::now()), + ) { + error!("Error sending pong: {:?}", e); return; } } + Err(e) => { + error!("Error receiving broadcast event: {:?}; Session ending.", e); + return; + } } }); let edit_client_token = client_token.clone(); let edit_auth_token = auth_token.clone(); - let edit_loop = tokio::spawn(async move { - let mut rl = DefaultEditor::new().unwrap(); - loop { - // TODO: unprovoked output from the narrative stream screws up the prompt midstream, - // but we have no real way to signal to this loop that it should newline for - // cleanliness. Need to figure out something for this. - let input_request_id = input_request_id.lock().await.take(); - let prompt = if let Some(input_request_id) = input_request_id { - format!("{} > ", input_request_id) - } else { - "> ".to_string() - }; - let output = block_in_place(|| rl.readline(prompt.as_str())); - match output { - Ok(line) => { - rl.add_history_entry(line.clone()) - .expect("Could not add history"); - handle_console_line( - edit_client_token.clone(), - edit_auth_token.clone(), - client_id, - &line, - &mut rpc_client, - input_request_id, - ) - .await; - } - Err(ReadlineError::Eof) => { - println!(""); - break; - } - Err(ReadlineError::Interrupted) => { - println!("^C"); - continue; - } - Err(e) => { - println!("Error: {e:?}"); - break; - } - } - } - }); - select! { - _ = output_loop => { - info!("ZMQ client loop exited, stopping..."); - } - _ = broadcast_loop => { - info!("Broadcast loop exited, stopping..."); - } - _ = edit_loop => { - info!("Edit loop exited, stopping..."); + let mut rl = DefaultEditor::new().unwrap(); + loop { + // TODO: unprovoked output from the narrative stream screws up the prompt midstream, + // but we have no real way to signal to this loop that it should newline for + // cleanliness. Need to figure out something for this. + let input_request_id = input_request_id.lock().unwrap().take(); + let prompt = if let Some(input_request_id) = input_request_id { + format!("{} > ", input_request_id) + } else { + "> ".to_string() + }; + let output = rl.readline(prompt.as_str()); + match output { + Ok(line) => { + rl.add_history_entry(line.clone()) + .expect("Could not add history"); + handle_console_line( + edit_client_token.clone(), + edit_auth_token.clone(), + client_id, + &line, + &mut rpc_client, + input_request_id, + ); + } + Err(ReadlineError::Eof) => { + println!(""); + break; + } + Err(ReadlineError::Interrupted) => { + println!("^C"); + continue; + } + Err(e) => { + println!("Error: {e:?}"); + break; + } } } + Ok(()) } -#[tokio::main(flavor = "multi_thread")] -async fn main() -> Result<(), eyre::Error> { +fn main() -> Result<(), Error> { color_eyre::install()?; let args: Args = Args::parse(); @@ -403,24 +348,10 @@ async fn main() -> Result<(), eyre::Error> { tracing::subscriber::set_global_default(main_subscriber) .expect("Unable to set configure logging"); - let mut hup_signal = - signal(SignalKind::hangup()).expect("Unable to register HUP signal handler"); - let mut stop_signal = - signal(SignalKind::interrupt()).expect("Unable to register STOP signal handler"); - - select! { - _ = console_loop(&args.rpc_server, args.narrative_server.as_str(), - &args.username, &args.password) => { - info!("console session exited, quitting..."); - exit(0); - } - _ = hup_signal.recv() => { - info!("HUP received, quitting..."); - exit(0); - }, - _ = stop_signal.recv() => { - info!("STOP received, quitting..."); - exit(0); - } - } + console_loop( + &args.rpc_server, + args.narrative_server.as_str(), + &args.username, + &args.password, + ) } diff --git a/crates/rpc-sync-client/Cargo.toml b/crates/rpc-sync-client/Cargo.toml new file mode 100644 index 00000000..f67329a8 --- /dev/null +++ b/crates/rpc-sync-client/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "rpc-sync-client" +version = "0.1.0" +edition.workspace = true +authors.workspace = true +repository.workspace = true +license.workspace = true +rust-version.workspace = true + +[dependencies] +# Own +rpc-common = { path = "../rpc-common" } + +bincode.workspace = true +futures-util.workspace = true +thiserror.workspace = true +zmq.workspace = true +tracing.workspace = true +uuid.workspace = true diff --git a/crates/rpc-sync-client/src/lib.rs b/crates/rpc-sync-client/src/lib.rs new file mode 100644 index 00000000..0fd45122 --- /dev/null +++ b/crates/rpc-sync-client/src/lib.rs @@ -0,0 +1,5 @@ +mod pubsub_client; +mod rpc_client; + +pub use pubsub_client::{broadcast_recv, narrative_recv}; +pub use rpc_client::RpcSendClient; diff --git a/crates/rpc-sync-client/src/pubsub_client.rs b/crates/rpc-sync-client/src/pubsub_client.rs new file mode 100644 index 00000000..52e5e732 --- /dev/null +++ b/crates/rpc-sync-client/src/pubsub_client.rs @@ -0,0 +1,88 @@ +// Copyright (C) 2024 Ryan Daum +// +// This program is free software: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, version 3. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . +// + +/// RPC related functions, for talking to/from the RPC daemon over ZMQ. +use tracing::trace; +use uuid::Uuid; +use zmq::Socket; + +use rpc_common::{BroadcastEvent, ConnectionEvent, RpcError}; + +/// Blocking receive on the narrative channel, returning a `ConnectionEvent`. +pub fn narrative_recv(client_id: Uuid, subscribe: &Socket) -> Result { + let Ok(inbound) = subscribe.recv_multipart(0) else { + return Err(RpcError::CouldNotReceive( + "Unable to receive narrative message".to_string(), + )); + }; + + // bincode decode the message, and it should be ConnectionEvent + if inbound.len() != 2 { + return Err(RpcError::CouldNotDecode(format!( + "Unexpected message length: {}", + inbound.len() + ))); + } + + let (received_client_id, event) = (&inbound[0], &inbound[1]); + + let Ok(received_client_id) = Uuid::from_slice(received_client_id) else { + return Err(RpcError::CouldNotDecode( + "Unable to decode client ID".to_string(), + )); + }; + + if received_client_id != client_id { + return Err(RpcError::CouldNotDecode("Unexpected client ID".to_string())); + } + + let decode_result = bincode::decode_from_slice(event.as_ref(), bincode::config::standard()); + let (msg, _msg_size): (ConnectionEvent, usize) = decode_result.map_err(|e| { + RpcError::CouldNotDecode(format!("Unable to decode narrative message: {}", e)) + })?; + + Ok(msg) +} + +/// Blocking receive on the broadcast channel, returning a `BroadcastEvent`. +pub fn broadcast_recv(subscribe: &mut Socket) -> Result { + let Ok(inbound) = subscribe.recv_multipart(0) else { + return Err(RpcError::CouldNotReceive( + "Unable to receive broadcast message".to_string(), + )); + }; + + trace!(message = ?inbound, "broadcast_message"); + if inbound.len() != 2 { + return Err(RpcError::CouldNotDecode(format!( + "Unexpected message length: {}", + inbound.len() + ))); + } + + let (topic, event) = (&inbound[0], &inbound[1]); + + if &topic[..] != b"broadcast" { + return Err(RpcError::CouldNotDecode(format!( + "Unexpected topic: {:?}", + topic + ))); + } + + let (msg, _msg_size): (BroadcastEvent, usize) = + bincode::decode_from_slice(event.as_ref(), bincode::config::standard()).map_err(|e| { + RpcError::CouldNotDecode(format!("Unable to decode broadcast message: {}", e)) + })?; + Ok(msg) +} diff --git a/crates/rpc-sync-client/src/rpc_client.rs b/crates/rpc-sync-client/src/rpc_client.rs new file mode 100644 index 00000000..222aca92 --- /dev/null +++ b/crates/rpc-sync-client/src/rpc_client.rs @@ -0,0 +1,84 @@ +// Copyright (C) 2024 Ryan Daum +// +// This program is free software: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, version 3. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . +// + +use rpc_common::{RpcError, RpcRequest, RpcResult}; +use tracing::error; +use uuid::Uuid; +use zmq::Socket; + +/// Lightweight wrapper around the TMQ RequestSender to make it slightly simpler to make RPC +/// requests, reducing some boiler plate. +pub struct RpcSendClient { + // Note: this becomes None while a request is in flight, and is replaced with Some() as the + // response is received. + rcp_request_sock: Option, +} + +impl RpcSendClient { + pub fn new(request_sender: Socket) -> Self { + Self { + rcp_request_sock: Some(request_sender), + } + } + + /// Call the ZMQ RPC (REQ/REPLY) endpoint with a `ClientRequest`, and receive a `ServerResponse`. + pub fn make_rpc_call( + &mut self, + client_id: Uuid, + rpc_msg: RpcRequest, + ) -> Result { + let rpc_msg_payload = bincode::encode_to_vec(rpc_msg, bincode::config::standard()) + .map_err(|e| RpcError::CouldNotSend(e.to_string()))?; + + let message = vec![client_id.as_bytes().to_vec(), rpc_msg_payload]; + let rpc_sock = self.rcp_request_sock.take().ok_or(RpcError::CouldNotSend( + "RPC request socket not initialized".to_string(), + ))?; + if let Err(e) = rpc_sock.send_multipart(message, 0) { + error!( + "Unable to send connection establish request to RPC server: {}", + e + ); + return Err(RpcError::CouldNotSend(e.to_string())); + } + let msg = match rpc_sock.recv_multipart(0) { + Ok(parts) => { + if parts.len() != 1 { + return Err(RpcError::CouldNotReceive( + "Unexpected message length".to_string(), + )); + } + parts.clone() + } + Err(e) => { + error!( + "Unable to receive connection establish reply from RPC server: {}", + e + ); + return Err(RpcError::CouldNotReceive(e.to_string())); + } + }; + + match bincode::decode_from_slice(&msg[0], bincode::config::standard()) { + Ok((msg, _)) => { + self.rcp_request_sock = Some(rpc_sock); + Ok(msg) + } + Err(e) => { + error!("Unable to decode RPC response: {}", e); + Err(RpcError::CouldNotDecode(e.to_string())) + } + } + } +}