Skip to content

Commit

Permalink
feat: update subscription by subscription id & multiple clauses (#2176)
Browse files Browse the repository at this point in the history
* feat: determistic subscription id (by clause) so we can update subscroptions without id

* feat: start refactoring to have an array of clauses for sub

* refactor: check clauses

* refactor: start refactoring to add sub id in stream

* chore: update proto to include subscription id

* feat: add update subscription rpc endpoint

* feat: implement update subscriptions

* feat: finalized torii server & wip client

* feat: update torii client to add updating subs

* fmt

* refactor: apply suggestions

* chore: clippy

* fix: deadlock

* type alias for sub id
  • Loading branch information
Larkooo authored Jul 17, 2024
1 parent fa002d5 commit 5d8b5f9
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 80 deletions.
30 changes: 26 additions & 4 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,23 +141,45 @@ impl Client {
/// A direct stream to grpc subscribe entities
pub async fn on_entity_updated(
&self,
clause: Option<EntityKeysClause>,
clauses: Vec<EntityKeysClause>,
) -> Result<EntityUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_entities(clause).await?;
let stream = grpc_client.subscribe_entities(clauses).await?;
Ok(stream)
}

/// Update the entities subscription
pub async fn update_entity_subscription(
&self,
subscription_id: u64,
clauses: Vec<EntityKeysClause>,
) -> Result<(), Error> {
let mut grpc_client = self.inner.write().await;
grpc_client.update_entities_subscription(subscription_id, clauses).await?;
Ok(())
}

/// A direct stream to grpc subscribe event messages
pub async fn on_event_message_updated(
&self,
clause: Option<EntityKeysClause>,
clauses: Vec<EntityKeysClause>,
) -> Result<EntityUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_event_messages(clause).await?;
let stream = grpc_client.subscribe_event_messages(clauses).await?;
Ok(stream)
}

/// Update the event messages subscription
pub async fn update_event_message_subscription(
&self,
subscription_id: u64,
clauses: Vec<EntityKeysClause>,
) -> Result<(), Error> {
let mut grpc_client = self.inner.write().await;
grpc_client.update_event_messages_subscription(subscription_id, clauses).await?;
Ok(())
}

