diff --git a/Cargo.lock b/Cargo.lock index 218f55db47..0039641c38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11489,7 +11489,7 @@ dependencies = [ "torii-core", "torii-graphql", "torii-grpc", - "torii-libp2p", + "torii-relay", "torii-server", "tower", "tower-http", @@ -11521,7 +11521,7 @@ dependencies = [ "tonic 0.10.2", "tonic 0.9.2", "torii-grpc", - "torii-libp2p", + "torii-relay", "url", ] @@ -11640,7 +11640,7 @@ dependencies = [ ] [[package]] -name = "torii-libp2p" +name = "torii-relay" version = "0.5.1-alpha.0" dependencies = [ "anyhow", diff --git a/Cargo.toml b/Cargo.toml index a919949bfb..ff0f243663 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,6 +79,7 @@ torii-core = { path = "crates/torii/core" } torii-graphql = { path = "crates/torii/graphql" } torii-grpc = { path = "crates/torii/grpc" } torii-server = { path = "crates/torii/server" } +torii-relay = { path = "crates/torii/libp2p" } # sozo sozo-signers = { path = "crates/sozo/signers" } diff --git a/bin/torii/Cargo.toml b/bin/torii/Cargo.toml index 49591f3d16..64d9411dbb 100644 --- a/bin/torii/Cargo.toml +++ b/bin/torii/Cargo.toml @@ -43,7 +43,7 @@ tower-http = "0.4.4" tracing-subscriber.workspace = true tracing.workspace = true url.workspace = true -torii-libp2p = { version = "0.5.1-alpha.0", path = "../../crates/torii/libp2p" } +torii-relay.workspace = true [dev-dependencies] camino.workspace = true diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index f5649e93ba..52f3261800 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -66,11 +66,11 @@ struct Args { /// Port to serve Libp2p TCP & UDP Quic transports #[arg(long, value_name = "PORT", default_value = "9090")] - port: u16, + relay_port: u16, /// Port to serve Libp2p WebRTC transport #[arg(long, value_name = "PORT", default_value = "9091")] - port_webrtc: u16, + relay_webrtc_port: u16, /// Path to a local identity key file. If not specified, a new identity will be generated #[arg(long, value_name = "PATH")] @@ -180,9 +180,9 @@ async fn main() -> anyhow::Result<()> { proxy_server.clone(), ); - let mut libp2p_relay_server = torii_libp2p::server::Libp2pRelay::new( - args.port, - args.port_webrtc, + let mut libp2p_relay_server = torii_relay::server::Relay::new( + args.relay_port, + args.relay_webrtc_port, args.local_key_path, args.cert_path, ) diff --git a/crates/torii/client/Cargo.toml b/crates/torii/client/Cargo.toml index 0cc1033227..32542a9177 100644 --- a/crates/torii/client/Cargo.toml +++ b/crates/torii/client/Cargo.toml @@ -20,7 +20,7 @@ starknet.workspace = true thiserror.workspace = true tokio = { version = "1.32.0", features = [ "sync" ], default-features = false } torii-grpc = { path = "../grpc", features = [ "client" ] } -torii-libp2p = { path = "../libp2p" } +torii-relay = { path = "../libp2p" } url.workspace = true [target.'cfg(not(target_arch = "wasm32"))'.dependencies] diff --git a/crates/torii/client/src/client/error.rs b/crates/torii/client/src/client/error.rs index b06fba0145..9f9b831939 100644 --- a/crates/torii/client/src/client/error.rs +++ b/crates/torii/client/src/client/error.rs @@ -17,7 +17,7 @@ pub enum Error { #[error(transparent)] GrpcClient(#[from] torii_grpc::client::Error), #[error(transparent)] - Libp2pClient(#[from] torii_libp2p::errors::Error), + RelayClient(#[from] torii_relay::errors::Error), #[error(transparent)] Model(#[from] ModelError), #[error("Unsupported query")] diff --git a/crates/torii/client/src/client/mod.rs b/crates/torii/client/src/client/mod.rs index 11cb87c54a..7decb5ec50 100644 --- a/crates/torii/client/src/client/mod.rs +++ b/crates/torii/client/src/client/mod.rs @@ -10,7 +10,8 @@ use dojo_types::packing::unpack; use dojo_types::schema::Ty; use dojo_types::WorldMetadata; use dojo_world::contracts::WorldContractReader; -use futures::channel::mpsc::UnboundedSender; +use futures::channel::mpsc::{SendError, UnboundedSender}; +use futures_util::{SinkExt, TryFutureExt}; use parking_lot::{RwLock, RwLockReadGuard}; use starknet::core::utils::cairo_short_string_to_felt; use starknet::providers::jsonrpc::HttpTransport; @@ -21,8 +22,8 @@ use torii_grpc::client::{EntityUpdateStreaming, ModelDiffsStreaming}; use torii_grpc::proto::world::RetrieveEntitiesResponse; use torii_grpc::types::schema::Entity; use torii_grpc::types::{KeysClause, Query}; -use torii_libp2p::client::Message; -use torii_libp2p::types::ClientMessage; +use torii_relay::client::Message; +use torii_relay::types::ClientMessage; use crate::client::error::{Error, ParseError}; use crate::client::storage::ModelStorage; @@ -38,7 +39,7 @@ pub struct Client { /// The grpc client. inner: AsyncRwLock, /// Libp2p client. - libp2p_client: torii_libp2p::client::Libp2pClient, + libp2p_client: torii_relay::client::RelayClient, /// Model storage storage: Arc, /// Models the client are subscribed to. @@ -60,7 +61,7 @@ impl Client { ) -> Result { let mut grpc_client = torii_grpc::client::WorldClient::new(torii_url, world).await?; - let libp2p_client = torii_libp2p::client::Libp2pClient::new(libp2p_relay_url)?; + let libp2p_client = torii_relay::client::RelayClient::new(libp2p_relay_url)?; let metadata = grpc_client.metadata().await?; @@ -102,36 +103,45 @@ impl Client { } /// Returns all of the subscribed topics of the libp2p client. - pub fn subscribed_topics(&self) -> HashSet { - self.libp2p_client.topics.keys().cloned().collect() - } + // pub fn subscribed_topics(&self) -> HashSet { + // self.libp2p_client.topics.keys().cloned().collect() + // } /// Subscribes to a topic. /// Returns true if the topic was subscribed to. /// Returns false if the topic was already subscribed to. - pub fn subscribe_topic(&mut self, topic: &str) -> Result { - self.libp2p_client.subscribe(topic).map_err(Error::Libp2pClient) + pub async fn subscribe_topic(&mut self, topic: &str) -> Result<(), SendError> { + self.libp2p_client + .command_sender + .send(torii_relay::client::Command::Subscribe(topic.to_string())) + .await } /// Unsubscribes from a topic. /// Returns true if the topic was subscribed to. - pub fn unsubscribe_topic(&mut self, topic: &str) -> Result { - self.libp2p_client.unsubscribe(topic).map_err(Error::Libp2pClient) + pub async fn unsubscribe_topic(&mut self, topic: &str) -> Result<(), SendError> { + self.libp2p_client + .command_sender + .send(torii_relay::client::Command::Unsubscribe(topic.to_string())) + .await } /// Publishes a message to a topic. /// Returns the message id. - pub fn publish_message(&mut self, topic: &str, message: &[u8]) -> Result, Error> { + pub async fn publish_message(&mut self, topic: &str, message: &[u8]) -> Result<(), SendError> { self.libp2p_client - .publish(&ClientMessage { topic: topic.to_string(), data: message.to_vec() }) - .map(|id| id.0) - .map_err(Error::Libp2pClient) + .command_sender + .send(torii_relay::client::Command::Publish(ClientMessage { + topic: topic.to_string(), + data: message.to_vec(), + })) + .await } - /// Runs the libp2p event listener which processes incoming messages. + /// Runs the libp2p event loop which processes incoming messages and commands. /// And sends events in the channel - pub async fn run_libp2p(&mut self, sender: &UnboundedSender) { - self.libp2p_client.run(sender).await; + pub async fn run_libp2p(&mut self) { + self.libp2p_client.event_loop.run().await; } /// Returns a read lock on the World metadata that the client is connected to.