Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): rewrote router into a state container #8641

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/rust/get_started/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,6 @@ fn vault_and_identity() -> Result<(), Error> {

// Assert successful run conditions
assert_eq!(Some(0), exitcode);
assert!(stdout.contains("No more workers left. Goodbye!"));
assert!(stdout.contains("No more workers left. Goodbye!"));
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,6 @@ impl ConnectionBuilder {
if last_pass && is_last {
let is_terminal = ctx
.get_metadata(address.clone())
.await
.ok()
.flatten()
.map(|m| m.is_terminal)
.unwrap_or(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl NodeManager {
)));
}

if ctx.is_worker_registered_at(addr.clone()).await? {
if ctx.is_worker_registered_at(&addr) {
ctx.stop_worker(addr.clone()).await?
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,8 @@ impl NodeManagerWorker {
&self,
ctx: &Context,
) -> Result<Response<WorkerList>, Response<Error>> {
let workers = match ctx.list_workers().await {
Err(e) => Err(Response::internal_error_no_request(&e.to_string())),
Ok(workers) => Ok(workers),
}?;

let list = workers
let list = ctx
.list_workers()
.into_iter()
.map(|addr| WorkerStatus::new(addr.address()))
.collect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,7 @@ impl TestNode {
}

pub async fn create(runtime: Arc<Runtime>, listen_addr: Option<&str>) -> Self {
let (mut context, mut executor) = NodeBuilder::new().with_runtime(runtime.clone()).build();
runtime.spawn(async move {
executor.start_router().await.expect("cannot start router");
});
let (mut context, _executor) = NodeBuilder::new().with_runtime(runtime.clone()).build();
let node_manager_handle = start_manager_for_tests(
&mut context,
listen_addr,
Expand Down
6 changes: 3 additions & 3 deletions implementations/rust/ockam/ockam_api/tests/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ async fn authority_starts_with_default_configuration(ctx: &mut Context) -> Resul
let configuration = default_configuration().await?;
start_authority_node(ctx, &configuration).await?;

let workers = ctx.list_workers().await?;
let workers = ctx.list_workers();

assert!(!workers.contains(&Address::from(DefaultAddress::DIRECT_AUTHENTICATOR)));
assert!(!workers.contains(&Address::from(DefaultAddress::ENROLLMENT_TOKEN_ACCEPTOR)));
Expand All @@ -28,7 +28,7 @@ async fn authority_starts_direct_authenticator(ctx: &mut Context) -> Result<()>
configuration.no_direct_authentication = false;
start_authority_node(ctx, &configuration).await?;

let workers = ctx.list_workers().await?;
let workers = ctx.list_workers();

assert!(workers.contains(&Address::from(DefaultAddress::DIRECT_AUTHENTICATOR)));
assert!(!workers.contains(&Address::from(DefaultAddress::ENROLLMENT_TOKEN_ACCEPTOR)));
Expand All @@ -46,7 +46,7 @@ async fn authority_starts_enrollment_token(ctx: &mut Context) -> Result<()> {
configuration.no_token_enrollment = false;
start_authority_node(ctx, &configuration).await?;

let workers = ctx.list_workers().await?;
let workers = ctx.list_workers();

assert!(!workers.contains(&Address::from(DefaultAddress::DIRECT_AUTHENTICATOR)));
assert!(workers.contains(&Address::from(DefaultAddress::ENROLLMENT_TOKEN_ACCEPTOR)));
Expand Down
10 changes: 2 additions & 8 deletions implementations/rust/ockam/ockam_api/tests/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,12 @@ async fn start_monitoring__available__should_be_up_fast(ctx: &mut Context) -> Re
ctx.start_worker(Address::from_string("echo"), MockEchoer::new())
.await?;

assert!(
!ctx.is_worker_registered_at(session.collector_address().clone())
.await?
);
assert!(!ctx.is_worker_registered_at(session.collector_address()));

// Start the Session in a separate task
session.start_monitoring().await?;

assert!(
ctx.is_worker_registered_at(session.collector_address().clone())
.await?
);
assert!(ctx.is_worker_registered_at(session.collector_address()));

let mut time_to_restore = 0;

Expand Down
12 changes: 2 additions & 10 deletions implementations/rust/ockam/ockam_app_lib/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,12 @@ impl AppState {
) -> Result<AppState> {
let cli_state = CliState::with_default_dir()?;
let rt = Arc::new(Runtime::new().expect("cannot create a tokio runtime"));
let (context, mut executor) = NodeBuilder::new()
let (context, _executor) = NodeBuilder::new()
.no_logging()
.with_runtime(rt.clone())
.build();
let context = Arc::new(context);

// start the router, it is needed for the node manager creation
rt.spawn(async move {
let result = executor.start_router().await;
if let Err(e) = result {
error!(%e, "Failed to start the router")
}
});

let runtime = context.runtime().clone();
let future = async {
Self::make(
Expand Down Expand Up @@ -327,7 +319,7 @@ impl AppState {

info!("stopped the old node manager");

for w in self.context.list_workers().await.into_diagnostic()? {
for w in self.context.list_workers() {
let _ = self.context.stop_worker(w.address()).await;
}
info!("stopped all the ctx workers");
Expand Down
6 changes: 5 additions & 1 deletion implementations/rust/ockam/ockam_core/src/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ pub mod sync {
/// Wrap `spin::RwLock` as it does not return LockResult<Guard> like `std::sync::Mutex`.
#[derive(Debug)]
pub struct RwLock<T>(spin::RwLock<T>);

/// Wrap `spin::RwLockWriteGuard`
pub type RwLockWriteGuard<'a, T> = spin::RwLockWriteGuard<'a, T>;

impl<T> RwLock<T> {
/// Creates a new spinlock wrapping the supplied data.
pub fn new(value: T) -> Self {
Expand Down Expand Up @@ -290,7 +294,7 @@ pub mod sync {
#[cfg(feature = "std")]
pub mod sync {
pub use std::sync::Arc;
pub use std::sync::{Mutex, RwLock};
pub use std::sync::{Mutex, RwLock, RwLockWriteGuard};
}

/// Provides `std::task` for `no_std` targets.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ async fn should_stop_encryptor__and__decryptor__in__secure_channel(
0
);

let workers = ctx.list_workers().await?;
let workers = ctx.list_workers();
assert!(!workers.contains(channel1.decryptor_messaging_address()));
assert!(!workers.contains(channel1.encryptor_messaging_address()));
assert!(!workers.contains(channel2.decryptor_messaging_address()));
Expand Down
3 changes: 0 additions & 3 deletions implementations/rust/ockam/ockam_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ no_std = ["ockam_core/no_std", "ockam_transport_core/no_std", "heapless"]
# Feature: "alloc" enables support for heap allocation (implied by `feature = "std"`)
alloc = ["ockam_core/alloc", "ockam_executor/alloc", "futures/alloc", "minicbor/alloc"]

# Feature: "dump_internals" when set, will dump the internal state of
# workers at startup via the trace! macro.
dump_internals = []
# TODO should these features be combined?
metrics = []

Expand Down
23 changes: 7 additions & 16 deletions implementations/rust/ockam/ockam_node/src/async_drop.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use crate::tokio::sync::{
mpsc::Sender as DefaultSender,
oneshot::{self, Receiver, Sender},
};
use crate::NodeMessage;
use crate::router::Router;
use crate::tokio::sync::oneshot::{self, Receiver, Sender};
use alloc::sync::Arc;
use ockam_core::Address;

/// A helper to implement Drop mechanisms, but async
Expand All @@ -19,7 +17,7 @@ use ockam_core::Address;
/// additional metadata to generate messages.
pub struct AsyncDrop {
rx: Receiver<Address>,
sender: DefaultSender<NodeMessage>,
router: Arc<Router>,
}

impl AsyncDrop {
Expand All @@ -29,9 +27,9 @@ impl AsyncDrop {
/// Context that creates this hook, while the `address` field must
/// refer to the address of the context that will be deallocated
/// this way.
pub fn new(sender: DefaultSender<NodeMessage>) -> (Self, Sender<Address>) {
pub fn new(router: Arc<Router>) -> (Self, Sender<Address>) {
let (tx, rx) = oneshot::channel();
(Self { rx, sender }, tx)
(Self { rx, router }, tx)
}

/// Wait for the cancellation of the channel and then send a
Expand All @@ -42,16 +40,9 @@ impl AsyncDrop {
pub async fn run(self) {
if let Ok(addr) = self.rx.await {
debug!("Received AsyncDrop request for address: {}", addr);

let (msg, mut reply) = NodeMessage::stop_worker(addr, true);
if let Err(e) = self.sender.send(msg).await {
if let Err(e) = self.router.stop_worker(&addr, true).await {
debug!("Failed sending AsyncDrop request to router: {}", e);
}

// Then check that address was properly shut down
if reply.recv().await.is_none() {
debug!("AsyncDrop router reply was None");
}
}
}
}
Loading
Loading