Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii-grpc): support multiple entity models in queries & complex keys clause for subscriptions + queries #2095

Merged
merged 28 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
3b00918
feat: start working on keys entity subscription
Larkooo Jun 21, 2024
f8269f8
refactor: implementing keys subscriptions and using existing queries
Larkooo Jun 21, 2024
f7090bc
refactor: finish refactor on clauses
Larkooo Jun 21, 2024
e3b589e
fix: if felt bytes 0 then wildcard
Larkooo Jun 21, 2024
4990040
fmt
Larkooo Jun 21, 2024
7d6f32a
feat: update torii client and fix tets
Larkooo Jun 21, 2024
79af3b5
fix: clippy
Larkooo Jun 21, 2024
d25fb2e
refactor: do not consider overflowing key patterns
Larkooo Jun 21, 2024
0a0f0cd
feat: fully use new keysclause & add pattern matching
Larkooo Jun 25, 2024
3b95716
feat: support pattern matching in queries
Larkooo Jun 25, 2024
f467602
fmt
Larkooo Jun 25, 2024
b261cd7
refactor: new types in torii client
Larkooo Jun 25, 2024
e8e2625
clippy
Larkooo Jun 25, 2024
dc23e6d
fix: ignroe non matching entity keys in fixedlen
Larkooo Jun 25, 2024
f1be928
add a model to test keys clause pattern matching
glihm Jun 26, 2024
6a136e3
fix: fix tests
glihm Jun 26, 2024
526a248
fix: lock version
glihm Jun 26, 2024
4bfe05c
chore: remove debug print
Larkooo Jun 26, 2024
b93b058
fix: match fixed len for queries
Larkooo Jun 26, 2024
c44b1d5
chore: use regex for key pattern matching
Larkooo Jun 26, 2024
c83efe1
feat: add optional model clause to keysclause
Larkooo Jun 26, 2024
1184bd6
feat: add model specf to query by keys
Larkooo Jun 26, 2024
64a9543
fix: queries
Larkooo Jun 26, 2024
a77b2d2
feat: wrap up queries
Larkooo Jun 27, 2024
8f3f560
fmt & clippy
Larkooo Jun 27, 2024
5fb468b
chore: enable regexp for tests
Larkooo Jun 27, 2024
fc9de16
chore: update test for new model
Larkooo Jun 27, 2024
48c9c2e
refactor: use array of models for keysclause
Larkooo Jun 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/dojo-bindgen/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ mod tests {
gather_dojo_data(&manifest_path, "dojo_example", "dev", dojo_metadata.skip_migration)
.unwrap();

assert_eq!(data.models.len(), 7);
assert_eq!(data.models.len(), 8);

assert_eq!(data.world.name, "dojo_example");

Expand Down
4 changes: 2 additions & 2 deletions crates/dojo-world/src/manifest/manifest_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,10 +419,10 @@ fn fetch_remote_manifest() {
DeploymentManifest::load_from_remote(provider, world_address).await.unwrap()
});

assert_eq!(local_manifest.models.len(), 7);
assert_eq!(local_manifest.models.len(), 8);
assert_eq!(local_manifest.contracts.len(), 3);

assert_eq!(remote_manifest.models.len(), 7);
assert_eq!(remote_manifest.models.len(), 8);
assert_eq!(remote_manifest.contracts.len(), 3);

// compute diff from local and remote manifest
Expand Down
45 changes: 15 additions & 30 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use torii_grpc::client::{EntityUpdateStreaming, EventUpdateStreaming, ModelDiffsStreaming};
use torii_grpc::proto::world::{RetrieveEntitiesResponse, RetrieveEventsResponse};
use torii_grpc::types::schema::{Entity, SchemaError};
use torii_grpc::types::{Event, EventQuery, KeysClause, Query};
use torii_grpc::types::{EntityKeysClause, Event, EventQuery, KeysClause, ModelKeysClause, Query};
use torii_relay::client::EventLoop;
use torii_relay::types::Message;

