Skip to content

Commit

Permalink
Feat(torii-grpc): support Events query (#1567)
Browse files Browse the repository at this point in the history
* Feat: Add fn retrieve_events()

* Feat: Add event related messages

* Update crates/torii/grpc/proto/types.proto

Co-authored-by: Yun <[email protected]>

* Update crates/torii/grpc/proto/types.proto

Co-authored-by: Yun <[email protected]>

* Remove hashed_keys on EventQuery, Remove EventClause

* Feat: Improve fn events_all and events_by_keys

* Refactor code

* fix clippy

* Update query

* Fix sql query

---------

Co-authored-by: Yun <[email protected]>
  • Loading branch information
gianalarcon and broody authored Feb 29, 2024
1 parent be5469e commit 6a513fe
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 1 deletion.
19 changes: 19 additions & 0 deletions crates/torii/grpc/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ message Entity {
repeated Model models = 2;
}

message Event {
// The event's keys
repeated bytes keys = 1;
// Data of the event
repeated bytes data = 2;
// event's transaction hash
bytes transaction_hash = 3;
}

message StorageEntry {
// The key of the changed value
string key = 1;
Expand Down Expand Up @@ -75,6 +84,12 @@ message Query {
uint32 offset = 3;
}

message EventQuery {
EventKeysClause keys =1;
uint32 limit = 2;
uint32 offset = 3;
}

message Clause {
oneof clause_type {
HashedKeysClause hashed_keys = 1;
Expand All @@ -88,6 +103,10 @@ message HashedKeysClause {
repeated bytes hashed_keys = 1;
}

message EventKeysClause {
repeated bytes keys = 1;
}

message KeysClause {
string model = 1;
repeated bytes keys = 2;
Expand Down
12 changes: 12 additions & 0 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ service World {

// Retrieve entities
rpc RetrieveEntities (RetrieveEntitiesRequest) returns (RetrieveEntitiesResponse);

// Retrieve events
rpc RetrieveEvents (RetrieveEventsRequest) returns (RetrieveEventsResponse);
}


Expand Down Expand Up @@ -56,3 +59,12 @@ message RetrieveEntitiesResponse {
repeated types.Entity entities = 1;
uint32 total_count = 2;
}

message RetrieveEventsRequest {
// The events to retrieve
types.EventQuery query = 1;
}

message RetrieveEventsResponse {
repeated types.Event events = 1;
}
92 changes: 91 additions & 1 deletion crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ pub mod subscriptions;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::str;
use std::str::FromStr;
use std::sync::Arc;

use dojo_types::schema::Ty;
use futures::Stream;
use proto::world::{
MetadataRequest, MetadataResponse, RetrieveEntitiesRequest, RetrieveEntitiesResponse,
SubscribeModelsRequest, SubscribeModelsResponse,
RetrieveEventsRequest, RetrieveEventsResponse, SubscribeModelsRequest, SubscribeModelsResponse,
};
use sqlx::sqlite::SqliteRow;
use sqlx::{Pool, Row, Sqlite};
Expand Down Expand Up @@ -126,6 +127,20 @@ impl DojoWorld {
self.entities_by_hashed_keys(None, limit, offset).await
}

async fn events_all(&self, limit: u32, offset: u32) -> Result<Vec<proto::types::Event>, Error> {
let query = r#"
SELECT keys, data, transaction_hash
FROM events
ORDER BY id DESC
LIMIT ? OFFSET ?
"#
.to_string();

let row_events: Vec<(String, String, String)> =
sqlx::query_as(&query).bind(limit).bind(offset).fetch_all(&self.pool).await?;
row_events.iter().map(map_row_to_event).collect()
}

async fn entities_by_hashed_keys(
&self,
hashed_keys: Option<proto::types::HashedKeysClause>,
Expand Down Expand Up @@ -278,6 +293,43 @@ impl DojoWorld {
))
}

async fn events_by_keys(
&self,
keys_clause: proto::types::EventKeysClause,
limit: u32,
offset: u32,
) -> Result<Vec<proto::types::Event>, Error> {
let keys = keys_clause
.keys
.iter()
.map(|bytes| {
if bytes.is_empty() {
return Ok("%".to_string());
}
Ok(str::from_utf8(bytes).unwrap().to_string())
})
.collect::<Result<Vec<_>, Error>>()?;
let keys_pattern = keys.join("/") + "/%";

let events_query = r#"
SELECT keys, data, transaction_hash
FROM events
WHERE keys LIKE ?
ORDER BY id DESC
LIMIT ? OFFSET ?
"#
.to_string();

let row_events: Vec<(String, String, String)> = sqlx::query_as(&events_query)
.bind(&keys_pattern)
.bind(limit)
.bind(offset)
.fetch_all(&self.pool)
.await?;

row_events.iter().map(map_row_to_event).collect()
}

async fn entities_by_member(
&self,
member_clause: proto::types::MemberClause,
Expand Down Expand Up @@ -455,6 +507,17 @@ impl DojoWorld {
Ok(RetrieveEntitiesResponse { entities, total_count })
}

async fn retrieve_events(
&self,
query: proto::types::EventQuery,
) -> Result<proto::world::RetrieveEventsResponse, Error> {
let events = match query.keys {
None => self.events_all(query.limit, query.offset).await?,
Some(keys) => self.events_by_keys(keys, query.limit, query.offset).await?,
};
Ok(RetrieveEventsResponse { events })
}

fn map_row_to_entity(row: &SqliteRow, schemas: &[Ty]) -> Result<proto::types::Entity, Error> {
let hashed_keys =
FieldElement::from_str(&row.get::<String, _>("id")).map_err(ParseError::FromStr)?;
Expand All @@ -472,6 +535,18 @@ impl DojoWorld {
}
}

fn process_event_field(data: &str) -> Vec<Vec<u8>> {
data.trim_end_matches('/').split('/').map(|s| s.to_owned().into_bytes()).collect()
}

fn map_row_to_event(row: &(String, String, String)) -> Result<proto::types::Event, Error> {
let keys = process_event_field(&row.0);
let data = process_event_field(&row.1);
let transaction_hash = row.2.to_owned().into_bytes();

Ok(proto::types::Event { keys, data, transaction_hash })
}

type ServiceResult<T> = Result<Response<T>, Status>;
type SubscribeModelsResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeModelsResponse, Status>> + Send>>;
Expand Down Expand Up @@ -541,6 +616,21 @@ impl proto::world::world_server::World for DojoWorld {

Ok(Response::new(entities))
}

async fn retrieve_events(
&self,
request: Request<RetrieveEventsRequest>,
) -> Result<Response<RetrieveEventsResponse>, Status> {
let query = request
.into_inner()
.query
.ok_or_else(|| Status::invalid_argument("Missing query argument"))?;

let events =
self.retrieve_events(query).await.map_err(|e| Status::internal(e.to_string()))?;

Ok(Response::new(events))
}
}

pub async fn new(
Expand Down

0 comments on commit 6a513fe

Please sign in to comment.