Skip to content

Commit

Permalink
refactor: get rid of wait for relay
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Apr 2, 2024
1 parent 5c3d71e commit f69f107
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 30 deletions.
9 changes: 6 additions & 3 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use dojo_types::packing::unpack;
use dojo_types::schema::Ty;
use dojo_types::WorldMetadata;
use dojo_world::contracts::WorldContractReader;
use futures::lock::Mutex;
use parking_lot::{RwLock, RwLockReadGuard};
use starknet::core::utils::cairo_short_string_to_felt;
use starknet::providers::jsonrpc::HttpTransport;
Expand All @@ -20,6 +21,7 @@ use torii_grpc::client::{EntityUpdateStreaming, ModelDiffsStreaming};
use torii_grpc::proto::world::RetrieveEntitiesResponse;
use torii_grpc::types::schema::Entity;
use torii_grpc::types::{KeysClause, Query};
use torii_relay::client::EventLoop;
use torii_relay::types::Message;

use crate::client::error::{Error, ParseError};
Expand Down Expand Up @@ -99,9 +101,10 @@ impl Client {
})
}

/// Waits for the relay to be ready and listening for messages.
pub async fn wait_for_relay(&self) -> Result<(), Error> {
self.relay_client.command_sender.wait_for_relay().await.map_err(Error::RelayClient)
/// Starts the relay client event loop.
/// This is a blocking call. Spawn this on a separate task.
pub async fn start_relay(&self) {
self.relay_client.event_loop.lock().await.run().await;
}

/// Publishes a message to a topic.
Expand Down
52 changes: 27 additions & 25 deletions crates/torii/libp2p/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ pub struct EventLoop {
#[derive(Debug)]
enum Command {
Publish(Message, oneshot::Sender<Result<MessageId, Error>>),
WaitForRelay(oneshot::Sender<Result<(), Error>>),
}

impl RelayClient {
Expand Down Expand Up @@ -162,47 +161,50 @@ impl CommandSender {

rx.await.expect("Failed to receive response")
}

pub async fn wait_for_relay(&self) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();

self.sender.unbounded_send(Command::WaitForRelay(tx)).expect("Failed to send command");

rx.await.expect("Failed to receive response")
}
}

impl EventLoop {
async fn handle_command(
&mut self,
command: Command,
is_relay_ready: bool,
commands_queue: Arc<Mutex<Vec<Command>>>,
) {
match command {
Command::Publish(data, sender) => {
// if the relay is not ready yet, add the message to the queue
if !is_relay_ready {
commands_queue.lock().await.push(Command::Publish(data, sender));
} else {
sender.send(self.publish(&data)).expect("Failed to send response");
}
}
}
}

pub async fn run(&mut self) {
let mut is_relay_ready = false;
let mut relay_ready_tx = None;
let commands_queue = Arc::new(Mutex::new(Vec::new()));

loop {
// Poll the swarm for new events.
select! {
command = self.command_receiver.select_next_some() => {
match command {
Command::Publish(data, sender) => {
sender.send(self.publish(&data)).expect("Failed to send response");
}
Command::WaitForRelay(sender) => {
if is_relay_ready {
sender.send(Ok(())).expect("Failed to send response");
} else {
relay_ready_tx = Some(sender);
}
}
}
self.handle_command(command, is_relay_ready, commands_queue.clone()).await;
},
event = self.swarm.select_next_some() => {
match event {
SwarmEvent::Behaviour(ClientEvent::Gossipsub(gossipsub::Event::Subscribed { topic, .. })) => {
// Handle behaviour events.
info!(target: LOG_TARGET, topic = ?topic, "Relay ready. Received subscription confirmation.");

is_relay_ready = true;
if let Some(tx) = relay_ready_tx.take() {
tx.send(Ok(())).expect("Failed to send response");
if !is_relay_ready {
is_relay_ready = true;

// Execute all the commands that were queued while the relay was not ready.
for command in commands_queue.lock().await.drain(..) {
self.handle_command(command, is_relay_ready, commands_queue.clone()).await;
}
}
}
SwarmEvent::ConnectionClosed { cause: Some(cause), .. } => {
Expand Down
3 changes: 1 addition & 2 deletions crates/torii/libp2p/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,11 @@ mod test {
});

// Initialize the first client (listener)
let mut client = RelayClient::new("/ip4/127.0.0.1/tcp/9900".to_string())?;
let client = RelayClient::new("/ip4/127.0.0.1/tcp/9900".to_string())?;
tokio::spawn(async move {
client.event_loop.lock().await.run().await;
});

client.command_sender.wait_for_relay().await?;
let mut data = Struct { name: "Message".to_string(), children: vec![] };

data.children.push(Member {
Expand Down

0 comments on commit f69f107

Please sign in to comment.