From eea697bb3eea7afc4610a018f361385cfae57ff5 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Wed, 11 Oct 2023 02:28:03 +0900 Subject: [PATCH] refactor(torii-client): remove reliance on specific async runtime (#1002) --- Cargo.lock | 4 - crates/torii/client/Cargo.toml | 5 -- crates/torii/client/src/client/mod.rs | 89 ++++++++++--------- .../torii/client/src/client/subscription.rs | 47 +++++++--- crates/torii/client/wasm/Cargo.lock | 5 -- crates/torii/client/wasm/src/lib.rs | 9 +- 6 files changed, 85 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ee676ce837..1f7f991805 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7871,7 +7871,6 @@ dependencies = [ "futures", "futures-util", "http", - "js-sys", "parking_lot 0.12.1", "prost 0.11.9", "prost 0.12.1", @@ -7885,9 +7884,6 @@ dependencies = [ "tonic 0.9.2", "torii-grpc", "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", ] [[package]] diff --git a/crates/torii/client/Cargo.toml b/crates/torii/client/Cargo.toml index 4320cf72d0..bd96feed87 100644 --- a/crates/torii/client/Cargo.toml +++ b/crates/torii/client/Cargo.toml @@ -19,7 +19,6 @@ serde_json.workspace = true starknet-crypto.workspace = true starknet.workspace = true thiserror.workspace = true -tokio = { version = "1.32.0", default-features = false, features = [ "rt" ] } torii-grpc = { path = "../grpc", features = [ "client" ] } url.workspace = true @@ -28,12 +27,8 @@ prost.workspace = true tonic.workspace = true [target.'cfg(target_arch = "wasm32")'.dependencies] -js-sys = "0.3.64" -wasm-bindgen = "0.2.87" -wasm-bindgen-futures = "0.4.37" wasm-prost.workspace = true wasm-tonic.workspace = true -web-sys = { version = "0.3.4", features = [ 'Window', 'WorkerGlobalScope' ] } [dev-dependencies] camino.workspace = true diff --git a/crates/torii/client/src/client/mod.rs b/crates/torii/client/src/client/mod.rs index 17078a40f7..9ebdec6271 100644 --- a/crates/torii/client/src/client/mod.rs +++ b/crates/torii/client/src/client/mod.rs @@ -2,27 +2,23 @@ pub mod error; pub mod storage; pub mod subscription; +use std::cell::OnceCell; use std::sync::Arc; use dojo_types::packing::unpack; use dojo_types::schema::{EntityModel, Ty}; use dojo_types::WorldMetadata; -use futures::channel::mpsc; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use starknet::core::types::{BlockId, BlockTag}; use starknet::core::utils::cairo_short_string_to_felt; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::JsonRpcClient; use starknet_crypto::FieldElement; -#[cfg(not(target_arch = "wasm32"))] -use tokio::task::spawn as spawn_task; -#[cfg(target_arch = "wasm32")] -use wasm_bindgen_futures::spawn_local as spawn_task; use self::error::{Error, ParseError}; use self::storage::ModelStorage; use self::subscription::{SubscribedEntities, SubscriptionClientHandle}; -use crate::client::subscription::SubscriptionClient; +use crate::client::subscription::SubscriptionService; use crate::contract::world::WorldContractReader; // TODO: expose the World interface from the `Client` @@ -31,22 +27,27 @@ pub struct Client { /// Metadata of the World that the client is connected to. metadata: Arc>, /// The grpc client. - inner: Mutex, + inner: torii_grpc::client::WorldClient, /// Entity storage storage: Arc, /// Entities the client are subscribed to. - entity_subscription: Arc, - /// The subscription client handle - subscription_client_handle: SubscriptionClientHandle, + subscribed_entities: Arc, + /// The subscription client handle. + sub_client_handle: OnceCell, } impl Client { - /// Returns the metadata of the world. - pub fn world_metadata(&self) -> WorldMetadata { + /// Returns a [ClientBuilder] for building a [Client]. + pub fn build() -> ClientBuilder { + ClientBuilder::new() + } + + /// Returns the metadata of the world that the client is connected to. + pub fn metadata(&self) -> WorldMetadata { self.metadata.read().clone() } - /// Returns the component value of an entity. + /// Returns the model value of an entity. pub fn entity(&self, model: &str, keys: &[FieldElement]) -> Option { let Ok(Some(raw_values)) = self.storage.get_entity(cairo_short_string_to_felt(model).ok()?, keys) @@ -67,7 +68,23 @@ impl Client { /// Returns the list of entities that the client is subscribed to. pub fn synced_entities(&self) -> Vec { - self.entity_subscription.entities.read().clone().into_iter().collect() + self.subscribed_entities.entities.read().clone().into_iter().collect() + } + + /// Initiate the entity subscriptions and returns a [SubscriptionService] which when await'ed + /// will execute the subscription service and starts the syncing process. + pub async fn start_subscription(&mut self) -> Result { + let sub_res_stream = self.inner.subscribe_entities(self.synced_entities()).await?; + + let (service, handle) = SubscriptionService::new( + Arc::clone(&self.storage), + Arc::clone(&self.metadata), + Arc::clone(&self.subscribed_entities), + sub_res_stream, + ); + + self.sub_client_handle.set(handle).unwrap(); + Ok(service) } } @@ -83,6 +100,16 @@ impl ClientBuilder { Self { initial_entities_to_sync: None } } + #[must_use] + pub fn set_entities_to_sync(mut self, entities: Vec) -> Self { + self.initial_entities_to_sync = Some(entities); + self + } + + /// Returns an initialized [Client] with the provided configurations. + /// + /// The subscription service is not immediately started when calling this function, instead it + /// must be manually started using `Client::start_subscription`. pub async fn build( self, torii_endpoint: String, @@ -123,40 +150,14 @@ impl ClientBuilder { } } - // initiate the stream any way, even if we don't have any initial entities to sync - let sub_res_stream = grpc_client - .subscribe_entities(self.initial_entities_to_sync.unwrap_or_default()) - .await?; - // setup the subscription client - let subscription_client_handle = { - let (sub_req_tx, sub_req_rcv) = mpsc::channel(128); - - spawn_task(SubscriptionClient { - sub_res_stream, - err_callback: None, - req_rcv: sub_req_rcv, - storage: client_storage.clone(), - world_metadata: shared_metadata.clone(), - subscribed_entities: subbed_entities.clone(), - }); - - SubscriptionClientHandle { event_handler: sub_req_tx } - }; - Ok(Client { + inner: grpc_client, storage: client_storage, metadata: shared_metadata, - subscription_client_handle, - inner: Mutex::new(grpc_client), - entity_subscription: subbed_entities, + sub_client_handle: OnceCell::new(), + subscribed_entities: subbed_entities, }) } - - #[must_use] - pub fn set_entities_to_sync(mut self, entities: Vec) -> Self { - self.initial_entities_to_sync = Some(entities); - self - } } impl Default for ClientBuilder { diff --git a/crates/torii/client/src/client/subscription.rs b/crates/torii/client/src/client/subscription.rs index d46cb5b159..0666d7de67 100644 --- a/crates/torii/client/src/client/subscription.rs +++ b/crates/torii/client/src/client/subscription.rs @@ -7,7 +7,7 @@ use std::task::Poll; use anyhow::{anyhow, Result}; use dojo_types::schema::EntityModel; use dojo_types::WorldMetadata; -use futures::channel::mpsc::{Receiver, Sender}; +use futures::channel::mpsc::{self, Receiver, Sender}; use futures_util::StreamExt; use parking_lot::RwLock; use starknet::core::utils::cairo_short_string_to_felt; @@ -103,26 +103,45 @@ impl SubscribedEntities { } } -#[allow(unused)] -pub(crate) struct SubscriptionClientHandle { - pub(super) event_handler: Sender, -} +#[derive(Debug)] +pub(crate) struct SubscriptionClientHandle(Sender); #[must_use = "SubscriptionClient does nothing unless polled"] -pub struct SubscriptionClient { - pub(super) req_rcv: Receiver, +pub struct SubscriptionService { + req_rcv: Receiver, /// The stream returned by the subscription server to receive the response - pub(super) sub_res_stream: tonic::Streaming, + sub_res_stream: tonic::Streaming, /// Callback to be called on error - pub(super) err_callback: Option>, + err_callback: Option>, // for processing the entity diff and updating the storage - pub(super) storage: Arc, - pub(super) world_metadata: Arc>, - pub(super) subscribed_entities: Arc, + storage: Arc, + world_metadata: Arc>, + subscribed_entities: Arc, } -impl SubscriptionClient { +impl SubscriptionService { + pub(super) fn new( + storage: Arc, + world_metadata: Arc>, + subscribed_entities: Arc, + sub_res_stream: tonic::Streaming, + ) -> (Self, SubscriptionClientHandle) { + let (req_sender, req_rcv) = mpsc::channel(128); + let handle = SubscriptionClientHandle(req_sender); + + let client = Self { + req_rcv, + storage, + world_metadata, + sub_res_stream, + err_callback: None, + subscribed_entities, + }; + + (client, handle) + } + // TODO: handle the subscription events properly fn handle_event(&self, event: SubscriptionEvent) -> Result<(), Error> { match event { @@ -189,7 +208,7 @@ impl SubscriptionClient { } } -impl Future for SubscriptionClient { +impl Future for SubscriptionService { type Output = (); fn poll( diff --git a/crates/torii/client/wasm/Cargo.lock b/crates/torii/client/wasm/Cargo.lock index 48e33c4bde..55511396b9 100644 --- a/crates/torii/client/wasm/Cargo.lock +++ b/crates/torii/client/wasm/Cargo.lock @@ -3058,7 +3058,6 @@ dependencies = [ "futures", "futures-util", "http", - "js-sys", "parking_lot 0.12.1", "prost 0.11.9", "prost 0.12.1", @@ -3067,14 +3066,10 @@ dependencies = [ "starknet", "starknet-crypto", "thiserror", - "tokio", "tonic 0.10.1", "tonic 0.9.2", "torii-grpc", "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", ] [[package]] diff --git a/crates/torii/client/wasm/src/lib.rs b/crates/torii/client/wasm/src/lib.rs index 715e47ae94..ca51473d30 100644 --- a/crates/torii/client/wasm/src/lib.rs +++ b/crates/torii/client/wasm/src/lib.rs @@ -23,7 +23,6 @@ pub struct Client(torii_client::client::Client); #[wasm_bindgen] impl Client { - /// Returns the model values of the requested entity. #[wasm_bindgen(js_name = getModelValue)] pub async fn get_model_value( &self, @@ -81,11 +80,17 @@ pub async fn spawn_client( JsValue::from_str(format!("failed to parse world address: {err}").as_str()) })?; - let client = torii_client::client::ClientBuilder::new() + let mut client = torii_client::client::ClientBuilder::new() .set_entities_to_sync(entities) .build(torii_url.into(), rpc_url.into(), world_address) .await .map_err(|err| JsValue::from_str(format!("failed to build client: {err}").as_str()))?; + wasm_bindgen_futures::spawn_local(client.start_subscription().await.map_err(|err| { + JsValue::from_str( + format!("failed to start torii client subscription service: {err}").as_str(), + ) + })?); + Ok(Client(client)) }