Expand Down Expand Up @@ -56,7 +56,6 @@
rpc_url: String,
relay_url: String,
world: FieldElement,
models_keys: Option<Vec<KeysClause>>,
) -> Result<Self, Error> {
let mut grpc_client = torii_grpc::client::WorldClient::new(torii_url, world).await?;

Expand All @@ -73,23 +72,6 @@
let provider = JsonRpcClient::new(HttpTransport::new(rpc_url));
let world_reader = WorldContractReader::new(world, provider);

if let Some(keys) = models_keys {
subbed_models.add_models(keys)?;

// TODO: change this to querying the gRPC url instead
let subbed_models = subbed_models.models_keys.read().clone();
for keys in subbed_models {
let model_reader = world_reader.model_reader(&keys.model).await?;
let values = model_reader.entity_storage(&keys.keys).await?;

client_storage.set_model_storage(
cairo_short_string_to_felt(&keys.model).unwrap(),
keys.keys,
values,
)?;
}
}

Ok(Self {
world_reader,
storage: client_storage,
Expand Down Expand Up @@ -123,7 +105,7 @@
self.metadata.read()
}

pub fn subscribed_models(&self) -> RwLockReadGuard<'_, HashSet<KeysClause>> {
pub fn subscribed_models(&self) -> RwLockReadGuard<'_, HashSet<ModelKeysClause>> {

Check warning on line 108 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L108

Added line #L108 was not covered by tests
self.subscribed_models.models_keys.read()
}

Expand Down Expand Up @@ -163,26 +145,26 @@
/// A direct stream to grpc subscribe entities
pub async fn on_entity_updated(
&self,
ids: Vec<FieldElement>,
clause: Option<EntityKeysClause>,

Check warning on line 148 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L148

Added line #L148 was not covered by tests
) -> Result<EntityUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_entities(ids).await?;
let stream = grpc_client.subscribe_entities(clause).await?;

Check warning on line 151 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L151

Added line #L151 was not covered by tests
Ok(stream)
}

/// A direct stream to grpc subscribe event messages
pub async fn on_event_message_updated(
&self,
ids: Vec<FieldElement>,
clause: Option<EntityKeysClause>,

Check warning on line 158 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L158

Added line #L158 was not covered by tests
) -> Result<EntityUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_event_messages(ids).await?;
let stream = grpc_client.subscribe_event_messages(clause).await?;

Check warning on line 161 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L161

Added line #L161 was not covered by tests
Ok(stream)
}

