Skip to content

Commit

Permalink
Allow updating synced entities after Client is built (#1040)
Browse files Browse the repository at this point in the history
  • Loading branch information
kariy authored Oct 14, 2023
1 parent 7d2c8c9 commit 4dcefb3
Show file tree
Hide file tree
Showing 15 changed files with 228 additions and 42 deletions.
2 changes: 1 addition & 1 deletion crates/dojo-lang/src/manifest_test_data/manifest
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ test_manifest_file
{
"name": "player_actions",
"address": null,
"class_hash": "0x53ce2324af80c9c879e2ccefd0feaef0c3461b6ddfb2e0d35a0e78d5ba76a08",
"class_hash": "0x473f9dbf12f5dae40e26219f7ff6c9156a7ace830a211be372e9b88136505c7",
"abi": [
{
"type": "impl",
Expand Down
1 change: 1 addition & 0 deletions crates/torii/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ serde_json.workspace = true
starknet-crypto.workspace = true
starknet.workspace = true
thiserror.workspace = true
tokio = { version = "1.32.0", features = [ "sync" ], default-features = false }
torii-grpc = { path = "../grpc", features = [ "client" ] }
url.workspace = true

Expand Down
2 changes: 2 additions & 0 deletions crates/torii/client/src/client/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use starknet::providers::{JsonRpcClient, Provider};

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Subscription service uninitialized")]
SubscriptionUninitialized,
#[error("Unknown model: {0}")]
UnknownModel(String),
#[error(
Expand Down
52 changes: 48 additions & 4 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use starknet::core::utils::cairo_short_string_to_felt;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use starknet_crypto::FieldElement;
use tokio::sync::RwLock as AsyncRwLock;
use torii_grpc::protos::world::SubscribeEntitiesResponse;

use self::error::{Error, ParseError};
use self::storage::ModelStorage;
Expand All @@ -26,7 +28,7 @@ pub struct Client {
/// Metadata of the World that the client is connected to.
metadata: Arc<RwLock<WorldMetadata>>,
/// The grpc client.
inner: torii_grpc::client::WorldClient,
inner: AsyncRwLock<torii_grpc::client::WorldClient>,
/// Entity storage
storage: Arc<ModelStorage>,
/// Entities the client are subscribed to.
Expand Down Expand Up @@ -72,8 +74,9 @@ impl Client {

/// Initiate the entity subscriptions and returns a [SubscriptionService] which when await'ed
/// will execute the subscription service and starts the syncing process.
pub async fn start_subscription(&mut self) -> Result<SubscriptionService, Error> {
let sub_res_stream = self.inner.subscribe_entities(self.synced_entities()).await?;
pub async fn start_subscription(&self) -> Result<SubscriptionService, Error> {
let entities = self.synced_entities();
let sub_res_stream = self.initiate_subscription(entities).await?;

let (service, handle) = SubscriptionService::new(
Arc::clone(&self.storage),
Expand All @@ -85,6 +88,47 @@ impl Client {
self.sub_client_handle.set(handle).unwrap();
Ok(service)
}

/// Adds entities to the list of entities to be synced.
///
/// NOTE: This will establish a new subscription stream with the server.
pub async fn add_entities_to_sync(&self, entities: Vec<EntityModel>) -> Result<(), Error> {
self.subscribed_entities.add_entities(entities)?;

let updated_entities = self.synced_entities();
let sub_res_stream = self.initiate_subscription(updated_entities).await?;

match self.sub_client_handle.get() {
Some(handle) => handle.update_subscription_stream(sub_res_stream),
None => return Err(Error::SubscriptionUninitialized),
}
Ok(())
}

/// Removes entities from the list of entities to be synced.
///
/// NOTE: This will establish a new subscription stream with the server.
pub async fn remove_entities_to_sync(&self, entities: Vec<EntityModel>) -> Result<(), Error> {
self.subscribed_entities.remove_entities(entities)?;

let updated_entities = self.synced_entities();
let sub_res_stream = self.initiate_subscription(updated_entities).await?;

match self.sub_client_handle.get() {
Some(handle) => handle.update_subscription_stream(sub_res_stream),
None => return Err(Error::SubscriptionUninitialized),
}
Ok(())
}

async fn initiate_subscription(
&self,
entities: Vec<EntityModel>,
) -> Result<tonic::Streaming<SubscribeEntitiesResponse>, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_entities(entities).await?;
Ok(stream)
}
}

// TODO: able to handle entities that has not been set yet, currently `build` will panic if the
Expand Down Expand Up @@ -147,10 +191,10 @@ impl ClientBuilder {
}

Ok(Client {
inner: grpc_client,
storage: client_storage,
metadata: shared_metadata,
sub_client_handle: OnceCell::new(),
inner: AsyncRwLock::new(grpc_client),
subscribed_entities: subbed_entities,
})
}
Expand Down
50 changes: 32 additions & 18 deletions crates/torii/client/src/client/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cell::RefCell;
use std::collections::HashSet;
use std::future::Future;
use std::str::FromStr;
Expand All @@ -9,7 +10,7 @@ use dojo_types::schema::EntityModel;
use dojo_types::WorldMetadata;
use futures::channel::mpsc::{self, Receiver, Sender};
use futures_util::StreamExt;
use parking_lot::RwLock;
use parking_lot::{Mutex, RwLock};
use starknet::core::utils::cairo_short_string_to_felt;
use starknet_crypto::FieldElement;
use torii_grpc::protos;
Expand All @@ -20,10 +21,9 @@ use super::error::{Error, ParseError};
use super::ModelStorage;
use crate::utils::compute_all_storage_addresses;

#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum SubscriptionEvent {
SubscribeEntity(EntityModel),
UnsubscribeEntity(EntityModel),
UpdateSubsciptionStream(tonic::Streaming<SubscribeEntitiesResponse>),
}

pub struct SubscribedEntities {
Expand All @@ -34,7 +34,7 @@ pub struct SubscribedEntities {
}

impl SubscribedEntities {
pub fn new(metadata: Arc<RwLock<WorldMetadata>>) -> Self {
pub(crate) fn new(metadata: Arc<RwLock<WorldMetadata>>) -> Self {
Self {
metadata,
entities: Default::default(),
Expand Down Expand Up @@ -104,13 +104,26 @@ impl SubscribedEntities {
}

#[derive(Debug)]
pub(crate) struct SubscriptionClientHandle(Sender<SubscriptionEvent>);
pub(crate) struct SubscriptionClientHandle(Mutex<Sender<SubscriptionEvent>>);

impl SubscriptionClientHandle {
fn new(sender: Sender<SubscriptionEvent>) -> Self {
Self(Mutex::new(sender))
}

pub(crate) fn update_subscription_stream(
&self,
stream: tonic::Streaming<SubscribeEntitiesResponse>,
) {
let _ = self.0.lock().try_send(SubscriptionEvent::UpdateSubsciptionStream(stream));
}
}

#[must_use = "SubscriptionClient does nothing unless polled"]
pub struct SubscriptionService {
req_rcv: Receiver<SubscriptionEvent>,
/// The stream returned by the subscription server to receive the response
sub_res_stream: tonic::Streaming<SubscribeEntitiesResponse>,
sub_res_stream: RefCell<Option<tonic::Streaming<SubscribeEntitiesResponse>>>,
/// Callback to be called on error
err_callback: Option<Box<dyn Fn(tonic::Status) + Send + Sync>>,

Expand All @@ -128,7 +141,9 @@ impl SubscriptionService {
sub_res_stream: tonic::Streaming<SubscribeEntitiesResponse>,
) -> (Self, SubscriptionClientHandle) {
let (req_sender, req_rcv) = mpsc::channel(128);
let handle = SubscriptionClientHandle(req_sender);

let handle = SubscriptionClientHandle::new(req_sender);
let sub_res_stream = RefCell::new(Some(sub_res_stream));

let client = Self {
req_rcv,
Expand All @@ -145,13 +160,11 @@ impl SubscriptionService {
// TODO: handle the subscription events properly
fn handle_event(&self, event: SubscriptionEvent) -> Result<(), Error> {
match event {
SubscriptionEvent::SubscribeEntity(entity) => {
self.subscribed_entities.add_entities(vec![entity])
}
SubscriptionEvent::UnsubscribeEntity(entity) => {
self.subscribed_entities.remove_entities(vec![entity])
SubscriptionEvent::UpdateSubsciptionStream(stream) => {
self.sub_res_stream.replace(Some(stream));
}
}
Ok(())
}

// handle the response from the subscription stream
Expand Down Expand Up @@ -222,11 +235,12 @@ impl Future for SubscriptionService {
let _ = pin.handle_event(req);
}

match pin.sub_res_stream.poll_next_unpin(cx) {
Poll::Ready(Some(res)) => pin.handle_response(res),

Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => return Poll::Pending,
if let Some(stream) = pin.sub_res_stream.get_mut() {
match stream.poll_next_unpin(cx) {
Poll::Ready(Some(res)) => pin.handle_response(res),
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => return Poll::Pending,
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/torii/client/wasm/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions crates/torii/client/wasm/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ async function run_wasm() {
],
},
});

// Get the entity values from the sync worker
clientWorker.postMessage({
type: "getModelValue",
data: {
model: "Moves",
keys: [
"0x517ececd29116499f4a1b64b094da79ba08dfd54a3edaa316134c41f8160973",
],
},
});
}, 2000);
}, 1000);
}
Expand Down
44 changes: 43 additions & 1 deletion crates/torii/client/wasm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,48 @@ impl Client {
let entities = self.0.synced_entities();
serde_wasm_bindgen::to_value(&entities).map_err(|e| e.into())
}

#[wasm_bindgen(js_name = addEntitiesToSync)]
pub async fn add_entities_to_sync(
&mut self,
entities: Vec<JsEntityComponent>,
) -> Result<(), JsValue> {
log("adding entities to sync...");

#[cfg(feature = "console-error-panic")]
console_error_panic_hook::set_once();

let entities = entities
.into_iter()
.map(serde_wasm_bindgen::from_value::<dojo_types::schema::EntityModel>)
.collect::<Result<Vec<_>, _>>()?;

self.0
.add_entities_to_sync(entities)
.await
.map_err(|err| JsValue::from_str(&err.to_string()))
}

#[wasm_bindgen(js_name = removeEntitiesToSync)]
pub async fn remove_entities_to_sync(
&self,
entities: Vec<JsEntityComponent>,
) -> Result<(), JsValue> {
log("removing entities to sync...");

#[cfg(feature = "console-error-panic")]
console_error_panic_hook::set_once();

let entities = entities
.into_iter()
.map(serde_wasm_bindgen::from_value::<dojo_types::schema::EntityModel>)
.collect::<Result<Vec<_>, _>>()?;

self.0
.remove_entities_to_sync(entities)
.await
.map_err(|err| JsValue::from_str(&err.to_string()))
}
}

/// Spawns the client along with the subscription service.
Expand All @@ -80,7 +122,7 @@ pub async fn spawn_client(
JsValue::from_str(format!("failed to parse world address: {err}").as_str())
})?;

let mut client = torii_client::client::ClientBuilder::new()
let client = torii_client::client::ClientBuilder::new()
.set_entities_to_sync(entities)
.build(torii_url.into(), rpc_url.into(), world_address)
.await
Expand Down
13 changes: 12 additions & 1 deletion crates/torii/client/wasm/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async function setup() {
const client = await spawn_client(
"http://localhost:8080/grpc",
"http://localhost:5050",
"0x4cf3f4fa5ffd94a2af92946e13fe7faafb8045fb9446cec6ba97ca34e78bc05",
"0x3fa481f41522b90b3684ecfab7650c259a76387fab9c380b7a959e3d4ac69f",
[
{
model: "Position",
Expand All @@ -28,6 +28,17 @@ async function setup() {
]
);

setTimeout(() => {
client.addEntitiesToSync([
{
model: "Moves",
keys: [
"0x517ececd29116499f4a1b64b094da79ba08dfd54a3edaa316134c41f8160973",
],
},
]);
}, 10000);

// setup the message handler for the worker
self.onmessage = function (e) {
const event = e.data.type;
Expand Down
18 changes: 13 additions & 5 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,6 @@ impl Sql {
for child in e.options.iter() {
let mut path_clone = path.clone();
path_clone.push(child.1.name());
// self.build_entity_query(path_clone.clone(), id, &child.1);
self.build_set_entity_queries_recursive(
path_clone, event_id, entity_id, &child.1,
);
Expand All @@ -337,27 +336,36 @@ impl Sql {
if let Ty::Struct(s) = model {
for (member_idx, member) in s.children.iter().enumerate() {
let name = member.name.clone();
let mut options = None; // TEMP: doesnt support complex enums yet

if let Ok(cairo_type) = Primitive::from_str(&member.ty.name()) {
query.push_str(&format!("external_{name} {}, ", cairo_type.to_sql_type()));
} else if let Ty::Enum(e) = &member.ty {
let options = e
let all_options = e
.options
.iter()
.map(|c| format!("'{}'", c.0))
.collect::<Vec<_>>()
.join(", ");

query.push_str(&format!(
"external_{name} TEXT CHECK(external_{name} IN ({options})) NOT NULL, ",
"external_{name} TEXT CHECK(external_{name} IN ({all_options})) NOT NULL, ",
));

options = Some(format!(
r#""{}""#,
e.options.iter().map(|c| c.0.clone()).collect::<Vec<_>>().join(",")
));
}

self.query_queue.push(format!(
"INSERT OR IGNORE INTO model_members (id, model_id, model_idx, member_idx, \
name, type, type_enum, key) VALUES ('{table_id}', '{}', '{model_idx}', \
'{member_idx}', '{name}', '{}', '{}', {})",
name, type, type_enum, enum_options, key) VALUES ('{table_id}', '{}', \
'{model_idx}', '{member_idx}', '{name}', '{}', '{}', {}, {})",
path[0],
member.ty.name(),
member.ty.as_ref(),
options.unwrap_or("NULL".into()),
member.key,
));
}
Expand Down
Loading

0 comments on commit 4dcefb3

Please sign in to comment.