Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: libp2p relay and gossipsub server #1459

Merged
merged 36 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
5eac3b2
feat: start writing libp2p server
Larkooo Jan 17, 2024
1b99681
feat: working libp2p example
Larkooo Jan 17, 2024
cd25eb4
feat: relay server
Larkooo Jan 17, 2024
911189b
feat: basic chat room server
Larkooo Jan 17, 2024
8a98fd1
feat(server): relay & gossipsub for sending messages in topics (rooms)
Larkooo Jan 17, 2024
1bf2207
feat: set up server in torii-server and satrt working on client
Larkooo Jan 18, 2024
428e191
feat(client): implemented ping & identify and sending messages to topic
Larkooo Jan 18, 2024
65ebed6
ferat: stasrt integrating into torii-client & message channel
Larkooo Jan 18, 2024
ef7251d
feat: integration into torii-client & error handling
Larkooo Jan 19, 2024
41c8496
feat: tests for libp2p server & client and finish integration
Larkooo Jan 19, 2024
6e8ef1a
chore: tests
Larkooo Jan 19, 2024
468bf19
chore: webrtc base
Larkooo Jan 22, 2024
19daadc
fix: server webrtc correctly compiling & use tokio
Larkooo Jan 22, 2024
50dfa6c
chore: tests
Larkooo Jan 22, 2024
7cd7b65
feat: client compiling for wasm
Larkooo Jan 22, 2024
234f9ed
chore: fmt & clippy
Larkooo Jan 22, 2024
0f76b32
chore: update cargo.lock
Larkooo Jan 22, 2024
fed82f5
chore: rebase main branch
Larkooo Jan 23, 2024
33e0e73
fix: deps
Larkooo Jan 23, 2024
c1c4440
feat: wasm tests for client connectivity
Larkooo Jan 23, 2024
55dbc1f
feat: specify port in torii cli and cert/local key for libp2p
Larkooo Jan 23, 2024
b8e8394
chore: clippy & fmt
Larkooo Jan 23, 2024
6205340
feat(client): event loop
Larkooo Jan 23, 2024
53d5aa0
feat(client): add quic support
Larkooo Jan 24, 2024
67e43b0
fix: issue with ping timeout
Larkooo Jan 24, 2024
67c466c
chore: clippy and fmt
Larkooo Jan 24, 2024
9a9f3d3
refactor: review changes
Larkooo Jan 24, 2024
eb84c61
chore: change to torii-relay
Larkooo Jan 24, 2024
87cbc61
chore: fmt
Larkooo Jan 24, 2024
9a21ef9
refactor: use unbounded channel for messages & commands
Larkooo Jan 24, 2024
fdbe680
chore: remove unused imports
Larkooo Jan 24, 2024
70e0f80
chore: clippy torii client
Larkooo Jan 24, 2024
7235fd2
feat: expose full message metadata
Larkooo Jan 24, 2024
b3c2249
chore: prefix cli args with relay
Larkooo Jan 24, 2024
68e5cf0
feat: oneshot channel for commands result and expose mesages on torii…
Larkooo Jan 24, 2024
fbe2874
refactor(server): better error handling & type scopes
Larkooo Jan 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,303 changes: 2,137 additions & 166 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ torii-core = { path = "crates/torii/core" }
torii-graphql = { path = "crates/torii/graphql" }
torii-grpc = { path = "crates/torii/grpc" }
torii-server = { path = "crates/torii/server" }
torii-relay = { path = "crates/torii/libp2p" }

# sozo
sozo-signers = { path = "crates/sozo/signers" }
Expand All @@ -88,7 +89,6 @@ assert_matches = "1.5.0"
async-trait = "0.1.68"
base64 = "0.21.2"
blockifier = { git = "https://github.com/starkware-libs/blockifier", tag = "v0.4.0-rc9.2" }
cairo-lang-casm = "2.4.0"
cairo-lang-compiler = "2.4.0"
cairo-lang-debug = "2.4.0"
cairo-lang-defs = "2.4.0"
Expand All @@ -115,12 +115,10 @@ camino = { version = "1.1.2", features = [ "serde1" ] }
chrono = { version = "0.4.24", features = [ "serde" ] }
clap = { version = "4.2", features = [ "derive" ] }
clap_complete = "4.3"
colored = "2"
console = "0.15.7"
convert_case = "0.6.0"
crypto-bigint = { version = "0.5.3", features = [ "serde" ] }
derive_more = "0.99.17"
env_logger = "0.10.0"
flate2 = "1.0.24"
futures = "0.3.28"
hex = "0.4.3"
Expand All @@ -129,7 +127,6 @@ itertools = "0.10.3"
jsonrpsee = { version = "0.16.2", default-features = false }
lazy_static = "1.4.0"
metrics-process = "1.0.9"
num-bigint = "0.4"
once_cell = "1.0"
parking_lot = "0.12.1"
pretty_assertions = "1.2.1"
Expand Down
1 change: 1 addition & 0 deletions bin/torii/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ tower-http = "0.4.4"
tracing-subscriber.workspace = true
tracing.workspace = true
url.workspace = true
torii-relay.workspace = true

