Skip to content

Commit

Permalink
feat(torii-grpc): add events subscription (#2065)
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo authored Jun 21, 2024
1 parent 06d5803 commit 45fa7dc
Show file tree
Hide file tree
Showing 7 changed files with 334 additions and 18 deletions.
29 changes: 25 additions & 4 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use starknet_crypto::FieldElement;
use tokio::sync::RwLock as AsyncRwLock;
use torii_grpc::client::{EntityUpdateStreaming, ModelDiffsStreaming};
use torii_grpc::proto::world::RetrieveEntitiesResponse;
use torii_grpc::types::schema::Entity;
use torii_grpc::types::{KeysClause, Query};
use torii_grpc::client::{EntityUpdateStreaming, EventUpdateStreaming, ModelDiffsStreaming};
use torii_grpc::proto::world::{RetrieveEntitiesResponse, RetrieveEventsResponse};
use torii_grpc::types::schema::{Entity, SchemaError};
use torii_grpc::types::{Event, EventQuery, KeysClause, Query};
use torii_relay::client::EventLoop;
use torii_relay::types::Message;

Expand Down Expand Up @@ -148,6 +148,18 @@ impl Client {
Ok(entities.into_iter().map(TryInto::try_into).collect::<Result<Vec<Entity>, _>>()?)
}

/// Retrieve raw starknet events matching the keys provided.
/// If the keys are empty, it will return all events.
pub async fn starknet_events(&self, query: EventQuery) -> Result<Vec<Event>, Error> {
let mut grpc_client = self.inner.write().await;
let RetrieveEventsResponse { events } = grpc_client.retrieve_events(query).await?;
Ok(events
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<Event>, _>>()
.map_err(SchemaError::SliceError)?)
}

/// A direct stream to grpc subscribe entities
pub async fn on_entity_updated(
&self,
Expand All @@ -168,6 +180,15 @@ impl Client {
Ok(stream)
}

pub async fn on_starknet_event(
&self,
keys: Option<Vec<FieldElement>>,
) -> Result<EventUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_events(keys).await?;
Ok(stream)
}

/// Returns the value of a model.
///
/// This function will only return `None`, if `model` doesn't exist. If there is no model with
Expand Down
11 changes: 11 additions & 0 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ service World {

// Retrieve events
rpc RetrieveEvents (RetrieveEventsRequest) returns (RetrieveEventsResponse);

// Subscribe to events
rpc SubscribeEvents (SubscribeEventsRequest) returns (stream SubscribeEventsResponse);
}


Expand Down Expand Up @@ -78,3 +81,11 @@ message RetrieveEventsRequest {
message RetrieveEventsResponse {
repeated types.Event events = 1;
}

message SubscribeEventsRequest {
types.EventKeysClause keys = 1;
}

message SubscribeEventsResponse {
types.Event event = 1;
}
55 changes: 52 additions & 3 deletions crates/torii/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ use futures_util::{Stream, StreamExt, TryStreamExt};
use starknet::core::types::{FromStrError, StateDiff, StateUpdate};
use starknet_crypto::FieldElement;

use crate::proto::types::EventKeysClause;
use crate::proto::world::{
world_client, MetadataRequest, RetrieveEntitiesRequest, RetrieveEntitiesResponse,
SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeModelsRequest,
SubscribeModelsResponse,
RetrieveEventsRequest, RetrieveEventsResponse, SubscribeEntitiesRequest,
SubscribeEntityResponse, SubscribeEventsRequest, SubscribeEventsResponse,
SubscribeModelsRequest, SubscribeModelsResponse,
};
use crate::types::schema::{self, Entity, SchemaError};
use crate::types::{KeysClause, Query};
use crate::types::{Event, EventQuery, KeysClause, Query};

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down Expand Up @@ -92,6 +94,14 @@ impl WorldClient {
.map(|res| res.into_inner())
}

pub async fn retrieve_events(
&mut self,
query: EventQuery,
) -> Result<RetrieveEventsResponse, Error> {
let request = RetrieveEventsRequest { query: Some(query.into()) };
self.inner.retrieve_events(request).await.map_err(Error::Grpc).map(|res| res.into_inner())
}

/// Subscribe to entities updates of a World.
pub async fn subscribe_entities(
&mut self,
Expand Down Expand Up @@ -130,6 +140,28 @@ impl WorldClient {
}))))
}

