Skip to content

Commit

Permalink
fix(relay): borrow issues with wasm client (#1494)
Browse files Browse the repository at this point in the history
* fix: borrow?

* fix: mut?

* feat: use mutex for eventloop and return it

* chore: typo & fmt clippy

* refactor: return mutexguard

* refactor: return arc cloen
  • Loading branch information
Larkooo authored Jan 30, 2024
1 parent 1c0ea09 commit 0a5aee6
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 16 deletions.
24 changes: 13 additions & 11 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use torii_grpc::client::{EntityUpdateStreaming, ModelDiffsStreaming};
use torii_grpc::proto::world::RetrieveEntitiesResponse;
use torii_grpc::types::schema::Entity;
use torii_grpc::types::{KeysClause, Query};
use torii_relay::client::Message;
use torii_relay::client::{EventLoop, Message};

use crate::client::error::{Error, ParseError};
use crate::client::storage::ModelStorage;
Expand All @@ -37,7 +37,7 @@ pub struct Client {
metadata: Arc<RwLock<WorldMetadata>>,
/// The grpc client.
inner: AsyncRwLock<torii_grpc::client::WorldClient>,
/// Libp2p client.
/// Relay client.
relay_client: torii_relay::client::RelayClient,
/// Model storage
storage: Arc<ModelStorage>,
Expand All @@ -54,13 +54,13 @@ impl Client {
pub async fn new(
torii_url: String,
rpc_url: String,
libp2p_relay_url: String,
relay_url: String,
world: FieldElement,
models_keys: Option<Vec<KeysClause>>,
) -> Result<Self, Error> {
let mut grpc_client = torii_grpc::client::WorldClient::new(torii_url, world).await?;

let libp2p_client = torii_relay::client::RelayClient::new(libp2p_relay_url)?;
let relay_client = torii_relay::client::RelayClient::new(relay_url)?;

let metadata = grpc_client.metadata().await?;

Expand Down Expand Up @@ -96,7 +96,7 @@ impl Client {
metadata: shared_metadata,
sub_client_handle: OnceCell::new(),
inner: AsyncRwLock::new(grpc_client),
relay_client: libp2p_client,
relay_client,
subscribed_models: subbed_models,
})
}
Expand Down Expand Up @@ -125,13 +125,14 @@ impl Client {
.map(|m| m.0)
}

/// Runs the libp2p event loop which processes incoming messages and commands.
/// And sends events in the channel
pub async fn run_libp2p(&mut self) {
self.relay_client.event_loop.run().await;
/// Returns the event loop of the relay client.
/// Which can then be used to run the relay client
pub fn relay_client_runner(&self) -> Arc<Mutex<EventLoop>> {
self.relay_client.event_loop.clone()
}

pub fn libp2p_message_stream(&self) -> Arc<Mutex<UnboundedReceiver<Message>>> {
/// Returns the message receiver of the relay client.
pub fn relay_client_stream(&self) -> Arc<Mutex<UnboundedReceiver<Message>>> {
self.relay_client.message_receiver.clone()
}

Expand Down Expand Up @@ -209,7 +210,8 @@ impl Client {
/// Initiate the model subscriptions and returns a [SubscriptionService] which when await'ed
/// will execute the subscription service and starts the syncing process.
pub async fn start_subscription(&self) -> Result<SubscriptionService, Error> {
let models_keys = self.subscribed_models.models_keys.read().clone().into_iter().collect();
let models_keys: Vec<KeysClause> =
self.subscribed_models.models_keys.read().clone().into_iter().collect();
let sub_res_stream = self.initiate_subscription(models_keys).await?;

let (service, handle) = SubscriptionService::new(
Expand Down
6 changes: 3 additions & 3 deletions crates/torii/libp2p/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct Behaviour {
pub struct RelayClient {
pub message_receiver: Arc<Mutex<UnboundedReceiver<Message>>>,
pub command_sender: CommandSender,
pub event_loop: EventLoop,
pub event_loop: Arc<Mutex<EventLoop>>,
}

pub struct EventLoop {
Expand Down Expand Up @@ -106,7 +106,7 @@ impl RelayClient {
Ok(Self {
command_sender: CommandSender::new(command_sender),
message_receiver: Arc::new(Mutex::new(message_receiver)),
event_loop: EventLoop { swarm, message_sender, command_receiver },
event_loop: Arc::new(Mutex::new(EventLoop { swarm, message_sender, command_receiver })),
})
}

Expand Down Expand Up @@ -153,7 +153,7 @@ impl RelayClient {
Ok(Self {
command_sender: CommandSender::new(command_sender),
message_receiver: Arc::new(Mutex::new(message_receiver)),
event_loop: EventLoop { swarm, message_sender, command_receiver },
event_loop: Arc::new(Mutex::new(EventLoop { swarm, message_sender, command_receiver })),
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/torii/libp2p/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ mod test {
// Initialize the first client (listener)
let mut client = RelayClient::new("/ip4/127.0.0.1/tcp/9090".to_string())?;
tokio::spawn(async move {
client.event_loop.run().await;
client.event_loop.lock().await.run().await;
});

client.command_sender.subscribe("mawmaw".to_string()).await?;
Expand Down Expand Up @@ -78,7 +78,7 @@ mod test {
)?;

spawn_local(async move {
client.event_loop.run().await;
client.event_loop.lock().await.run().await;
});

// Give some time for the client to start up
Expand Down

0 comments on commit 0a5aee6

Please sign in to comment.