From 0a5aee67e241385ee90804b209887c9e022d2aed Mon Sep 17 00:00:00 2001 From: Larko <59736843+Larkooo@users.noreply.github.com> Date: Tue, 30 Jan 2024 14:34:26 -0500 Subject: [PATCH] fix(relay): borrow issues with wasm client (#1494) * fix: borrow? * fix: mut? * feat: use mutex for eventloop and return it * chore: typo & fmt clippy * refactor: return mutexguard * refactor: return arc cloen --- crates/torii/client/src/client/mod.rs | 24 +++++++++++++----------- crates/torii/libp2p/src/client/mod.rs | 6 +++--- crates/torii/libp2p/src/tests.rs | 4 ++-- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/crates/torii/client/src/client/mod.rs b/crates/torii/client/src/client/mod.rs index 6fe4276212..65be70ba73 100644 --- a/crates/torii/client/src/client/mod.rs +++ b/crates/torii/client/src/client/mod.rs @@ -22,7 +22,7 @@ use torii_grpc::client::{EntityUpdateStreaming, ModelDiffsStreaming}; use torii_grpc::proto::world::RetrieveEntitiesResponse; use torii_grpc::types::schema::Entity; use torii_grpc::types::{KeysClause, Query}; -use torii_relay::client::Message; +use torii_relay::client::{EventLoop, Message}; use crate::client::error::{Error, ParseError}; use crate::client::storage::ModelStorage; @@ -37,7 +37,7 @@ pub struct Client { metadata: Arc>, /// The grpc client. inner: AsyncRwLock, - /// Libp2p client. + /// Relay client. relay_client: torii_relay::client::RelayClient, /// Model storage storage: Arc, @@ -54,13 +54,13 @@ impl Client { pub async fn new( torii_url: String, rpc_url: String, - libp2p_relay_url: String, + relay_url: String, world: FieldElement, models_keys: Option>, ) -> Result { let mut grpc_client = torii_grpc::client::WorldClient::new(torii_url, world).await?; - let libp2p_client = torii_relay::client::RelayClient::new(libp2p_relay_url)?; + let relay_client = torii_relay::client::RelayClient::new(relay_url)?; let metadata = grpc_client.metadata().await?; @@ -96,7 +96,7 @@ impl Client { metadata: shared_metadata, sub_client_handle: OnceCell::new(), inner: AsyncRwLock::new(grpc_client), - relay_client: libp2p_client, + relay_client, subscribed_models: subbed_models, }) } @@ -125,13 +125,14 @@ impl Client { .map(|m| m.0) } - /// Runs the libp2p event loop which processes incoming messages and commands. - /// And sends events in the channel - pub async fn run_libp2p(&mut self) { - self.relay_client.event_loop.run().await; + /// Returns the event loop of the relay client. + /// Which can then be used to run the relay client + pub fn relay_client_runner(&self) -> Arc> { + self.relay_client.event_loop.clone() } - pub fn libp2p_message_stream(&self) -> Arc>> { + /// Returns the message receiver of the relay client. + pub fn relay_client_stream(&self) -> Arc>> { self.relay_client.message_receiver.clone() } @@ -209,7 +210,8 @@ impl Client { /// Initiate the model subscriptions and returns a [SubscriptionService] which when await'ed /// will execute the subscription service and starts the syncing process. pub async fn start_subscription(&self) -> Result { - let models_keys = self.subscribed_models.models_keys.read().clone().into_iter().collect(); + let models_keys: Vec = + self.subscribed_models.models_keys.read().clone().into_iter().collect(); let sub_res_stream = self.initiate_subscription(models_keys).await?; let (service, handle) = SubscriptionService::new( diff --git a/crates/torii/libp2p/src/client/mod.rs b/crates/torii/libp2p/src/client/mod.rs index 6c9d20c361..db51695d65 100644 --- a/crates/torii/libp2p/src/client/mod.rs +++ b/crates/torii/libp2p/src/client/mod.rs @@ -29,7 +29,7 @@ struct Behaviour { pub struct RelayClient { pub message_receiver: Arc>>, pub command_sender: CommandSender, - pub event_loop: EventLoop, + pub event_loop: Arc>, } pub struct EventLoop { @@ -106,7 +106,7 @@ impl RelayClient { Ok(Self { command_sender: CommandSender::new(command_sender), message_receiver: Arc::new(Mutex::new(message_receiver)), - event_loop: EventLoop { swarm, message_sender, command_receiver }, + event_loop: Arc::new(Mutex::new(EventLoop { swarm, message_sender, command_receiver })), }) } @@ -153,7 +153,7 @@ impl RelayClient { Ok(Self { command_sender: CommandSender::new(command_sender), message_receiver: Arc::new(Mutex::new(message_receiver)), - event_loop: EventLoop { swarm, message_sender, command_receiver }, + event_loop: Arc::new(Mutex::new(EventLoop { swarm, message_sender, command_receiver })), }) } } diff --git a/crates/torii/libp2p/src/tests.rs b/crates/torii/libp2p/src/tests.rs index 460779aa24..6cd168b7c9 100644 --- a/crates/torii/libp2p/src/tests.rs +++ b/crates/torii/libp2p/src/tests.rs @@ -34,7 +34,7 @@ mod test { // Initialize the first client (listener) let mut client = RelayClient::new("/ip4/127.0.0.1/tcp/9090".to_string())?; tokio::spawn(async move { - client.event_loop.run().await; + client.event_loop.lock().await.run().await; }); client.command_sender.subscribe("mawmaw".to_string()).await?; @@ -78,7 +78,7 @@ mod test { )?; spawn_local(async move { - client.event_loop.run().await; + client.event_loop.lock().await.run().await; }); // Give some time for the client to start up