Skip to content

Commit

Permalink
refactor(torii-client): remove reliance on specific async runtime (#1002
Browse files Browse the repository at this point in the history
)
  • Loading branch information
kariy authored Oct 10, 2023
1 parent 2331e76 commit eea697b
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 74 deletions.
4 changes: 0 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions crates/torii/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ serde_json.workspace = true
starknet-crypto.workspace = true
starknet.workspace = true
thiserror.workspace = true
tokio = { version = "1.32.0", default-features = false, features = [ "rt" ] }
torii-grpc = { path = "../grpc", features = [ "client" ] }
url.workspace = true

Expand All @@ -28,12 +27,8 @@ prost.workspace = true
tonic.workspace = true

[target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys = "0.3.64"
wasm-bindgen = "0.2.87"
wasm-bindgen-futures = "0.4.37"
wasm-prost.workspace = true
wasm-tonic.workspace = true
web-sys = { version = "0.3.4", features = [ 'Window', 'WorkerGlobalScope' ] }

[dev-dependencies]
camino.workspace = true
Expand Down
89 changes: 45 additions & 44 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,23 @@ pub mod error;
pub mod storage;
pub mod subscription;

use std::cell::OnceCell;
use std::sync::Arc;

use dojo_types::packing::unpack;
use dojo_types::schema::{EntityModel, Ty};
use dojo_types::WorldMetadata;
use futures::channel::mpsc;
use parking_lot::{Mutex, RwLock};
use parking_lot::RwLock;
use starknet::core::types::{BlockId, BlockTag};
use starknet::core::utils::cairo_short_string_to_felt;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use starknet_crypto::FieldElement;
#[cfg(not(target_arch = "wasm32"))]
use tokio::task::spawn as spawn_task;
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_futures::spawn_local as spawn_task;

use self::error::{Error, ParseError};
use self::storage::ModelStorage;
use self::subscription::{SubscribedEntities, SubscriptionClientHandle};
use crate::client::subscription::SubscriptionClient;
use crate::client::subscription::SubscriptionService;
use crate::contract::world::WorldContractReader;

// TODO: expose the World interface from the `Client`
Expand All @@ -31,22 +27,27 @@ pub struct Client {
/// Metadata of the World that the client is connected to.
metadata: Arc<RwLock<WorldMetadata>>,
/// The grpc client.
inner: Mutex<torii_grpc::client::WorldClient>,
inner: torii_grpc::client::WorldClient,
/// Entity storage
storage: Arc<ModelStorage>,
/// Entities the client are subscribed to.
entity_subscription: Arc<SubscribedEntities>,
/// The subscription client handle
subscription_client_handle: SubscriptionClientHandle,
subscribed_entities: Arc<SubscribedEntities>,
/// The subscription client handle.
sub_client_handle: OnceCell<SubscriptionClientHandle>,
}

impl Client {
/// Returns the metadata of the world.
pub fn world_metadata(&self) -> WorldMetadata {
/// Returns a [ClientBuilder] for building a [Client].
pub fn build() -> ClientBuilder {
ClientBuilder::new()
}

/// Returns the metadata of the world that the client is connected to.
pub fn metadata(&self) -> WorldMetadata {
self.metadata.read().clone()
}

/// Returns the component value of an entity.
/// Returns the model value of an entity.
pub fn entity(&self, model: &str, keys: &[FieldElement]) -> Option<Ty> {
let Ok(Some(raw_values)) =
self.storage.get_entity(cairo_short_string_to_felt(model).ok()?, keys)
Expand All @@ -67,7 +68,23 @@ impl Client {

/// Returns the list of entities that the client is subscribed to.
pub fn synced_entities(&self) -> Vec<EntityModel> {
self.entity_subscription.entities.read().clone().into_iter().collect()
self.subscribed_entities.entities.read().clone().into_iter().collect()
}

/// Initiate the entity subscriptions and returns a [SubscriptionService] which when await'ed
/// will execute the subscription service and starts the syncing process.
pub async fn start_subscription(&mut self) -> Result<SubscriptionService, Error> {
let sub_res_stream = self.inner.subscribe_entities(self.synced_entities()).await?;

let (service, handle) = SubscriptionService::new(
Arc::clone(&self.storage),
Arc::clone(&self.metadata),
Arc::clone(&self.subscribed_entities),
sub_res_stream,
);

self.sub_client_handle.set(handle).unwrap();
Ok(service)
}
}

Expand All @@ -83,6 +100,16 @@ impl ClientBuilder {
Self { initial_entities_to_sync: None }
}

#[must_use]
pub fn set_entities_to_sync(mut self, entities: Vec<EntityModel>) -> Self {
self.initial_entities_to_sync = Some(entities);
self
}

/// Returns an initialized [Client] with the provided configurations.
///
/// The subscription service is not immediately started when calling this function, instead it
/// must be manually started using `Client::start_subscription`.
pub async fn build(
self,
torii_endpoint: String,
Expand Down Expand Up @@ -123,40 +150,14 @@ impl ClientBuilder {
}
}

// initiate the stream any way, even if we don't have any initial entities to sync
let sub_res_stream = grpc_client
.subscribe_entities(self.initial_entities_to_sync.unwrap_or_default())
.await?;
// setup the subscription client
let subscription_client_handle = {
let (sub_req_tx, sub_req_rcv) = mpsc::channel(128);

spawn_task(SubscriptionClient {
sub_res_stream,
err_callback: None,
req_rcv: sub_req_rcv,
storage: client_storage.clone(),
world_metadata: shared_metadata.clone(),
subscribed_entities: subbed_entities.clone(),
});

SubscriptionClientHandle { event_handler: sub_req_tx }
};

Ok(Client {
inner: grpc_client,
storage: client_storage,
metadata: shared_metadata,
subscription_client_handle,
inner: Mutex::new(grpc_client),
entity_subscription: subbed_entities,
sub_client_handle: OnceCell::new(),
subscribed_entities: subbed_entities,
})
}

#[must_use]
pub fn set_entities_to_sync(mut self, entities: Vec<EntityModel>) -> Self {
self.initial_entities_to_sync = Some(entities);
self
}
}

impl Default for ClientBuilder {
Expand Down
47 changes: 33 additions & 14 deletions crates/torii/client/src/client/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::task::Poll;
use anyhow::{anyhow, Result};
use dojo_types::schema::EntityModel;
use dojo_types::WorldMetadata;
use futures::channel::mpsc::{Receiver, Sender};
use futures::channel::mpsc::{self, Receiver, Sender};
use futures_util::StreamExt;
use parking_lot::RwLock;
use starknet::core::utils::cairo_short_string_to_felt;
Expand Down Expand Up @@ -103,26 +103,45 @@ impl SubscribedEntities {
}
}

#[allow(unused)]
pub(crate) struct SubscriptionClientHandle {
pub(super) event_handler: Sender<SubscriptionEvent>,
}
#[derive(Debug)]
pub(crate) struct SubscriptionClientHandle(Sender<SubscriptionEvent>);

#[must_use = "SubscriptionClient does nothing unless polled"]
pub struct SubscriptionClient {
pub(super) req_rcv: Receiver<SubscriptionEvent>,
pub struct SubscriptionService {
req_rcv: Receiver<SubscriptionEvent>,
/// The stream returned by the subscription server to receive the response
pub(super) sub_res_stream: tonic::Streaming<SubscribeEntitiesResponse>,
sub_res_stream: tonic::Streaming<SubscribeEntitiesResponse>,
/// Callback to be called on error
pub(super) err_callback: Option<Box<dyn Fn(tonic::Status) + Send + Sync>>,
err_callback: Option<Box<dyn Fn(tonic::Status) + Send + Sync>>,

// for processing the entity diff and updating the storage
pub(super) storage: Arc<ModelStorage>,
pub(super) world_metadata: Arc<RwLock<WorldMetadata>>,
pub(super) subscribed_entities: Arc<SubscribedEntities>,
storage: Arc<ModelStorage>,
world_metadata: Arc<RwLock<WorldMetadata>>,
subscribed_entities: Arc<SubscribedEntities>,
}

impl SubscriptionClient {
impl SubscriptionService {
pub(super) fn new(
storage: Arc<ModelStorage>,
world_metadata: Arc<RwLock<WorldMetadata>>,
subscribed_entities: Arc<SubscribedEntities>,
sub_res_stream: tonic::Streaming<SubscribeEntitiesResponse>,
) -> (Self, SubscriptionClientHandle) {
let (req_sender, req_rcv) = mpsc::channel(128);
let handle = SubscriptionClientHandle(req_sender);

let client = Self {
req_rcv,
storage,
world_metadata,
sub_res_stream,
err_callback: None,
subscribed_entities,
};

(client, handle)
}

// TODO: handle the subscription events properly
fn handle_event(&self, event: SubscriptionEvent) -> Result<(), Error> {
match event {
Expand Down Expand Up @@ -189,7 +208,7 @@ impl SubscriptionClient {
}
}

impl Future for SubscriptionClient {
impl Future for SubscriptionService {
type Output = ();

fn poll(
Expand Down
5 changes: 0 additions & 5 deletions crates/torii/client/wasm/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions crates/torii/client/wasm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ pub struct Client(torii_client::client::Client);

#[wasm_bindgen]
impl Client {
/// Returns the model values of the requested entity.
#[wasm_bindgen(js_name = getModelValue)]
pub async fn get_model_value(
&self,
Expand Down Expand Up @@ -81,11 +80,17 @@ pub async fn spawn_client(
JsValue::from_str(format!("failed to parse world address: {err}").as_str())
})?;

let client = torii_client::client::ClientBuilder::new()
let mut client = torii_client::client::ClientBuilder::new()
.set_entities_to_sync(entities)
.build(torii_url.into(), rpc_url.into(), world_address)
.await
.map_err(|err| JsValue::from_str(format!("failed to build client: {err}").as_str()))?;

wasm_bindgen_futures::spawn_local(client.start_subscription().await.map_err(|err| {
JsValue::from_str(
format!("failed to start torii client subscription service: {err}").as_str(),
)
})?);

Ok(Client(client))
}

0 comments on commit eea697b

Please sign in to comment.