[dev-dependencies]
camino.workspace = true
Expand Down
26 changes: 26 additions & 0 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,23 @@
#[arg(long, value_name = "SOCKET", default_value = ":8080", value_parser = parse_socket_address)]
addr: SocketAddr,

/// Port to serve Libp2p TCP & UDP Quic transports
#[arg(long, value_name = "PORT", default_value = "9090")]
relay_port: u16,

Check warning on line 69 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L69

Added line #L69 was not covered by tests

/// Port to serve Libp2p WebRTC transport
#[arg(long, value_name = "PORT", default_value = "9091")]
relay_webrtc_port: u16,

Check warning on line 73 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L73

Added line #L73 was not covered by tests

/// Path to a local identity key file. If not specified, a new identity will be generated
#[arg(long, value_name = "PATH")]
relay_local_key_path: Option<String>,

/// Path to a local certificate file. If not specified, a new certificate will be generated
/// for WebRTC connections
#[arg(long, value_name = "PATH")]
relay_cert_path: Option<String>,

/// Specify allowed origins for api endpoints (comma-separated list of allowed origins, or "*"
/// for all)
#[arg(long, default_value = "*")]
Expand Down Expand Up @@ -163,6 +180,14 @@
proxy_server.clone(),
);

let mut libp2p_relay_server = torii_relay::server::Relay::new(
args.relay_port,
args.relay_webrtc_port,
args.relay_local_key_path,
args.relay_cert_path,
)
.expect("Failed to start libp2p relay server");

Check warning on line 190 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L183-L190

Added lines #L183 - L190 were not covered by tests
info!(target: "torii::cli", "Starting torii endpoint: {}", format!("http://{}", args.addr));
info!(target: "torii::cli", "Serving Graphql playground: {}\n", format!("http://{}/graphql", args.addr));

Expand All @@ -183,6 +208,7 @@
_ = proxy_server.start(shutdown_tx.subscribe()) => {},
_ = graphql_server => {},
_ = grpc_server => {},
_ = libp2p_relay_server.run() => {},

Check warning on line 211 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L211

