Skip to content

Commit

Permalink
feat(torii-grpc): support multiple entity models in queries & complex…
Browse files Browse the repository at this point in the history
… keys clause for subscriptions + queries (#2095)

* feat: start working on keys entity subscription

* refactor: implementing keys subscriptions and using existing queries

* refactor: finish refactor on clauses

* fix: if felt bytes 0 then wildcard

* fmt

* feat: update torii client and fix tets

* fix: clippy

* refactor: do not consider overflowing key patterns

* feat: fully use new keysclause & add pattern matching

* feat: support pattern matching in queries

* fmt

* refactor: new types in torii client

* clippy

* fix: ignroe non matching entity keys in fixedlen

* add a model to test keys clause pattern matching

* fix: fix tests

* fix: lock version

* chore: remove debug print

* fix: match fixed len for queries

* chore: use regex for key pattern matching

* feat: add optional model clause to keysclause

* feat: add model specf to query by keys

* fix: queries

* feat: wrap up queries

* fmt & clippy

* chore: enable regexp for tests

* chore: update test for new model

* refactor: use array of models for keysclause

---------

Co-authored-by: glihm <[email protected]>
  • Loading branch information
Larkooo and glihm authored Jun 27, 2024
1 parent 48232e9 commit ca42629
Show file tree
Hide file tree
Showing 28 changed files with 1,865 additions and 398 deletions.
2 changes: 1 addition & 1 deletion crates/dojo-bindgen/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ mod tests {
gather_dojo_data(&manifest_path, "dojo_example", "dev", dojo_metadata.skip_migration)
.unwrap();

assert_eq!(data.models.len(), 7);
assert_eq!(data.models.len(), 8);

assert_eq!(data.world.name, "dojo_example");

Expand Down
4 changes: 2 additions & 2 deletions crates/dojo-world/src/manifest/manifest_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,10 +419,10 @@ fn fetch_remote_manifest() {
DeploymentManifest::load_from_remote(provider, world_address).await.unwrap()
});

assert_eq!(local_manifest.models.len(), 7);
assert_eq!(local_manifest.models.len(), 8);
assert_eq!(local_manifest.contracts.len(), 3);

assert_eq!(remote_manifest.models.len(), 7);
assert_eq!(remote_manifest.models.len(), 8);
assert_eq!(remote_manifest.contracts.len(), 3);

// compute diff from local and remote manifest
Expand Down
45 changes: 15 additions & 30 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio::sync::RwLock as AsyncRwLock;
use torii_grpc::client::{EntityUpdateStreaming, EventUpdateStreaming, ModelDiffsStreaming};
use torii_grpc::proto::world::{RetrieveEntitiesResponse, RetrieveEventsResponse};
use torii_grpc::types::schema::{Entity, SchemaError};
use torii_grpc::types::{Event, EventQuery, KeysClause, Query};
use torii_grpc::types::{EntityKeysClause, Event, EventQuery, KeysClause, ModelKeysClause, Query};
use torii_relay::client::EventLoop;
use torii_relay::types::Message;

Expand Down Expand Up @@ -56,7 +56,6 @@ impl Client {
rpc_url: String,
relay_url: String,
world: FieldElement,
models_keys: Option<Vec<KeysClause>>,
) -> Result<Self, Error> {
let mut grpc_client = torii_grpc::client::WorldClient::new(torii_url, world).await?;

Expand All @@ -73,23 +72,6 @@ impl Client {
let provider = JsonRpcClient::new(HttpTransport::new(rpc_url));
let world_reader = WorldContractReader::new(world, provider);

if let Some(keys) = models_keys {
subbed_models.add_models(keys)?;

// TODO: change this to querying the gRPC url instead
let subbed_models = subbed_models.models_keys.read().clone();
for keys in subbed_models {
let model_reader = world_reader.model_reader(&keys.model).await?;
let values = model_reader.entity_storage(&keys.keys).await?;

client_storage.set_model_storage(
cairo_short_string_to_felt(&keys.model).unwrap(),
keys.keys,
values,
)?;
}
}

Ok(Self {
world_reader,
storage: client_storage,
Expand Down Expand Up @@ -123,7 +105,7 @@ impl Client {
self.metadata.read()
}

pub fn subscribed_models(&self) -> RwLockReadGuard<'_, HashSet<KeysClause>> {
pub fn subscribed_models(&self) -> RwLockReadGuard<'_, HashSet<ModelKeysClause>> {
self.subscribed_models.models_keys.read()
}

Expand Down Expand Up @@ -163,26 +145,26 @@ impl Client {
/// A direct stream to grpc subscribe entities
pub async fn on_entity_updated(
&self,
ids: Vec<FieldElement>,
clause: Option<EntityKeysClause>,
) -> Result<EntityUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_entities(ids).await?;
let stream = grpc_client.subscribe_entities(clause).await?;
Ok(stream)
}

/// A direct stream to grpc subscribe event messages
pub async fn on_event_message_updated(
&self,
ids: Vec<FieldElement>,
clause: Option<EntityKeysClause>,
) -> Result<EntityUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_event_messages(ids).await?;
let stream = grpc_client.subscribe_event_messages(clause).await?;
Ok(stream)
}

pub async fn on_starknet_event(
&self,
keys: Option<Vec<FieldElement>>,
keys: Option<KeysClause>,
) -> Result<EventUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_events(keys).await?;
Expand All @@ -196,7 +178,7 @@ impl Client {
///
/// If the requested model is not among the synced models, it will attempt to fetch it from
/// the RPC.
pub async fn model(&self, keys: &KeysClause) -> Result<Option<Ty>, Error> {
pub async fn model(&self, keys: &ModelKeysClause) -> Result<Option<Ty>, Error> {
let Some(mut schema) = self.metadata.read().model(&keys.model).map(|m| m.schema.clone())
else {
return Ok(None);
Expand Down Expand Up @@ -232,7 +214,7 @@ impl Client {
/// Initiate the model subscriptions and returns a [SubscriptionService] which when await'ed
/// will execute the subscription service and starts the syncing process.
pub async fn start_subscription(&self) -> Result<SubscriptionService, Error> {
let models_keys: Vec<KeysClause> =
let models_keys: Vec<ModelKeysClause> =
self.subscribed_models.models_keys.read().clone().into_iter().collect();
let sub_res_stream = self.initiate_subscription(models_keys).await?;

Expand All @@ -250,7 +232,7 @@ impl Client {
/// Adds entities to the list of entities to be synced.
///
/// NOTE: This will establish a new subscription stream with the server.
pub async fn add_models_to_sync(&self, models_keys: Vec<KeysClause>) -> Result<(), Error> {
pub async fn add_models_to_sync(&self, models_keys: Vec<ModelKeysClause>) -> Result<(), Error> {
for keys in &models_keys {
self.initiate_model(&keys.model, keys.keys.clone()).await?;
}
Expand All @@ -271,7 +253,10 @@ impl Client {
/// Removes models from the list of models to be synced.
///
/// NOTE: This will establish a new subscription stream with the server.
pub async fn remove_models_to_sync(&self, models_keys: Vec<KeysClause>) -> Result<(), Error> {
pub async fn remove_models_to_sync(
&self,
models_keys: Vec<ModelKeysClause>,
) -> Result<(), Error> {
self.subscribed_models.remove_models(models_keys)?;

let updated_entities =
Expand All @@ -291,7 +276,7 @@ impl Client {

async fn initiate_subscription(
&self,
keys: Vec<KeysClause>,
keys: Vec<ModelKeysClause>,
) -> Result<ModelDiffsStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_model_diffs(keys).await?;
Expand Down
18 changes: 9 additions & 9 deletions crates/torii/client/src/client/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use starknet::core::types::{StateDiff, StateUpdate};
use starknet::core::utils::cairo_short_string_to_felt;
use starknet_crypto::FieldElement;
use torii_grpc::client::ModelDiffsStreaming;
use torii_grpc::types::KeysClause;
use torii_grpc::types::ModelKeysClause;

use crate::client::error::{Error, ParseError};
use crate::client::storage::ModelStorage;
Expand All @@ -24,13 +24,13 @@ pub enum SubscriptionEvent {

pub struct SubscribedModels {
metadata: Arc<RwLock<WorldMetadata>>,
pub(crate) models_keys: RwLock<HashSet<KeysClause>>,
pub(crate) models_keys: RwLock<HashSet<ModelKeysClause>>,
/// All the relevant storage addresses derived from the subscribed models
pub(crate) subscribed_storage_addresses: RwLock<HashSet<FieldElement>>,
}

impl SubscribedModels {
pub(crate) fn is_synced(&self, keys: &KeysClause) -> bool {
pub(crate) fn is_synced(&self, keys: &ModelKeysClause) -> bool {
self.models_keys.read().contains(keys)
}

Expand All @@ -42,21 +42,21 @@ impl SubscribedModels {
}
}

pub(crate) fn add_models(&self, models_keys: Vec<KeysClause>) -> Result<(), Error> {
pub(crate) fn add_models(&self, models_keys: Vec<ModelKeysClause>) -> Result<(), Error> {
for keys in models_keys {
Self::add_model(self, keys)?;
}
Ok(())
}

pub(crate) fn remove_models(&self, entities_keys: Vec<KeysClause>) -> Result<(), Error> {
pub(crate) fn remove_models(&self, entities_keys: Vec<ModelKeysClause>) -> Result<(), Error> {
for keys in entities_keys {
Self::remove_model(self, keys)?;
}
Ok(())
}

pub(crate) fn add_model(&self, keys: KeysClause) -> Result<(), Error> {
pub(crate) fn add_model(&self, keys: ModelKeysClause) -> Result<(), Error> {
if !self.models_keys.write().insert(keys.clone()) {
return Ok(());
}
Expand All @@ -83,7 +83,7 @@ impl SubscribedModels {
Ok(())
}

pub(crate) fn remove_model(&self, keys: KeysClause) -> Result<(), Error> {
pub(crate) fn remove_model(&self, keys: ModelKeysClause) -> Result<(), Error> {
if !self.models_keys.write().remove(&keys) {
return Ok(());
}
Expand Down Expand Up @@ -247,7 +247,7 @@ mod tests {
use parking_lot::RwLock;
use starknet::core::utils::cairo_short_string_to_felt;
use starknet::macros::felt;
use torii_grpc::types::KeysClause;
use torii_grpc::types::ModelKeysClause;

use crate::utils::compute_all_storage_addresses;

Expand Down Expand Up @@ -283,7 +283,7 @@ mod tests {

let metadata = self::create_dummy_metadata();

let keys = KeysClause { model: model_name, keys };
let keys = ModelKeysClause { model: model_name, keys };

let subscribed_models = super::SubscribedModels::new(Arc::new(RwLock::new(metadata)));
subscribed_models.add_models(vec![keys.clone()]).expect("able to add model");
Expand Down
4 changes: 3 additions & 1 deletion crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,16 @@ impl Sql {
VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, event_id=EXCLUDED.event_id RETURNING \
*";
let event_message_updated: EventMessageUpdated = sqlx::query_as(insert_entities)
let mut event_message_updated: EventMessageUpdated = sqlx::query_as(insert_entities)
.bind(&entity_id)
.bind(&keys_str)
.bind(event_id)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.fetch_one(&self.pool)
.await?;

event_message_updated.updated_model = Some(entity.clone());

let path = vec![entity.name()];
self.build_set_entity_queries_recursive(
path,
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/core/src/sql_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async fn test_load_from_remote() {

let _block_timestamp = 1710754478_u64;
let models = sqlx::query("SELECT * FROM models").fetch_all(&pool).await.unwrap();
assert_eq!(models.len(), 7);
assert_eq!(models.len(), 8);

let (id, name, packed_size, unpacked_size): (String, String, u8, u8) = sqlx::query_as(
"SELECT id, name, packed_size, unpacked_size FROM models WHERE name = 'Position'",
Expand Down
4 changes: 4 additions & 0 deletions crates/torii/core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub struct EventMessage {
pub executed_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,

// this should never be None. as a EventMessage cannot be deleted
#[sqlx(skip)]
pub updated_model: Option<Ty>,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
Expand Down
28 changes: 21 additions & 7 deletions crates/torii/grpc/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ message Query {
}

message EventQuery {
EventKeysClause keys =1;
KeysClause keys = 1;
uint32 limit = 2;
uint32 offset = 3;
}
Expand All @@ -99,17 +99,26 @@ message Clause {
}
}

message HashedKeysClause {
repeated bytes hashed_keys = 1;
message ModelKeysClause {
string model = 1;
repeated bytes keys = 2;
}

message EventKeysClause {
repeated bytes keys = 1;
message EntityKeysClause {
oneof clause_type {
HashedKeysClause hashed_keys = 1;
KeysClause keys = 2;
}
}

message KeysClause {
string model = 1;
repeated bytes keys = 2;
repeated bytes keys = 1;
PatternMatching pattern_matching = 2;
repeated string models = 3;
}

message HashedKeysClause {
repeated bytes hashed_keys = 1;
}

message MemberClause {
Expand All @@ -125,6 +134,11 @@ message CompositeClause {
repeated Clause clauses = 3;
}

enum PatternMatching {
FixedLen = 0;
VariableLen = 1;
}

enum LogicalOperator {
AND = 0;
OR = 1;
Expand Down
8 changes: 4 additions & 4 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ message MetadataResponse {

message SubscribeModelsRequest {
// The list of model keys to subscribe to.
repeated types.KeysClause models_keys = 1;
repeated types.ModelKeysClause models_keys = 1;
}

message SubscribeModelsResponse {
Expand All @@ -52,11 +52,11 @@ message SubscribeModelsResponse {
}

message SubscribeEntitiesRequest {
repeated bytes hashed_keys = 1;
types.EntityKeysClause clause = 1;
}

message SubscribeEventMessagesRequest {
repeated bytes hashed_keys = 1;
types.EntityKeysClause clause = 1;
}

message SubscribeEntityResponse {
Expand All @@ -83,7 +83,7 @@ message RetrieveEventsResponse {
}

message SubscribeEventsRequest {
types.EventKeysClause keys = 1;
types.KeysClause keys = 1;
}

message SubscribeEventsResponse {
Expand Down
Loading

0 comments on commit ca42629

Please sign in to comment.