/// Subscribe to the events of a World.
pub async fn subscribe_events(
&mut self,
keys: Option<Vec<FieldElement>>,
) -> Result<EventUpdateStreaming, Error> {
let keys = keys.map(|keys| EventKeysClause {
keys: keys.iter().map(|key| key.to_bytes_be().to_vec()).collect(),
});

let stream = self
.inner
.subscribe_events(SubscribeEventsRequest { keys })
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())?;

Ok(EventUpdateStreaming(stream.map_ok(Box::new(|res| match res.event {
Some(event) => event.try_into().expect("must able to serialize"),
None => Event { keys: vec![], data: vec![], transaction_hash: FieldElement::ZERO },
}))))
}

/// Subscribe to the model diff for a set of models of a World.
pub async fn subscribe_model_diffs(
&mut self,
Expand Down Expand Up @@ -187,6 +219,23 @@ impl Stream for EntityUpdateStreaming {
}
}

type EventMappedStream = MapOk<
tonic::Streaming<SubscribeEventsResponse>,
Box<dyn Fn(SubscribeEventsResponse) -> Event + Send>,
>;

pub struct EventUpdateStreaming(EventMappedStream);

impl Stream for EventUpdateStreaming {
type Item = <EventMappedStream as Stream>::Item;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx)
}
}

fn empty_state_update() -> StateUpdate {
StateUpdate {
block_hash: FieldElement::ZERO,
Expand Down
72 changes: 61 additions & 11 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use starknet::core::utils::{cairo_short_string_to_felt, get_selector_from_name};
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use starknet_crypto::FieldElement;
use subscriptions::event::EventManager;
use tokio::net::TcpListener;
use tokio::sync::mpsc::Receiver;
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
Expand All @@ -38,7 +39,9 @@ use self::subscriptions::event_message::EventMessageManager;
use self::subscriptions::model_diff::{ModelDiffRequest, StateDiffManager};
use crate::proto::types::clause::ClauseType;
use crate::proto::world::world_server::WorldServer;
use crate::proto::world::{SubscribeEntitiesRequest, SubscribeEntityResponse};
use crate::proto::world::{
SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventsResponse,
};
use crate::proto::{self};
use crate::types::ComparisonOperator;

Expand All @@ -57,6 +60,7 @@ pub struct DojoWorld {
model_cache: Arc<ModelCache>,
entity_manager: Arc<EntityManager>,
event_message_manager: Arc<EventMessageManager>,
event_manager: Arc<EventManager>,
state_diff_manager: Arc<StateDiffManager>,
}

Expand All @@ -70,6 +74,7 @@ impl DojoWorld {
let model_cache = Arc::new(ModelCache::new(pool.clone()));
let entity_manager = Arc::new(EntityManager::default());
let event_message_manager = Arc::new(EventMessageManager::default());
let event_manager = Arc::new(EventManager::default());
let state_diff_manager = Arc::new(StateDiffManager::default());

tokio::task::spawn(subscriptions::model_diff::Service::new_with_block_rcv(
Expand All @@ -91,12 +96,15 @@ impl DojoWorld {
Arc::clone(&model_cache),
));

tokio::task::spawn(subscriptions::event::Service::new(Arc::clone(&event_manager)));

Self {
pool,
world_address,
model_cache,
entity_manager,
event_message_manager,
event_manager,
state_diff_manager,
}
}
Expand Down Expand Up @@ -307,7 +315,7 @@ impl DojoWorld {
return Ok("%".to_string());
}
Ok(FieldElement::from_byte_slice_be(bytes)
.map(|felt| format!("{:#x}", felt))
.map(|felt| format!("{felt:#x}"))
.map_err(ParseError::FromByteSliceError)?)
})
.collect::<Result<Vec<_>, Error>>()?;
Expand Down Expand Up @@ -392,7 +400,10 @@ impl DojoWorld {
if bytes.is_empty() {
return Ok("%".to_string());
}
Ok(str::from_utf8(bytes).unwrap().to_string())

Ok(FieldElement::from_byte_slice_be(bytes)
.map(|felt| format!("{felt:#x}"))
.map_err(ParseError::FromByteSliceError)?)
})
.collect::<Result<Vec<_>, Error>>()?;
let keys_pattern = keys.join("/") + "/%";
Expand Down Expand Up @@ -736,6 +747,24 @@ impl DojoWorld {
Ok(RetrieveEventsResponse { events })
}