Added line #L211 was not covered by tests
};

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions crates/torii/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ starknet.workspace = true
thiserror.workspace = true
tokio = { version = "1.32.0", features = [ "sync" ], default-features = false }
torii-grpc = { path = "../grpc", features = [ "client" ] }
torii-relay = { path = "../libp2p" }
url.workspace = true
libp2p-gossipsub = "0.46.1"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
prost.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions crates/torii/client/src/client/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub enum Error {
#[error(transparent)]
GrpcClient(#[from] torii_grpc::client::Error),
#[error(transparent)]
RelayClient(#[from] torii_relay::errors::Error),
#[error(transparent)]
Model(#[from] ModelError),
#[error("Unsupported query")]
UnsupportedQuery,
Expand Down
43 changes: 43 additions & 0 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
use dojo_types::schema::Ty;
use dojo_types::WorldMetadata;
use dojo_world::contracts::WorldContractReader;
use futures::channel::mpsc::UnboundedReceiver;
use futures_util::lock::Mutex;
use parking_lot::{RwLock, RwLockReadGuard};
use starknet::core::utils::cairo_short_string_to_felt;
use starknet::providers::jsonrpc::HttpTransport;
Expand All @@ -20,6 +22,7 @@
use torii_grpc::proto::world::RetrieveEntitiesResponse;
use torii_grpc::types::schema::Entity;
use torii_grpc::types::{KeysClause, Query};
use torii_relay::client::Message;

use crate::client::error::{Error, ParseError};
use crate::client::storage::ModelStorage;
Expand All @@ -34,6 +37,8 @@
metadata: Arc<RwLock<WorldMetadata>>,
/// The grpc client.
inner: AsyncRwLock<torii_grpc::client::WorldClient>,
/// Libp2p client.
relay_client: torii_relay::client::RelayClient,
/// Model storage
storage: Arc<ModelStorage>,
/// Models the client are subscribed to.
Expand All @@ -49,11 +54,14 @@
pub async fn new(
torii_url: String,
rpc_url: String,
libp2p_relay_url: String,

Check warning on line 57 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L57

Added line #L57 was not covered by tests
world: FieldElement,
models_keys: Option<Vec<KeysClause>>,
) -> Result<Self, Error> {
let mut grpc_client = torii_grpc::client::WorldClient::new(torii_url, world).await?;

let libp2p_client = torii_relay::client::RelayClient::new(libp2p_relay_url)?;

Check warning on line 63 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L63

Added line #L63 was not covered by tests

let metadata = grpc_client.metadata().await?;

let shared_metadata: Arc<_> = RwLock::new(metadata).into();
Expand Down Expand Up @@ -88,10 +96,45 @@
metadata: shared_metadata,
sub_client_handle: OnceCell::new(),
inner: AsyncRwLock::new(grpc_client),
relay_client: libp2p_client,

Check warning on line 99 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L99

Added line #L99 was not covered by tests
subscribed_models: subbed_models,
})
}

/// Subscribes to a topic.
/// Returns true if the topic was subscribed to.
/// Returns false if the topic was already subscribed to.
pub async fn subscribe_topic(&mut self, topic: String) -> Result<bool, Error> {
self.relay_client.command_sender.subscribe(topic).await.map_err(Error::RelayClient)
}

Check warning on line 109 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L107-L109

Added lines #L107 - L109 were not covered by tests

/// Unsubscribes from a topic.
/// Returns true if the topic was subscribed to.
pub async fn unsubscribe_topic(&mut self, topic: String) -> Result<bool, Error> {
self.relay_client.command_sender.unsubscribe(topic).await.map_err(Error::RelayClient)
}

Check warning on line 115 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L113-L115

Added lines #L113 - L115 were not covered by tests

/// Publishes a message to a topic.
/// Returns the message id.
pub async fn publish_message(&mut self, topic: &str, message: &[u8]) -> Result<Vec<u8>, Error> {
self.relay_client
.command_sender
.publish(topic.to_string(), message.to_vec())
.await
.map_err(Error::RelayClient)
.map(|m| m.0)
}

Check warning on line 126 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L119-L126

Added lines #L119 - L126 were not covered by tests

/// Runs the libp2p event loop which processes incoming messages and commands.
/// And sends events in the channel
pub async fn run_libp2p(&mut self) {
self.relay_client.event_loop.run().await;
}

Check warning on line 132 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L130-L132

Added lines #L130 - L132 were not covered by tests

pub fn libp2p_message_stream(&self) -> Arc<Mutex<UnboundedReceiver<Message>>> {
self.relay_client.message_receiver.clone()
}

Check warning on line 136 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L134-L136

Added lines #L134 - L136 were not covered by tests

/// Returns a read lock on the World metadata that the client is connected to.
pub fn metadata(&self) -> RwLockReadGuard<'_, WorldMetadata> {
self.metadata.read()
Expand Down
36 changes: 36 additions & 0 deletions crates/torii/libp2p/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[package]
edition.workspace = true
license-file.workspace = true
name = "torii-relay"
repository.workspace = true
version.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures.workspace = true
rand = "0.8.5"
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing.workspace = true
async-trait = "0.1.77"
regex = "1.10.3"
anyhow.workspace = true

[dev-dependencies]
tempfile = "3.9.0"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio.workspace = true
libp2p = { git = "https://github.com/libp2p/rust-libp2p", features = [ "ed25519", "gossipsub", "identify", "macros", "noise", "ping", "quic", "relay", "tcp", "tokio", "yamux" ] }
libp2p-webrtc = { git = "https://github.com/libp2p/rust-libp2p", features = [ "tokio", "pem" ] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
libp2p = { git = "https://github.com/libp2p/rust-libp2p", features = [ "ed25519", "gossipsub", "identify", "macros", "ping", "tcp", "wasm-bindgen" ] }
libp2p-webrtc-websys = { git = "https://github.com/libp2p/rust-libp2p" }
tracing-wasm = "0.2.1"
wasm-bindgen-test = "0.3.40"
wasm-bindgen-futures = "0.4.40"
wasm-timer = "0.2.5"
27 changes: 27 additions & 0 deletions crates/torii/libp2p/src/client/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use gossipsub::Event as GossipsubEvent;
use libp2p::{gossipsub, identify, ping};

#[derive(Debug)]

Check warning on line 4 in crates/torii/libp2p/src/client/events.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/libp2p/src/client/events.rs#L4

Added line #L4 was not covered by tests
pub(crate) enum ClientEvent {
Gossipsub(GossipsubEvent),
Identify(identify::Event),
Ping(ping::Event),
}

impl From<GossipsubEvent> for ClientEvent {
fn from(event: GossipsubEvent) -> Self {
Self::Gossipsub(event)
}
}

impl From<identify::Event> for ClientEvent {
fn from(event: identify::Event) -> Self {
Self::Identify(event)
}
}

impl From<ping::Event> for ClientEvent {
fn from(event: ping::Event) -> Self {
Self::Ping(event)
}
}
Loading
Loading