pub async fn on_starknet_event(
&self,
keys: Option<Vec<FieldElement>>,
keys: Option<KeysClause>,

Check warning on line 167 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L167

Added line #L167 was not covered by tests
) -> Result<EventUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_events(keys).await?;
Expand All @@ -196,7 +178,7 @@
///
/// If the requested model is not among the synced models, it will attempt to fetch it from
/// the RPC.
pub async fn model(&self, keys: &KeysClause) -> Result<Option<Ty>, Error> {
pub async fn model(&self, keys: &ModelKeysClause) -> Result<Option<Ty>, Error> {

Check warning on line 181 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L181

Added line #L181 was not covered by tests
let Some(mut schema) = self.metadata.read().model(&keys.model).map(|m| m.schema.clone())
else {
return Ok(None);
Expand Down Expand Up @@ -232,7 +214,7 @@
/// Initiate the model subscriptions and returns a [SubscriptionService] which when await'ed
/// will execute the subscription service and starts the syncing process.
pub async fn start_subscription(&self) -> Result<SubscriptionService, Error> {
let models_keys: Vec<KeysClause> =
let models_keys: Vec<ModelKeysClause> =

Check warning on line 217 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L217

Added line #L217 was not covered by tests
self.subscribed_models.models_keys.read().clone().into_iter().collect();
let sub_res_stream = self.initiate_subscription(models_keys).await?;

Expand All @@ -250,7 +232,7 @@
/// Adds entities to the list of entities to be synced.
///
/// NOTE: This will establish a new subscription stream with the server.
pub async fn add_models_to_sync(&self, models_keys: Vec<KeysClause>) -> Result<(), Error> {
pub async fn add_models_to_sync(&self, models_keys: Vec<ModelKeysClause>) -> Result<(), Error> {

Check warning on line 235 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L235

Added line #L235 was not covered by tests
for keys in &models_keys {
self.initiate_model(&keys.model, keys.keys.clone()).await?;
}
Expand All @@ -271,7 +253,10 @@
/// Removes models from the list of models to be synced.
///
/// NOTE: This will establish a new subscription stream with the server.
pub async fn remove_models_to_sync(&self, models_keys: Vec<KeysClause>) -> Result<(), Error> {
pub async fn remove_models_to_sync(
&self,
models_keys: Vec<ModelKeysClause>,
) -> Result<(), Error> {

Check warning on line 259 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L256-L259

Added lines #L256 - L259 were not covered by tests
self.subscribed_models.remove_models(models_keys)?;

let updated_entities =
Expand All @@ -291,7 +276,7 @@

async fn initiate_subscription(
&self,
keys: Vec<KeysClause>,
keys: Vec<ModelKeysClause>,

Check warning on line 279 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L279

Added line #L279 was not covered by tests
) -> Result<ModelDiffsStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_model_diffs(keys).await?;
Expand Down
18 changes: 9 additions & 9 deletions crates/torii/client/src/client/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use starknet::core::utils::cairo_short_string_to_felt;
use starknet_crypto::FieldElement;
use torii_grpc::client::ModelDiffsStreaming;
use torii_grpc::types::KeysClause;
use torii_grpc::types::ModelKeysClause;

use crate::client::error::{Error, ParseError};
use crate::client::storage::ModelStorage;
Expand All @@ -24,13 +24,13 @@

pub struct SubscribedModels {
metadata: Arc<RwLock<WorldMetadata>>,
pub(crate) models_keys: RwLock<HashSet<KeysClause>>,
pub(crate) models_keys: RwLock<HashSet<ModelKeysClause>>,
/// All the relevant storage addresses derived from the subscribed models
pub(crate) subscribed_storage_addresses: RwLock<HashSet<FieldElement>>,
}

impl SubscribedModels {
pub(crate) fn is_synced(&self, keys: &KeysClause) -> bool {
pub(crate) fn is_synced(&self, keys: &ModelKeysClause) -> bool {

Check warning on line 33 in crates/torii/client/src/client/subscription.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/subscription.rs#L33

Added line #L33 was not covered by tests
self.models_keys.read().contains(keys)
}

Expand All @@ -42,21 +42,21 @@
}
}

pub(crate) fn add_models(&self, models_keys: Vec<KeysClause>) -> Result<(), Error> {
pub(crate) fn add_models(&self, models_keys: Vec<ModelKeysClause>) -> Result<(), Error> {
for keys in models_keys {
Self::add_model(self, keys)?;
}
Ok(())
}

pub(crate) fn remove_models(&self, entities_keys: Vec<KeysClause>) -> Result<(), Error> {
pub(crate) fn remove_models(&self, entities_keys: Vec<ModelKeysClause>) -> Result<(), Error> {
for keys in entities_keys {
Self::remove_model(self, keys)?;
}
Ok(())
}

pub(crate) fn add_model(&self, keys: KeysClause) -> Result<(), Error> {
pub(crate) fn add_model(&self, keys: ModelKeysClause) -> Result<(), Error> {
if !self.models_keys.write().insert(keys.clone()) {
return Ok(());
}
Expand All @@ -83,7 +83,7 @@
Ok(())
}

pub(crate) fn remove_model(&self, keys: KeysClause) -> Result<(), Error> {
pub(crate) fn remove_model(&self, keys: ModelKeysClause) -> Result<(), Error> {
if !self.models_keys.write().remove(&keys) {
return Ok(());
}
Expand Down Expand Up @@ -247,7 +247,7 @@
use parking_lot::RwLock;
use starknet::core::utils::cairo_short_string_to_felt;
use starknet::macros::felt;
use torii_grpc::types::KeysClause;
use torii_grpc::types::ModelKeysClause;

use crate::utils::compute_all_storage_addresses;

Expand Down Expand Up @@ -283,7 +283,7 @@

let metadata = self::create_dummy_metadata();

let keys = KeysClause { model: model_name, keys };
let keys = ModelKeysClause { model: model_name, keys };

let subscribed_models = super::SubscribedModels::new(Arc::new(RwLock::new(metadata)));
subscribed_models.add_models(vec![keys.clone()]).expect("able to add model");
Expand Down
4 changes: 3 additions & 1 deletion crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,16 @@
VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, event_id=EXCLUDED.event_id RETURNING \
*";
let event_message_updated: EventMessageUpdated = sqlx::query_as(insert_entities)
let mut event_message_updated: EventMessageUpdated = sqlx::query_as(insert_entities)

Check warning on line 231 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L231

Added line #L231 was not covered by tests
.bind(&entity_id)
.bind(&keys_str)
.bind(event_id)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.fetch_one(&self.pool)
.await?;

event_message_updated.updated_model = Some(entity.clone());

Check warning on line 240 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L239-L240

Added lines #L239 - L240 were not covered by tests
let path = vec![entity.name()];
self.build_set_entity_queries_recursive(
path,
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/core/src/sql_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async fn test_load_from_remote() {

let _block_timestamp = 1710754478_u64;
let models = sqlx::query("SELECT * FROM models").fetch_all(&pool).await.unwrap();
assert_eq!(models.len(), 7);
assert_eq!(models.len(), 8);

let (id, name, packed_size, unpacked_size): (String, String, u8, u8) = sqlx::query_as(
"SELECT id, name, packed_size, unpacked_size FROM models WHERE name = 'Position'",
Expand Down
4 changes: 4 additions & 0 deletions crates/torii/core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub struct EventMessage {
pub executed_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,

// this should never be None. as a EventMessage cannot be deleted
#[sqlx(skip)]
pub updated_model: Option<Ty>,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
Expand Down
28 changes: 21 additions & 7 deletions crates/torii/grpc/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ message Query {
}

message EventQuery {
EventKeysClause keys =1;
KeysClause keys = 1;
uint32 limit = 2;
uint32 offset = 3;
}
Expand All @@ -99,17 +99,26 @@ message Clause {
}
}

message HashedKeysClause {
repeated bytes hashed_keys = 1;
message ModelKeysClause {
string model = 1;
repeated bytes keys = 2;
}

message EventKeysClause {
repeated bytes keys = 1;
message EntityKeysClause {
oneof clause_type {
HashedKeysClause hashed_keys = 1;
KeysClause keys = 2;
}
}

message KeysClause {
string model = 1;
repeated bytes keys = 2;
repeated bytes keys = 1;
PatternMatching pattern_matching = 2;
repeated string models = 3;
}

message HashedKeysClause {
repeated bytes hashed_keys = 1;
}

message MemberClause {
Expand All @@ -125,6 +134,11 @@ message CompositeClause {
repeated Clause clauses = 3;
}

enum PatternMatching {
FixedLen = 0;
VariableLen = 1;
}

enum LogicalOperator {
AND = 0;
OR = 1;
Expand Down
8 changes: 4 additions & 4 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ message MetadataResponse {

message SubscribeModelsRequest {
// The list of model keys to subscribe to.
repeated types.KeysClause models_keys = 1;
repeated types.ModelKeysClause models_keys = 1;
}

message SubscribeModelsResponse {
Expand All @@ -52,11 +52,11 @@ message SubscribeModelsResponse {
}

message SubscribeEntitiesRequest {
repeated bytes hashed_keys = 1;
types.EntityKeysClause clause = 1;
}

message SubscribeEventMessagesRequest {
repeated bytes hashed_keys = 1;
types.EntityKeysClause clause = 1;
}

message SubscribeEntityResponse {
Expand All @@ -83,7 +83,7 @@ message RetrieveEventsResponse {
}

message SubscribeEventsRequest {
types.EventKeysClause keys = 1;
types.KeysClause keys = 1;
}

message SubscribeEventsResponse {
Expand Down
Loading
Loading