async fn subscribe_events(
&self,
clause: proto::types::EventKeysClause,
) -> Result<Receiver<Result<proto::world::SubscribeEventsResponse, tonic::Status>>, Error> {
self.event_manager
.add_subscriber(
clause
.keys
.iter()
.map(|key| {
FieldElement::from_byte_slice_be(key)
.map_err(ParseError::FromByteSliceError)
})
.collect::<Result<Vec<_>, _>>()?,
)
.await
}

fn map_row_to_entity(
row: &SqliteRow,
arrays_rows: &HashMap<String, Vec<SqliteRow>>,
Expand All @@ -761,14 +790,21 @@ impl DojoWorld {
}
}

fn process_event_field(data: &str) -> Vec<Vec<u8>> {
data.trim_end_matches('/').split('/').map(|s| s.to_owned().into_bytes()).collect()
fn process_event_field(data: &str) -> Result<Vec<Vec<u8>>, Error> {
Ok(data
.trim_end_matches('/')
.split('/')
.map(|d| {
FieldElement::from_str(d).map_err(ParseError::FromStr).map(|f| f.to_bytes_be().to_vec())
})
.collect::<Result<Vec<_>, _>>()?)
}

fn map_row_to_event(row: &(String, String, String)) -> Result<proto::types::Event, Error> {
let keys = process_event_field(&row.0);
let data = process_event_field(&row.1);
let transaction_hash = row.2.to_owned().into_bytes();
let keys = process_event_field(&row.0)?;
let data = process_event_field(&row.1)?;
let transaction_hash =
FieldElement::from_str(&row.2).map_err(ParseError::FromStr)?.to_bytes_be().to_vec();

Ok(proto::types::Event { keys, data, transaction_hash })
}
Expand All @@ -778,23 +814,26 @@ type SubscribeModelsResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeModelsResponse, Status>> + Send>>;
type SubscribeEntitiesResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeEntityResponse, Status>> + Send>>;
type SubscribeEventsResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeEventsResponse, Status>> + Send>>;

#[tonic::async_trait]
impl proto::world::world_server::World for DojoWorld {
type SubscribeModelsStream = SubscribeModelsResponseStream;
type SubscribeEntitiesStream = SubscribeEntitiesResponseStream;
type SubscribeEventMessagesStream = SubscribeEntitiesResponseStream;
type SubscribeEventsStream = SubscribeEventsResponseStream;

async fn world_metadata(
&self,
_request: Request<MetadataRequest>,
) -> Result<Response<MetadataResponse>, Status> {
let metadata = self.metadata().await.map_err(|e| match e {
let metadata = Some(self.metadata().await.map_err(|e| match e {
Error::Sql(sqlx::Error::RowNotFound) => Status::not_found("World not found"),
e => Status::internal(e.to_string()),
})?;
})?);

Ok(Response::new(MetadataResponse { metadata: Some(metadata) }))
Ok(Response::new(MetadataResponse { metadata }))
}

async fn subscribe_models(
Expand Down Expand Up @@ -895,6 +934,17 @@ impl proto::world::world_server::World for DojoWorld {

Ok(Response::new(events))
}

async fn subscribe_events(
&self,
request: Request<proto::world::SubscribeEventsRequest>,
) -> ServiceResult<Self::SubscribeEventsStream> {
let keys = request.into_inner().keys.unwrap_or_default();

let rx = self.subscribe_events(keys).await.map_err(|e| Status::internal(e.to_string()))?;

Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeEventsStream))
}
}

pub async fn new(
Expand Down
Loading

0 comments on commit 45fa7dc

Please sign in to comment.