Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat(torii-grpc): support Events query #1567

Merged
merged 10 commits into from
Feb 29, 2024
19 changes: 19 additions & 0 deletions crates/torii/grpc/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ message Entity {
repeated Model models = 2;
}

message Event {
// The event's keys
repeated bytes keys = 1;
// Data of the event
repeated bytes data = 2;
// event's transaction hash
bytes transaction_hash = 3;
}

message StorageEntry {
// The key of the changed value
string key = 1;
Expand Down Expand Up @@ -75,6 +84,12 @@ message Query {
uint32 offset = 3;
}

message EventQuery {
EventKeysClause keys =1;
uint32 limit = 2;
uint32 offset = 3;
}

message Clause {
oneof clause_type {
HashedKeysClause hashed_keys = 1;
Expand All @@ -88,6 +103,10 @@ message HashedKeysClause {
repeated bytes hashed_keys = 1;
}

message EventKeysClause {
repeated bytes keys = 1;
}

message KeysClause {
string model = 1;
repeated bytes keys = 2;
Expand Down
12 changes: 12 additions & 0 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ service World {

// Retrieve entities
rpc RetrieveEntities (RetrieveEntitiesRequest) returns (RetrieveEntitiesResponse);

// Retrieve events
rpc RetrieveEvents (RetrieveEventsRequest) returns (RetrieveEventsResponse);
}


Expand Down Expand Up @@ -55,3 +58,12 @@ message RetrieveEntitiesRequest {
message RetrieveEntitiesResponse {
repeated types.Entity entities = 1;
}

message RetrieveEventsRequest {
// The events to retrieve
types.EventQuery query = 1;
}

message RetrieveEventsResponse {
repeated types.Event events = 1;
}
88 changes: 87 additions & 1 deletion crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::str;
use std::str::FromStr;
use std::sync::Arc;

use dojo_types::schema::Ty;
use futures::Stream;
use proto::world::{
MetadataRequest, MetadataResponse, RetrieveEntitiesRequest, RetrieveEntitiesResponse,
SubscribeModelsRequest, SubscribeModelsResponse,
RetrieveEventsRequest, RetrieveEventsResponse, SubscribeModelsRequest, SubscribeModelsResponse,
};
use sqlx::sqlite::SqliteRow;
use sqlx::{Pool, Row, Sqlite};
Expand Down Expand Up @@ -126,6 +127,18 @@
self.entities_by_hashed_keys(None, limit, offset).await
}

async fn events_all(&self, limit: u32, offset: u32) -> Result<Vec<proto::types::Event>, Error> {
let query = r#"
SELECT keys, data, transaction_hash
FROM events
LIMIT ? OFFSET ?
gianalarcon marked this conversation as resolved.
Show resolved Hide resolved
"#.to_string();

Check warning on line 135 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L130-L135

Added lines #L130 - L135 were not covered by tests

let row_events: Vec<(String, String, String)> =
sqlx::query_as(&query).bind(limit).bind(offset).fetch_all(&self.pool).await?;
row_events.iter().map(map_row_to_event).collect()
}

Check warning on line 140 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L137-L140

Added lines #L137 - L140 were not covered by tests

async fn entities_by_hashed_keys(
&self,
hashed_keys: Option<proto::types::HashedKeysClause>,
Expand Down Expand Up @@ -245,6 +258,41 @@
db_entities.iter().map(|row| Self::map_row_to_entity(row, &schemas)).collect()
}

async fn events_by_keys(
&self,
keys_clause: proto::types::EventKeysClause,
limit: u32,
offset: u32,
) -> Result<Vec<proto::types::Event>, Error> {
let keys = keys_clause
.keys

Check warning on line 268 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L261-L268

Added lines #L261 - L268 were not covered by tests
.iter()
.map(|bytes| {
if bytes.is_empty() {

Check warning on line 271 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L270-L271

Added lines #L270 - L271 were not covered by tests
return Ok("%".to_string());
}
Ok(str::from_utf8(bytes).unwrap().to_string())
})
.collect::<Result<Vec<_>, Error>>()?;

Check warning on line 276 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L274-L276

Added lines #L274 - L276 were not covered by tests
let keys_pattern = keys.join("/") + "/%";

let events_query = r#"
SELECT keys, data, transaction_hash
FROM events
WHERE keys LIKE ?
LIMIT ? OFFSET ?

Check warning on line 283 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L278-L283

Added lines #L278 - L283 were not covered by tests
gianalarcon marked this conversation as resolved.
Show resolved Hide resolved
"#.to_string();

let row_events: Vec<(String, String, String)> = sqlx::query_as(&events_query)
.bind(&keys_pattern)
.bind(limit)
.bind(offset)
.fetch_all(&self.pool)

Check warning on line 290 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L286-L290

Added lines #L286 - L290 were not covered by tests
.await?;

Check warning on line 292 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L292

Added line #L292 was not covered by tests
row_events.iter().map(map_row_to_event).collect()
}

Check warning on line 295 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L294-L295

Added lines #L294 - L295 were not covered by tests
async fn entities_by_member(
&self,
member_clause: proto::types::MemberClause,
Expand Down Expand Up @@ -417,6 +465,17 @@
Ok(RetrieveEntitiesResponse { entities })
}

async fn retrieve_events(
&self,
query: proto::types::EventQuery,
) -> Result<proto::world::RetrieveEventsResponse, Error> {

Check warning on line 471 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L468-L471

Added lines #L468 - L471 were not covered by tests
let events = match query.keys {
None => self.events_all(query.limit, query.offset).await?,
Some(keys) => self.events_by_keys(keys, query.limit, query.offset).await?,
};
Ok(RetrieveEventsResponse { events })
}

Check warning on line 478 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L473-L478

Added lines #L473 - L478 were not covered by tests
fn map_row_to_entity(row: &SqliteRow, schemas: &[Ty]) -> Result<proto::types::Entity, Error> {
let hashed_keys =
FieldElement::from_str(&row.get::<String, _>("id")).map_err(ParseError::FromStr)?;
Expand All @@ -434,6 +493,18 @@
}
}

fn process_event_field(data: &String) -> Vec<Vec<u8>> {
data.trim_end_matches('/').split('/').map(|s| s.to_owned().into_bytes()).collect()

Check warning on line 497 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L496-L497

Added lines #L496 - L497 were not covered by tests
}

fn map_row_to_event(row: &(String, String, String)) -> Result<proto::types::Event, Error> {
let keys = process_event_field(&row.0);
let data = process_event_field(&row.1);
let transaction_hash = row.2.to_owned().into_bytes();

Check warning on line 504 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L503-L504

Added lines #L503 - L504 were not covered by tests
Ok(proto::types::Event { keys, data, transaction_hash })
}

Check warning on line 507 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L506-L507

Added lines #L506 - L507 were not covered by tests
type ServiceResult<T> = Result<Response<T>, Status>;
type SubscribeModelsResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeModelsResponse, Status>> + Send>>;
Expand Down Expand Up @@ -503,6 +574,21 @@

Ok(Response::new(entities))
}

async fn retrieve_events(
&self,

Check warning on line 579 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L577-L579

Added lines #L577 - L579 were not covered by tests
request: Request<RetrieveEventsRequest>,
) -> Result<Response<RetrieveEventsResponse>, Status> {
let query = request
.into_inner()
.query
.ok_or_else(|| Status::invalid_argument("Missing query argument"))?;

let events =
self.retrieve_events(query).await.map_err(|e| Status::internal(e.to_string()))?;

Ok(Response::new(events))
}

Check warning on line 591 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L581-L591

Added lines #L581 - L591 were not covered by tests
}

pub async fn new(
Expand Down
Loading