pub async fn on_starknet_event(
&self,
keys: Option<KeysClause>,
Expand Down
16 changes: 13 additions & 3 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ syntax = "proto3";
package world;

import "types.proto";
import "google/protobuf/empty.proto";


// The World service provides information about the world.
service World {
Expand All @@ -14,12 +16,18 @@ service World {
// Subscribe to entity updates.
rpc SubscribeEntities (SubscribeEntitiesRequest) returns (stream SubscribeEntityResponse);

// Update entity subscription
rpc UpdateEntitiesSubscription (UpdateEntitiesSubscriptionRequest) returns (google.protobuf.Empty);

// Retrieve entities
rpc RetrieveEntities (RetrieveEntitiesRequest) returns (RetrieveEntitiesResponse);

// Subscribe to entity updates.
rpc SubscribeEventMessages (SubscribeEntitiesRequest) returns (stream SubscribeEntityResponse);

// Update entity subscription
rpc UpdateEventMessagesSubscription (UpdateEntitiesSubscriptionRequest) returns (google.protobuf.Empty);

// Retrieve entities
rpc RetrieveEventMessages (RetrieveEntitiesRequest) returns (RetrieveEntitiesResponse);

Expand Down Expand Up @@ -52,15 +60,17 @@ message SubscribeModelsResponse {
}

message SubscribeEntitiesRequest {
types.EntityKeysClause clause = 1;
repeated types.EntityKeysClause clauses = 1;
}

message SubscribeEventMessagesRequest {
types.EntityKeysClause clause = 1;
message UpdateEntitiesSubscriptionRequest {
uint64 subscription_id = 1;
repeated types.EntityKeysClause clauses = 2;
}

message SubscribeEntityResponse {
types.Entity entity = 1;
uint64 subscription_id = 2;
}

message RetrieveEntitiesRequest {
Expand Down
68 changes: 54 additions & 14 deletions crates/torii/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::proto::world::{
world_client, MetadataRequest, RetrieveEntitiesRequest, RetrieveEntitiesResponse,
RetrieveEventsRequest, RetrieveEventsResponse, SubscribeEntitiesRequest,
SubscribeEntityResponse, SubscribeEventsRequest, SubscribeEventsResponse,
SubscribeModelsRequest, SubscribeModelsResponse,
SubscribeModelsRequest, SubscribeModelsResponse, UpdateEntitiesSubscriptionRequest,
};
use crate::types::schema::{self, Entity, SchemaError};
use crate::types::{EntityKeysClause, Event, EventQuery, KeysClause, ModelKeysClause, Query};
Expand Down Expand Up @@ -104,41 +104,80 @@ impl WorldClient {
/// Subscribe to entities updates of a World.
pub async fn subscribe_entities(
&mut self,
clause: Option<EntityKeysClause>,
clauses: Vec<EntityKeysClause>,
) -> Result<EntityUpdateStreaming, Error> {
let clause = clause.map(|c| c.into());
let clauses = clauses.into_iter().map(|c| c.into()).collect();
let stream = self
.inner
.subscribe_entities(SubscribeEntitiesRequest { clause })
.subscribe_entities(SubscribeEntitiesRequest { clauses })
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())?;

Ok(EntityUpdateStreaming(stream.map_ok(Box::new(|res| match res.entity {
Some(entity) => entity.try_into().expect("must able to serialize"),
None => Entity { hashed_keys: Felt::ZERO, models: vec![] },
Ok(EntityUpdateStreaming(stream.map_ok(Box::new(|res| {
res.entity.map_or(
(res.subscription_id, Entity { hashed_keys: Felt::ZERO, models: vec![] }),
|entity| (res.subscription_id, entity.try_into().expect("must able to serialize")),
)
}))))
}

/// Update an entities subscription.
pub async fn update_entities_subscription(
&mut self,
subscription_id: u64,
clauses: Vec<EntityKeysClause>,
) -> Result<(), Error> {
let clauses = clauses.into_iter().map(|c| c.into()).collect();

self.inner
.update_entities_subscription(UpdateEntitiesSubscriptionRequest {
subscription_id,
clauses,
})
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())
}

/// Subscribe to event messages of a World.
pub async fn subscribe_event_messages(
&mut self,
clause: Option<EntityKeysClause>,
clauses: Vec<EntityKeysClause>,
) -> Result<EntityUpdateStreaming, Error> {
let clause = clause.map(|c| c.into());
let clauses = clauses.into_iter().map(|c| c.into()).collect();
let stream = self
.inner
.subscribe_event_messages(SubscribeEntitiesRequest { clause })
.subscribe_event_messages(SubscribeEntitiesRequest { clauses })
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())?;

Ok(EntityUpdateStreaming(stream.map_ok(Box::new(|res| match res.entity {
Some(entity) => entity.try_into().expect("must able to serialize"),
None => Entity { hashed_keys: Felt::ZERO, models: vec![] },
Ok(EntityUpdateStreaming(stream.map_ok(Box::new(|res| {
res.entity.map_or(
(res.subscription_id, Entity { hashed_keys: Felt::ZERO, models: vec![] }),
|entity| (res.subscription_id, entity.try_into().expect("must able to serialize")),
)
}))))
}

/// Update an event messages subscription.
pub async fn update_event_messages_subscription(
&mut self,
subscription_id: u64,
clauses: Vec<EntityKeysClause>,
) -> Result<(), Error> {
let clauses = clauses.into_iter().map(|c| c.into()).collect();
self.inner
.update_event_messages_subscription(UpdateEntitiesSubscriptionRequest {
subscription_id,
clauses,
})
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())
}

/// Subscribe to the events of a World.
pub async fn subscribe_events(
&mut self,
Expand Down Expand Up @@ -200,9 +239,10 @@ impl Stream for ModelDiffsStreaming {
}
}

type SubscriptionId = u64;
type EntityMappedStream = MapOk<
tonic::Streaming<SubscribeEntityResponse>,
Box<dyn Fn(SubscribeEntityResponse) -> Entity + Send>,
Box<dyn Fn(SubscribeEntityResponse) -> (SubscriptionId, Entity) + Send>,
>;

#[derive(Debug)]
Expand Down
49 changes: 41 additions & 8 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use futures::Stream;
use proto::world::{
MetadataRequest, MetadataResponse, RetrieveEntitiesRequest, RetrieveEntitiesResponse,
RetrieveEventsRequest, RetrieveEventsResponse, SubscribeModelsRequest, SubscribeModelsResponse,
UpdateEntitiesSubscriptionRequest,
};
use sqlx::prelude::FromRow;
use sqlx::sqlite::SqliteRow;
Expand Down Expand Up @@ -772,9 +773,9 @@ impl DojoWorld {

async fn subscribe_entities(
&self,
keys: Option<proto::types::EntityKeysClause>,
keys: Vec<proto::types::EntityKeysClause>,
) -> Result<Receiver<Result<proto::world::SubscribeEntityResponse, tonic::Status>>, Error> {
self.entity_manager.add_subscriber(keys.map(|keys| keys.into())).await
self.entity_manager.add_subscriber(keys.into_iter().map(|keys| keys.into()).collect()).await
}

async fn retrieve_entities(
Expand Down Expand Up @@ -849,9 +850,11 @@ impl DojoWorld {

async fn subscribe_event_messages(
&self,
keys: Option<proto::types::EntityKeysClause>,
clauses: Vec<proto::types::EntityKeysClause>,
) -> Result<Receiver<Result<proto::world::SubscribeEntityResponse, tonic::Status>>, Error> {
self.event_message_manager.add_subscriber(keys.map(|keys| keys.into())).await
self.event_message_manager
.add_subscriber(clauses.into_iter().map(|keys| keys.into()).collect())
.await
}

async fn retrieve_event_messages(
Expand Down Expand Up @@ -1054,13 +1057,28 @@ impl proto::world::world_server::World for DojoWorld {
&self,
request: Request<SubscribeEntitiesRequest>,
) -> ServiceResult<Self::SubscribeEntitiesStream> {
let SubscribeEntitiesRequest { clause } = request.into_inner();
let SubscribeEntitiesRequest { clauses } = request.into_inner();
let rx =
self.subscribe_entities(clause).await.map_err(|e| Status::internal(e.to_string()))?;
self.subscribe_entities(clauses).await.map_err(|e| Status::internal(e.to_string()))?;

Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeEntitiesStream))
}

async fn update_entities_subscription(
&self,
request: Request<UpdateEntitiesSubscriptionRequest>,
) -> ServiceResult<()> {
let UpdateEntitiesSubscriptionRequest { subscription_id, clauses } = request.into_inner();
self.entity_manager
.update_subscriber(
subscription_id,
clauses.into_iter().map(|keys| keys.into()).collect(),
)
.await;

Ok(Response::new(()))
}

async fn retrieve_entities(
&self,
request: Request<RetrieveEntitiesRequest>,
Expand All @@ -1080,15 +1098,30 @@ impl proto::world::world_server::World for DojoWorld {
&self,
request: Request<SubscribeEntitiesRequest>,
) -> ServiceResult<Self::SubscribeEntitiesStream> {
let SubscribeEntitiesRequest { clause } = request.into_inner();
let SubscribeEntitiesRequest { clauses } = request.into_inner();
let rx = self
.subscribe_event_messages(clause)
.subscribe_event_messages(clauses)
.await
.map_err(|e| Status::internal(e.to_string()))?;

Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeEntitiesStream))
}

async fn update_event_messages_subscription(
&self,
request: Request<UpdateEntitiesSubscriptionRequest>,
) -> ServiceResult<()> {
let UpdateEntitiesSubscriptionRequest { subscription_id, clauses } = request.into_inner();
self.event_message_manager
.update_subscriber(
subscription_id,
clauses.into_iter().map(|keys| keys.into()).collect(),
)
.await;

Ok(Response::new(()))
}

async fn retrieve_event_messages(
&self,
request: Request<RetrieveEntitiesRequest>,
Expand Down
Loading

0 comments on commit 5d8b5f9

Please sign in to comment.