Skip to content

Commit

Permalink
Initial support of listen()
Browse files Browse the repository at this point in the history
  • Loading branch information
rdaum committed Nov 14, 2024
1 parent 67f26e9 commit ac068cf
Show file tree
Hide file tree
Showing 30 changed files with 1,953 additions and 534 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 37 additions & 28 deletions crates/console-host/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use color_eyre::owo_colors::OwoColorize;
use moor_compiler::to_literal;
use moor_values::Objid;
use rpc_common::{
AuthToken, BroadcastEvent, ClientToken, ConnectionEvent, RpcRequest, RpcResponse, RpcResult,
BROADCAST_TOPIC,
AuthToken, ClientEvent, ClientToken, ClientsBroadcastEvent, DaemonToClientReply,
HostClientToDaemonMessage, ReplyResult, CLIENT_BROADCAST_TOPIC,
};
use rpc_sync_client::RpcSendClient;
use rpc_sync_client::{broadcast_recv, events_recv};
Expand Down Expand Up @@ -75,14 +75,16 @@ fn establish_connection(
) -> Result<(ClientToken, Objid), Error> {
match rpc_client.make_rpc_call(
client_id,
RpcRequest::ConnectionEstablish("console".to_string()),
HostClientToDaemonMessage::ConnectionEstablish("console".to_string()),
) {

Check failure on line 79 in crates/console-host/src/main.rs

View workflow job for this annotation

GitHub Actions / clippy

non-exhaustive patterns: `Ok(rpc_common::ReplyResult::HostSuccess(_))` not covered

error[E0004]: non-exhaustive patterns: `Ok(rpc_common::ReplyResult::HostSuccess(_))` not covered --> crates/console-host/src/main.rs:76:11 | 76 | match rpc_client.make_rpc_call( | ___________^ 77 | | client_id, 78 | | HostClientToDaemonMessage::ConnectionEstablish("console".to_string()), 79 | | ) { | |_____^ pattern `Ok(rpc_common::ReplyResult::HostSuccess(_))` not covered | note: `std::result::Result<rpc_common::ReplyResult, rpc_common::RpcError>` defined here --> /rustc/f6e511eec7342f59a25f7c0534f1dbea00d01b14/library/core/src/result.rs:527:1 ::: /rustc/f6e511eec7342f59a25f7c0534f1dbea00d01b14/library/core/src/result.rs:531:5 | = note: not covered = note: the matched value is of type `std::result::Result<rpc_common::ReplyResult, rpc_common::RpcError>` help: ensure that all possible cases are being handled by adding a match arm with a wildcard pattern or an explicit pattern as shown | 94 ~ }, 95 + Ok(rpc_common::ReplyResult::HostSuccess(_)) => todo!() |
Ok(RpcResult::Success(RpcResponse::NewConnection(token, conn_id))) => Ok((token, conn_id)),
Ok(RpcResult::Success(response)) => {
Ok(ReplyResult::ClientSuccess(DaemonToClientReply::NewConnection(token, conn_id))) => {
Ok((token, conn_id))
}
Ok(ReplyResult::ClientSuccess(response)) => {
error!(?response, "Unexpected response");
Err(Error::msg("Unexpected response"))
}
Ok(RpcResult::Failure(error)) => {
Ok(ReplyResult::Failure(error)) => {
error!(?error, "Failure connecting");
Err(Error::msg("Failure connecting"))
}
Expand All @@ -103,7 +105,7 @@ fn perform_auth(
// Need to first authenticate with the server.
match rpc_client.make_rpc_call(
client_id,
RpcRequest::LoginCommand(
HostClientToDaemonMessage::LoginCommand(

Check failure on line 108 in crates/console-host/src/main.rs

View workflow job for this annotation

GitHub Actions / clippy

this enum variant takes 4 arguments but 3 arguments were supplied

error[E0061]: this enum variant takes 4 arguments but 3 arguments were supplied --> crates/console-host/src/main.rs:108:9 | 108 | HostClientToDaemonMessage::LoginCommand( | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 109 | token, 110 | / vec![ 111 | | "connect".to_string(), 112 | | username.to_string(), 113 | | password.to_string(), 114 | | ], | |_____________- argument #2 of type `moor_values::Objid` is missing | note: tuple variant defined here --> /home/runner/work/moor/moor/crates/rpc-common/src/lib.rs:108:5 | 108 | LoginCommand(ClientToken, Objid, Vec<String>, bool /* attach? */), | ^^^^^^^^^^^^ help: provide the argument | 108 ~ HostClientToDaemonMessage::LoginCommand(token, /* moor_values::Objid */, vec![ 109 + "connect".to_string(), 110 + username.to_string(), 111 + password.to_string(), 112 ~ ], true), |
token,
vec![
"connect".to_string(),
Expand All @@ -113,23 +115,23 @@ fn perform_auth(
true,
),
) {
Ok(RpcResult::Success(RpcResponse::LoginResult(Some((
Ok(ReplyResult::ClientSuccess(DaemonToClientReply::LoginResult(Some((
auth_token,
connect_type,
player,
))))) => {
info!(?connect_type, ?player, "Authenticated");
Ok((auth_token, player))
}
Ok(RpcResult::Success(RpcResponse::LoginResult(None))) => {
Ok(ReplyResult::ClientSuccess(DaemonToClientReply::LoginResult(None))) => {
error!("Authentication failed");
Err(Error::msg("Authentication failed"))
}
Ok(RpcResult::Success(response)) => {
Ok(ReplyResult::ClientSuccess(response)) => {
error!(?response, "Unexpected response");
Err(Error::msg("Unexpected response"))
}
Ok(RpcResult::Failure(failure)) => {
Ok(ReplyResult::Failure(failure)) => {
error!(?failure, "Failure authenticating");
Err(Error::msg("Failure authenticating"))
}
Expand All @@ -152,20 +154,20 @@ fn handle_console_line(
if let Some(input_request_id) = input_request_id {
match rpc_client.make_rpc_call(
client_id,
RpcRequest::RequestedInput(
HostClientToDaemonMessage::RequestedInput(
client_token.clone(),
auth_token.clone(),
input_request_id.as_u128(),
line.to_string(),
),
) {
Ok(RpcResult::Success(RpcResponse::InputThanks)) => {
Ok(ReplyResult::ClientSuccess(DaemonToClientReply::InputThanks)) => {
trace!("Input complete");
}
Ok(RpcResult::Success(response)) => {
Ok(ReplyResult::ClientSuccess(response)) => {
warn!(?response, "Unexpected input response");
}
Ok(RpcResult::Failure(error)) => {
Ok(ReplyResult::Failure(error)) => {
error!(?error, "Failure executing input");
}
Err(error) => {
Expand All @@ -177,15 +179,19 @@ fn handle_console_line(

match rpc_client.make_rpc_call(
client_id,
RpcRequest::Command(client_token.clone(), auth_token.clone(), line.to_string()),
HostClientToDaemonMessage::Command(

Check failure on line 182 in crates/console-host/src/main.rs

View workflow job for this annotation

GitHub Actions / clippy

this enum variant takes 4 arguments but 3 arguments were supplied

error[E0061]: this enum variant takes 4 arguments but 3 arguments were supplied --> crates/console-host/src/main.rs:182:9 | 182 | HostClientToDaemonMessage::Command( | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ... 185 | line.to_string(), | ---------------- argument #3 of type `moor_values::Objid` is missing | note: tuple variant defined here --> /home/runner/work/moor/moor/crates/rpc-common/src/lib.rs:114:5 | 114 | Command(ClientToken, AuthToken, Objid, String), | ^^^^^^^ help: provide the argument | 182 | HostClientToDaemonMessage::Command(client_token.clone(), auth_token.clone(), /* moor_values::Objid */, line.to_string()), | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
client_token.clone(),
auth_token.clone(),
line.to_string(),
),
) {
Ok(RpcResult::Success(RpcResponse::CommandSubmitted(_))) => {
Ok(ReplyResult::ClientSuccess(DaemonToClientReply::CommandSubmitted(_))) => {
trace!("Command complete");
}
Ok(RpcResult::Success(response)) => {
Ok(ReplyResult::ClientSuccess(response)) => {
warn!(?response, "Unexpected command response");
}
Ok(RpcResult::Failure(error)) => {
Ok(ReplyResult::Failure(error)) => {
error!(?error, "Failure executing command");
}
Err(error) => {
Expand Down Expand Up @@ -248,7 +254,7 @@ fn console_loop(
return;
}
match events_recv(client_id, &narr_sub_socket) {
Ok(ConnectionEvent::Narrative(_, msg)) => {
Ok(ClientEvent::Narrative(_, msg)) => {
let var = match msg.event() {
moor_values::tasks::Event::Notify(s, _content_type) => s,
};
Expand All @@ -262,21 +268,21 @@ fn console_loop(
}
}
}
Ok(ConnectionEvent::SystemMessage(o, msg)) => {
Ok(ClientEvent::SystemMessage(o, msg)) => {
printer
.print(format!("System message from {}: {}", o.yellow(), msg.red()))
.unwrap();
}
Ok(ConnectionEvent::Disconnect()) => {
Ok(ClientEvent::Disconnect()) => {
printer
.print("Received disconnect event; Session ending.".to_string())
.unwrap();
return;
}
Ok(ConnectionEvent::TaskError(e)) => {
Ok(ClientEvent::TaskError(e)) => {
printer.print(format!("Error: {:?}", e)).unwrap();
}
Ok(ConnectionEvent::TaskSuccess(result)) => {
Ok(ClientEvent::TaskSuccess(result)) => {
printer.print(format!("=> {:?}", result)).unwrap();
}
Err(error) => {
Expand All @@ -288,7 +294,7 @@ fn console_loop(
.unwrap();
return;
}
Ok(ConnectionEvent::RequestInput(requested_input_id)) => {
Ok(ClientEvent::RequestInput(requested_input_id)) => {
*output_input_request_id.lock().unwrap() =
Some(Uuid::from_u128(requested_input_id));
}
Expand All @@ -297,7 +303,7 @@ fn console_loop(

let mut broadcast_subscriber = zmq_ctx.socket(zmq::SUB)?;
broadcast_subscriber.connect(narrative_server)?;
broadcast_subscriber.set_subscribe(BROADCAST_TOPIC)?;
broadcast_subscriber.set_subscribe(CLIENT_BROADCAST_TOPIC)?;

let broadcast_rpc_socket = zmq_ctx.socket(zmq::REQ)?;
broadcast_rpc_socket.connect(rpc_server)?;
Expand All @@ -310,10 +316,13 @@ fn console_loop(
return;
}
match broadcast_recv(&mut broadcast_subscriber) {
Ok(BroadcastEvent::PingPong(_)) => {
Ok(ClientsBroadcastEvent::PingPong(_)) => {
if let Err(e) = broadcast_rpc_client.make_rpc_call(
client_id,
RpcRequest::Pong(broadcast_client_token.clone(), SystemTime::now()),
HostClientToDaemonMessage::ClientPong(

Check failure on line 322 in crates/console-host/src/main.rs

View workflow job for this annotation

GitHub Actions / clippy

this enum variant takes 5 arguments but 2 arguments were supplied

error[E0061]: this enum variant takes 5 arguments but 2 arguments were supplied --> crates/console-host/src/main.rs:322:21 | 322 | HostClientToDaemonMessage::ClientPong( | _____________________^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^- 323 | | broadcast_client_token.clone(), 324 | | SystemTime::now(), 325 | | ), | |_____________________- three arguments of type `moor_values::Objid`, `rpc_common::HostType`, and `std::net::SocketAddr` are missing | note: tuple variant defined here --> /home/runner/work/moor/moor/crates/rpc-common/src/lib.rs:134:5 | 134 | ClientPong(ClientToken, SystemTime, Objid, HostType, SocketAddr), | ^^^^^^^^^^ help: provide the arguments | 322 | HostClientToDaemonMessage::ClientPong(broadcast_client_token.clone(), SystemTime::now(), /* moor_values::Objid */, /* rpc_common::HostType */, /* std::net::SocketAddr */), | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
broadcast_client_token.clone(),
SystemTime::now(),
),
) {
error!("Error sending pong: {:?}", e);
return;
Expand Down
4 changes: 2 additions & 2 deletions crates/daemon/src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use uuid::Uuid;

use moor_kernel::tasks::sessions::SessionError;
use moor_values::Objid;
use rpc_common::RpcRequestError;
use rpc_common::RpcMessageError;

pub const CONNECTION_TIMEOUT_DURATION: Duration = Duration::from_secs(30);

Expand All @@ -37,7 +37,7 @@ pub trait ConnectionsDB {
client_id: Uuid,
hostname: String,
player: Option<Objid>,
) -> Result<Objid, RpcRequestError>;
) -> Result<Objid, RpcMessageError>;

/// Record activity for the given client.
fn record_client_activity(&self, client_id: Uuid, connobj: Objid) -> Result<(), eyre::Error>;
Expand Down
4 changes: 2 additions & 2 deletions crates/daemon/src/connections_rb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use moor_kernel::tasks::sessions::SessionError;
use moor_values::AsByteBuffer;
use moor_values::Objid;
use relbox::{relation_info_for, RelBox, RelationId, RelationInfo, Transaction};
use rpc_common::RpcRequestError;
use rpc_common::RpcMessageError;

use crate::connections::{ConnectionsDB, CONNECTION_TIMEOUT_DURATION};

Expand Down Expand Up @@ -191,7 +191,7 @@ impl ConnectionsDB for ConnectionsRb {
client_id: Uuid,
hostname: String,
player: Option<Objid>,
) -> Result<Objid, RpcRequestError> {
) -> Result<Objid, RpcMessageError> {
let connection_oid = match player {
None => {
// The connection object is pulled from the sequence, then we invert it and subtract from
Expand Down
6 changes: 3 additions & 3 deletions crates/daemon/src/connections_wt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use moor_kernel::tasks::sessions::SessionError;
use moor_values::model::{CommitResult, ValSet};
use moor_values::Objid;
use moor_values::{AsByteBuffer, DecodingError, EncodingError};
use rpc_common::RpcRequestError;
use rpc_common::RpcMessageError;

use crate::connections::{ConnectionsDB, CONNECTION_TIMEOUT_DURATION};
use crate::connections_wt::ConnectionRelation::{
Expand Down Expand Up @@ -280,7 +280,7 @@ impl ConnectionsDB for ConnectionsWT {
client_id: Uuid,
hostname: String,
player: Option<Objid>,
) -> Result<Objid, RpcRequestError> {
) -> Result<Objid, RpcMessageError> {
retry_tx_action(&self.db, |tx| {
let connection_oid = match player {
None => {
Expand All @@ -304,7 +304,7 @@ impl ConnectionsDB for ConnectionsWT {

Ok(connection_oid)
})
.map_err(|e| RpcRequestError::InternalError(e.to_string()))
.map_err(|e| RpcMessageError::InternalError(e.to_string()))
}

fn record_client_activity(&self, client_id: Uuid, _connobj: Objid) -> Result<(), Error> {
Expand Down
25 changes: 11 additions & 14 deletions crates/daemon/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ struct Args {
#[arg(
long,
value_name = "public_key",
help = "file containing a pkcs8 ed25519 public key, used for authenticating client connections",
help = "file containing a pkcs8 ed25519 public key, used for authenticating client & host connections",
default_value = "public_key.pem"
)]
public_key: PathBuf,

#[arg(
long,
value_name = "private_key",
help = "file containing a pkcs8 ed25519 private key, used for authenticating client connections",
help = "file containing a pkcs8 ed25519 private key, used for authenticating client & host connections",
default_value = "private_key.pem"
)]
private_key: PathBuf,
Expand Down Expand Up @@ -355,12 +355,6 @@ fn main() -> Result<(), Report> {
}
};

// The pieces from core we're going to use:
// Our DB.
// Our scheduler.
let scheduler = Scheduler::new(database, tasks_db, config.clone());
let scheduler_client = scheduler.client().expect("Failed to get scheduler client");

// We have to create the RpcServer before starting the scheduler because we need to pass it in
// as a parameter to the scheduler for background session construction.

Expand All @@ -374,17 +368,22 @@ fn main() -> Result<(), Report> {
zmq_ctx.clone(),
args.events_listen.as_str(),
args.db_flavour,
config,
config.clone(),
));
let kill_switch = rpc_server.kill_switch();

// The pieces from core we're going to use:
// Our DB.
// Our scheduler.
let scheduler = Scheduler::new(database, tasks_db, config, rpc_server.clone());
let scheduler_client = scheduler.client().expect("Failed to get scheduler client");

// The scheduler thread:
let scheduler_rpc_server = rpc_server.clone();
let scheduler_loop_jh = std::thread::Builder::new()
.name("moor-scheduler".to_string())
.spawn(move || scheduler.run(scheduler_rpc_server))?;

let kill_switch = Arc::new(std::sync::atomic::AtomicBool::new(false));

// Background DB checkpoint thread.
let checkpoint_kill_switch = kill_switch.clone();
let checkpoint_scheduler_client = scheduler_client.clone();
Expand All @@ -402,15 +401,13 @@ fn main() -> Result<(), Report> {
.expect("Failed to submit checkpoint");
})?;

let rpc_kill_switch = kill_switch.clone();

let rpc_loop_scheduler_client = scheduler_client.clone();
let rpc_listen = args.rpc_listen.clone();
let rpc_loop_thread = std::thread::Builder::new()
.name("moor-rpc".to_string())
.spawn(move || {
rpc_server
.zmq_loop(rpc_listen, rpc_loop_scheduler_client, rpc_kill_switch)
.request_loop(rpc_listen, rpc_loop_scheduler_client)
.expect("RPC thread failed");
})?;

Expand Down
Loading

0 comments on commit ac068cf

Please sign in to comment.