Skip to content

Commit

Permalink
Move model to individual query clause
Browse files Browse the repository at this point in the history
  • Loading branch information
broody committed Nov 15, 2023
1 parent 36e5b18 commit d116ace
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 74 deletions.
47 changes: 24 additions & 23 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,18 @@ impl Client {

// TODO: change this to querying the gRPC url instead
let subbed_entities = subbed_entities.entities.read().clone();
for Query { model, clause } in subbed_entities {
let model_reader = world_reader.model(&model).await?;
let keys = if let Clause::Keys(clause) = clause {
clause.keys
for Query { clause } in subbed_entities {
let clause = if let Clause::Keys(clause) = clause {
clause
} else {
return Err(Error::UnsupportedQuery);
};
let values = model_reader.entity_storage(&keys).await?;
let model_reader = world_reader.model(&clause.model).await?;
let values = model_reader.entity_storage(&clause.keys).await?;

client_storage.set_entity_storage(
cairo_short_string_to_felt(&model).unwrap(),
keys,
cairo_short_string_to_felt(&clause.model).unwrap(),
clause.keys,
values,
)?;
}
Expand Down Expand Up @@ -111,38 +111,39 @@ impl Client {
/// If the requested entity is not among the synced entities, it will attempt to fetch it from
/// the RPC.
pub async fn entity(&self, query: &Query) -> Result<Option<Ty>, Error> {
let Some(mut schema) = self.metadata.read().model(&query.model).map(|m| m.schema.clone())
else {
return Ok(None);
};

let keys = if let Clause::Keys(clause) = query.clone().clause {
clause.keys
let clause = if let Clause::Keys(clause) = query.clone().clause {
clause
} else {
return Err(Error::UnsupportedQuery);
};

let Some(mut schema) = self.metadata.read().model(&clause.model).map(|m| m.schema.clone())
else {
return Ok(None);
};

if !self.subscribed_entities.is_synced(query) {
let model = self.world_reader.model(&query.model).await?;
return Ok(Some(model.entity(&keys).await?));
let model = self.world_reader.model(&clause.model).await?;
return Ok(Some(model.entity(&clause.keys).await?));
}

let Ok(Some(raw_values)) = self.storage.get_entity_storage(
cairo_short_string_to_felt(&query.model).map_err(ParseError::CairoShortStringToFelt)?,
&keys,
cairo_short_string_to_felt(&clause.model)
.map_err(ParseError::CairoShortStringToFelt)?,
&clause.keys,
) else {
return Ok(Some(schema));
};

let layout = self
.metadata
.read()
.model(&query.model)
.model(&clause.model)
.map(|m| m.layout.clone())
.expect("qed; layout should exist");

let unpacked = unpack(raw_values, layout).unwrap();
let mut keys_and_unpacked = [keys.to_vec(), unpacked].concat();
let mut keys_and_unpacked = [clause.keys.to_vec(), unpacked].concat();

schema.deserialize(&mut keys_and_unpacked).unwrap();

Expand Down Expand Up @@ -171,13 +172,13 @@ impl Client {
/// NOTE: This will establish a new subscription stream with the server.
pub async fn add_entities_to_sync(&self, queries: Vec<Query>) -> Result<(), Error> {
for query in &queries {
let keys = if let Clause::Keys(clause) = query.clone().clause {
clause.keys
let clause = if let Clause::Keys(clause) = query.clone().clause {
clause
} else {
return Err(Error::UnsupportedQuery);
};

self.initiate_entity(&query.model, keys.clone()).await?;
self.initiate_entity(&clause.model, clause.keys.clone()).await?;
}

self.subscribed_entities.add_entities(queries)?;
Expand Down
30 changes: 7 additions & 23 deletions crates/torii/client/src/client/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ mod tests {
use parking_lot::RwLock;
use starknet::core::utils::cairo_short_string_to_felt;
use starknet::macros::felt;
use torii_grpc::types::{Clause, KeysClause, Query};

use crate::client::error::Error;
use crate::utils::compute_all_storage_addresses;
Expand Down Expand Up @@ -203,13 +202,8 @@ mod tests {
fn err_if_set_values_too_many() {
let storage = create_dummy_storage();
let keys = vec![felt!("0x12345")];
let query = Query {
model: "Position".into(),
clause: Clause::Keys(KeysClause { keys: keys.clone() }),
};

let values = vec![felt!("1"), felt!("2"), felt!("3"), felt!("4"), felt!("5")];
let model = cairo_short_string_to_felt(&query.model).unwrap();
let model = cairo_short_string_to_felt("Position").unwrap();
let result = storage.set_entity_storage(model, keys, values);

assert!(storage.storage.read().is_empty());
Expand All @@ -223,13 +217,8 @@ mod tests {
fn err_if_set_values_too_few() {
let storage = create_dummy_storage();
let keys = vec![felt!("0x12345")];
let query = Query {
model: "Position".into(),
clause: Clause::Keys(KeysClause { keys: keys.clone() }),
};

let values = vec![felt!("1"), felt!("2")];
let model = cairo_short_string_to_felt(&query.model).unwrap();
let model = cairo_short_string_to_felt("Position").unwrap();
let result = storage.set_entity_storage(model, keys, values);

assert!(storage.storage.read().is_empty());
Expand All @@ -243,23 +232,18 @@ mod tests {
fn set_and_get_entity_value() {
let storage = create_dummy_storage();
let keys = vec![felt!("0x12345")];
let query = Query {
model: "Position".into(),
clause: Clause::Keys(KeysClause { keys: keys.clone() }),
};

assert!(storage.storage.read().is_empty(), "storage must be empty initially");

let model = storage.metadata.read().model(&query.model).cloned().unwrap();

let metadata = storage.metadata.read().model("Position").cloned().unwrap();
let expected_storage_addresses = compute_all_storage_addresses(
cairo_short_string_to_felt(&model.name).unwrap(),
cairo_short_string_to_felt(&metadata.name).unwrap(),
&keys,
model.packed_size,
metadata.packed_size,
);

let expected_values = vec![felt!("1"), felt!("2"), felt!("3"), felt!("4")];
let model_name_in_felt = cairo_short_string_to_felt(&query.model).unwrap();
let model_name_in_felt = cairo_short_string_to_felt("Position").unwrap();

storage
.set_entity_storage(model_name_in_felt, keys.clone(), expected_values.clone())
Expand All @@ -278,7 +262,7 @@ mod tests {
"entity keys must be indexed"
);
assert!(actual_values == expected_values);
assert!(storage.storage.read().len() == model.packed_size as usize);
assert!(storage.storage.read().len() == metadata.packed_size as usize);
assert!(
actual_storage_addresses
.into_iter()
Expand Down
27 changes: 14 additions & 13 deletions crates/torii/client/src/client/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ impl SubscribedEntities {
return Ok(());
}

let keys = if let Clause::Keys(clause) = entity.clause {
clause.keys
let clause = if let Clause::Keys(clause) = entity.clause {
clause
} else {
return Err(Error::UnsupportedQuery);
};
Expand All @@ -71,14 +71,14 @@ impl SubscribedEntities {
.metadata
.read()
.models
.get(&entity.model)
.get(&clause.model)
.map(|c| c.packed_size)
.ok_or(Error::UnknownModel(entity.model.clone()))?;
.ok_or(Error::UnknownModel(clause.model.clone()))?;

let storage_addresses = compute_all_storage_addresses(
cairo_short_string_to_felt(&entity.model)
cairo_short_string_to_felt(&clause.model)
.map_err(ParseError::CairoShortStringToFelt)?,
&keys,
&clause.keys,
model_packed_size,
);

Expand All @@ -95,8 +95,8 @@ impl SubscribedEntities {
return Ok(());
}

let keys = if let Clause::Keys(clause) = query.clause {
clause.keys
let clause = if let Clause::Keys(clause) = query.clause {
clause
} else {
return Err(Error::UnsupportedQuery);
};
Expand All @@ -105,13 +105,14 @@ impl SubscribedEntities {
.metadata
.read()
.models
.get(&query.model)
.get(&clause.model)
.map(|c| c.packed_size)
.ok_or(Error::UnknownModel(query.model.clone()))?;
.ok_or(Error::UnknownModel(clause.model.clone()))?;

let storage_addresses = compute_all_storage_addresses(
cairo_short_string_to_felt(&query.model).map_err(ParseError::CairoShortStringToFelt)?,
&keys,
cairo_short_string_to_felt(&clause.model)
.map_err(ParseError::CairoShortStringToFelt)?,
&clause.keys,
model_packed_size,
);

Expand Down Expand Up @@ -295,7 +296,7 @@ mod tests {

let metadata = self::create_dummy_metadata();

let query = Query { model: model_name, clause: Clause::Keys(KeysClause { keys }) };
let query = Query { clause: Clause::Keys(KeysClause { model: model_name, keys }) };

let subscribed_entities = super::SubscribedEntities::new(Arc::new(RwLock::new(metadata)));
subscribed_entities.add_entities(vec![query.clone()]).expect("able to add entity");
Expand Down
18 changes: 10 additions & 8 deletions crates/torii/grpc/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ message EntityUpdate {
}

message EntityQuery {
string model = 1;
Clause clause = 2;
Clause clause = 1;
}

message Clause {
Expand All @@ -67,18 +66,21 @@ message Clause {
}

message KeysClause {
repeated bytes keys = 1;
string model = 1;
repeated bytes keys = 2;
}

message AttributeClause {
string attribute = 1;
ComparisonOperator operator = 2;
Value value = 3;
string model = 1;
string attribute = 2;
ComparisonOperator operator = 3;
Value value = 4;
}

message CompositeClause {
LogicalOperator operator = 1;
repeated Clause clauses = 2;
string model = 1;
LogicalOperator operator = 2;
repeated Clause clauses = 3;
}

enum LogicalOperator {
Expand Down
18 changes: 15 additions & 3 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ use tokio::sync::mpsc::Receiver;
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use tonic::transport::Server;
use tonic::{Request, Response, Status};
use torii_core::error::{Error, ParseError};
use torii_core::error::{Error, ParseError, QueryError};
use torii_core::model::{parse_sql_model_members, SqlModelMember};

use self::subscription::SubscribeRequest;
use crate::proto::types::clause::ClauseType;
use crate::proto::types::KeysClause;
use crate::proto::world::world_server::WorldServer;
use crate::proto::{self};

Expand Down Expand Up @@ -144,11 +146,21 @@ impl DojoWorld {
{
let mut subs = Vec::with_capacity(queries.len());
for query in queries {
let model = cairo_short_string_to_felt(&query.model)
let clause: KeysClause = query
.clone()
.clause
.ok_or(QueryError::UnsupportedQuery)
.and_then(|clause| clause.clause_type.ok_or(QueryError::UnsupportedQuery))
.and_then(|clause_type| match clause_type {
ClauseType::Keys(clause) => Ok(clause),
_ => Err(QueryError::UnsupportedQuery),
})?;

let model = cairo_short_string_to_felt(&clause.model)
.map_err(ParseError::CairoShortStringToFelt)?;

let proto::types::ModelMetadata { packed_size, .. } =
self.model_metadata(&query.model).await?;
self.model_metadata(&clause.model).await?;

subs.push(SubscribeRequest {
query,
Expand Down
15 changes: 11 additions & 4 deletions crates/torii/grpc/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::proto;

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
pub struct Query {
pub model: String,
pub clause: Clause,
}

Expand All @@ -25,18 +24,21 @@ pub enum Clause {

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
pub struct KeysClause {
pub model: String,
pub keys: Vec<FieldElement>,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
pub struct AttributeClause {
pub model: String,
pub attribute: String,
pub operator: ComparisonOperator,
pub value: Value,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
pub struct CompositeClause {
pub model: String,
pub operator: LogicalOperator,
pub clauses: Vec<Clause>,
}
Expand Down Expand Up @@ -103,7 +105,7 @@ impl TryFrom<proto::types::WorldMetadata> for dojo_types::WorldMetadata {

impl From<Query> for proto::types::EntityQuery {
fn from(value: Query) -> Self {
Self { model: value.model, clause: Some(value.clause.into()) }
Self { clause: Some(value.clause.into()) }
}
}

Expand All @@ -125,7 +127,10 @@ impl From<Clause> for proto::types::Clause {

impl From<KeysClause> for proto::types::KeysClause {
fn from(value: KeysClause) -> Self {
Self { keys: value.keys.iter().map(|k| k.to_bytes_be().into()).collect() }
Self {
model: value.model,
keys: value.keys.iter().map(|k| k.to_bytes_be().into()).collect(),
}
}
}

Expand All @@ -139,13 +144,14 @@ impl TryFrom<proto::types::KeysClause> for KeysClause {
.map(|k| FieldElement::from_byte_slice_be(&k))
.collect::<Result<Vec<_>, _>>()?;

Ok(Self { keys })
Ok(Self { model: value.model, keys })
}
}

impl From<AttributeClause> for proto::types::AttributeClause {
fn from(value: AttributeClause) -> Self {
Self {
model: value.model,
attribute: value.attribute,
operator: value.operator as i32,
value: Some(value.value.into()),
Expand All @@ -156,6 +162,7 @@ impl From<AttributeClause> for proto::types::AttributeClause {
impl From<CompositeClause> for proto::types::CompositeClause {
fn from(value: CompositeClause) -> Self {
Self {
model: value.model,
operator: value.operator as i32,
clauses: value.clauses.into_iter().map(|clause| clause.into()).collect(),
}
Expand Down

0 comments on commit d116ace

Please sign in to comment.