Skip to content

Commit

Permalink
feat: update torii client and fix tets
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Jun 21, 2024
1 parent 1f66935 commit a297360
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 45 deletions.
43 changes: 14 additions & 29 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio::sync::RwLock as AsyncRwLock;
use torii_grpc::client::{EntityUpdateStreaming, ModelDiffsStreaming};
use torii_grpc::proto::world::RetrieveEntitiesResponse;
use torii_grpc::types::schema::Entity;
use torii_grpc::types::{KeysClause, Query};
use torii_grpc::types::{EntityKeysClause, ModelKeysClause, Query};
use torii_relay::client::EventLoop;
use torii_relay::types::Message;

Expand Down Expand Up @@ -56,7 +56,6 @@ impl Client {
rpc_url: String,
relay_url: String,
world: FieldElement,
models_keys: Option<Vec<KeysClause>>,
) -> Result<Self, Error> {
let mut grpc_client = torii_grpc::client::WorldClient::new(torii_url, world).await?;

Expand All @@ -73,23 +72,6 @@ impl Client {
let provider = JsonRpcClient::new(HttpTransport::new(rpc_url));
let world_reader = WorldContractReader::new(world, provider);

if let Some(keys) = models_keys {
subbed_models.add_models(keys)?;

// TODO: change this to querying the gRPC url instead
let subbed_models = subbed_models.models_keys.read().clone();
for keys in subbed_models {
let model_reader = world_reader.model_reader(&keys.model).await?;
let values = model_reader.entity_storage(&keys.keys).await?;

client_storage.set_model_storage(
cairo_short_string_to_felt(&keys.model).unwrap(),
keys.keys,
values,
)?;
}
}

Ok(Self {
world_reader,
storage: client_storage,
Expand Down Expand Up @@ -123,7 +105,7 @@ impl Client {
self.metadata.read()
}

pub fn subscribed_models(&self) -> RwLockReadGuard<'_, HashSet<KeysClause>> {
pub fn subscribed_models(&self) -> RwLockReadGuard<'_, HashSet<ModelKeysClause>> {

Check warning on line 108 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L108

Added line #L108 was not covered by tests
self.subscribed_models.models_keys.read()
}

Expand Down Expand Up @@ -151,20 +133,20 @@ impl Client {
/// A direct stream to grpc subscribe entities
pub async fn on_entity_updated(
&self,
ids: Vec<FieldElement>,
clause: Option<EntityKeysClause>,

Check warning on line 136 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L136

Added line #L136 was not covered by tests
) -> Result<EntityUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_entities(ids).await?;
let stream = grpc_client.subscribe_entities(clause).await?;

Check warning on line 139 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L139

Added line #L139 was not covered by tests
Ok(stream)
}

/// A direct stream to grpc subscribe event messages
pub async fn on_event_message_updated(
&self,
ids: Vec<FieldElement>,
clause: Option<EntityKeysClause>,

Check warning on line 146 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L146

Added line #L146 was not covered by tests
) -> Result<EntityUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_event_messages(ids).await?;
let stream = grpc_client.subscribe_event_messages(clause).await?;

Check warning on line 149 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L149

Added line #L149 was not covered by tests
Ok(stream)
}

Expand All @@ -175,7 +157,7 @@ impl Client {
///
/// If the requested model is not among the synced models, it will attempt to fetch it from
/// the RPC.
pub async fn model(&self, keys: &KeysClause) -> Result<Option<Ty>, Error> {
pub async fn model(&self, keys: &ModelKeysClause) -> Result<Option<Ty>, Error> {

Check warning on line 160 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L160

Added line #L160 was not covered by tests
let Some(mut schema) = self.metadata.read().model(&keys.model).map(|m| m.schema.clone())
else {
return Ok(None);
Expand Down Expand Up @@ -211,7 +193,7 @@ impl Client {
/// Initiate the model subscriptions and returns a [SubscriptionService] which when await'ed
/// will execute the subscription service and starts the syncing process.
pub async fn start_subscription(&self) -> Result<SubscriptionService, Error> {
let models_keys: Vec<KeysClause> =
let models_keys: Vec<ModelKeysClause> =

Check warning on line 196 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L196

Added line #L196 was not covered by tests
self.subscribed_models.models_keys.read().clone().into_iter().collect();
let sub_res_stream = self.initiate_subscription(models_keys).await?;

Expand All @@ -229,7 +211,7 @@ impl Client {
/// Adds entities to the list of entities to be synced.
///
/// NOTE: This will establish a new subscription stream with the server.
pub async fn add_models_to_sync(&self, models_keys: Vec<KeysClause>) -> Result<(), Error> {
pub async fn add_models_to_sync(&self, models_keys: Vec<ModelKeysClause>) -> Result<(), Error> {

Check warning on line 214 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L214

Added line #L214 was not covered by tests
for keys in &models_keys {
self.initiate_model(&keys.model, keys.keys.clone()).await?;
}
Expand All @@ -250,7 +232,10 @@ impl Client {
/// Removes models from the list of models to be synced.
///
/// NOTE: This will establish a new subscription stream with the server.
pub async fn remove_models_to_sync(&self, models_keys: Vec<KeysClause>) -> Result<(), Error> {
pub async fn remove_models_to_sync(
&self,
models_keys: Vec<ModelKeysClause>,
) -> Result<(), Error> {

Check warning on line 238 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L235-L238

Added lines #L235 - L238 were not covered by tests
self.subscribed_models.remove_models(models_keys)?;

let updated_entities =
Expand All @@ -270,7 +255,7 @@ impl Client {

async fn initiate_subscription(
&self,
keys: Vec<KeysClause>,
keys: Vec<ModelKeysClause>,

Check warning on line 258 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L258

Added line #L258 was not covered by tests
) -> Result<ModelDiffsStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_model_diffs(keys).await?;
Expand Down
18 changes: 9 additions & 9 deletions crates/torii/client/src/client/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use starknet::core::types::{StateDiff, StateUpdate};
use starknet::core::utils::cairo_short_string_to_felt;
use starknet_crypto::FieldElement;
use torii_grpc::client::ModelDiffsStreaming;
use torii_grpc::types::KeysClause;
use torii_grpc::types::ModelKeysClause;

use crate::client::error::{Error, ParseError};
use crate::client::storage::ModelStorage;
Expand All @@ -24,13 +24,13 @@ pub enum SubscriptionEvent {

pub struct SubscribedModels {
metadata: Arc<RwLock<WorldMetadata>>,
pub(crate) models_keys: RwLock<HashSet<KeysClause>>,
pub(crate) models_keys: RwLock<HashSet<ModelKeysClause>>,
/// All the relevant storage addresses derived from the subscribed models
pub(crate) subscribed_storage_addresses: RwLock<HashSet<FieldElement>>,
}

impl SubscribedModels {
pub(crate) fn is_synced(&self, keys: &KeysClause) -> bool {
pub(crate) fn is_synced(&self, keys: &ModelKeysClause) -> bool {

Check warning on line 33 in crates/torii/client/src/client/subscription.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/subscription.rs#L33

Added line #L33 was not covered by tests
self.models_keys.read().contains(keys)
}

Expand All @@ -42,21 +42,21 @@ impl SubscribedModels {
}
}

pub(crate) fn add_models(&self, models_keys: Vec<KeysClause>) -> Result<(), Error> {
pub(crate) fn add_models(&self, models_keys: Vec<ModelKeysClause>) -> Result<(), Error> {
for keys in models_keys {
Self::add_model(self, keys)?;
}
Ok(())
}

pub(crate) fn remove_models(&self, entities_keys: Vec<KeysClause>) -> Result<(), Error> {
pub(crate) fn remove_models(&self, entities_keys: Vec<ModelKeysClause>) -> Result<(), Error> {
for keys in entities_keys {
Self::remove_model(self, keys)?;
}
Ok(())
}

pub(crate) fn add_model(&self, keys: KeysClause) -> Result<(), Error> {
pub(crate) fn add_model(&self, keys: ModelKeysClause) -> Result<(), Error> {
if !self.models_keys.write().insert(keys.clone()) {
return Ok(());
}
Expand All @@ -83,7 +83,7 @@ impl SubscribedModels {
Ok(())
}

pub(crate) fn remove_model(&self, keys: KeysClause) -> Result<(), Error> {
pub(crate) fn remove_model(&self, keys: ModelKeysClause) -> Result<(), Error> {
if !self.models_keys.write().remove(&keys) {
return Ok(());
}
Expand Down Expand Up @@ -247,7 +247,7 @@ mod tests {
use parking_lot::RwLock;
use starknet::core::utils::cairo_short_string_to_felt;
use starknet::macros::felt;
use torii_grpc::types::KeysClause;
use torii_grpc::types::ModelKeysClause;

use crate::utils::compute_all_storage_addresses;

Expand Down Expand Up @@ -283,7 +283,7 @@ mod tests {

let metadata = self::create_dummy_metadata();

let keys = KeysClause { model: model_name, keys };
let keys = ModelKeysClause { model: model_name, keys };

let subscribed_models = super::SubscribedModels::new(Arc::new(RwLock::new(metadata)));
subscribed_models.add_models(vec![keys.clone()]).expect("able to add model");
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/grpc/src/server/subscriptions/entity.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
Expand Down
4 changes: 2 additions & 2 deletions crates/torii/grpc/src/server/subscriptions/event_message.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
Expand All @@ -17,7 +17,7 @@ use torii_core::error::{Error, ParseError};
use torii_core::model::{build_sql_query, map_row_to_ty};
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::FELT_DELIMITER;
use torii_core::types::{Entity, EventMessage};
use torii_core::types::EventMessage;
use tracing::{error, trace};

use crate::proto;
Expand Down
11 changes: 7 additions & 4 deletions crates/torii/grpc/src/server/tests/entities_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ use torii_core::processors::register_model::RegisterModelProcessor;
use torii_core::processors::store_set_record::StoreSetRecordProcessor;
use torii_core::sql::Sql;

use crate::proto::types::KeysClause;
use crate::server::DojoWorld;
use crate::types::schema::Entity;
use crate::types::KeysClause;

#[tokio::test(flavor = "multi_thread")]
async fn test_entities_queries() {
Expand Down Expand Up @@ -116,9 +116,12 @@ async fn test_entities_queries() {
"entities",
"entity_model",
"entity_id",
KeysClause { model: "Moves".to_string(), keys: vec![account.address()] }.into(),
1,
0,
KeysClause {
keys: vec![account.address()].iter().map(|k| k.to_bytes_be().to_vec()).collect(),
}
.into(),
Some(1),
None,
)
.await
.unwrap()
Expand Down

0 comments on commit a297360

Please sign in to comment.