Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat(torii-grpc): support Events query #1567

Merged
merged 10 commits into from
Feb 29, 2024
19 changes: 19 additions & 0 deletions crates/torii/grpc/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ message Entity {
repeated Model models = 2;
}

message Event {
// The event's keys
repeated bytes keys = 1;
// Data of the event
repeated bytes data = 2;
// event's transaction hash
bytes transaction_hash = 3;
}

message StorageEntry {
// The key of the changed value
string key = 1;
Expand Down Expand Up @@ -75,6 +84,12 @@ message Query {
uint32 offset = 3;
}

message EventQuery {
EventKeysClause keys =1;
uint32 limit = 2;
uint32 offset = 3;
}

message Clause {
oneof clause_type {
HashedKeysClause hashed_keys = 1;
Expand All @@ -88,6 +103,10 @@ message HashedKeysClause {
repeated bytes hashed_keys = 1;
}

message EventKeysClause {
repeated bytes keys = 1;
}

message KeysClause {
string model = 1;
repeated bytes keys = 2;
Expand Down
12 changes: 12 additions & 0 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ service World {

// Retrieve entities
rpc RetrieveEntities (RetrieveEntitiesRequest) returns (RetrieveEntitiesResponse);

// Retrieve events
rpc RetrieveEvents (RetrieveEventsRequest) returns (RetrieveEventsResponse);
}


Expand Down Expand Up @@ -55,3 +58,12 @@ message RetrieveEntitiesRequest {
message RetrieveEntitiesResponse {
repeated types.Entity entities = 1;
}

message RetrieveEventsRequest {
// The events to retrieve
types.EventQuery query = 1;
}

message RetrieveEventsResponse {
repeated types.Event events = 1;
}
92 changes: 91 additions & 1 deletion crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::str;
use std::str::FromStr;
use std::sync::Arc;

use dojo_types::schema::Ty;
use futures::Stream;
use proto::world::{
MetadataRequest, MetadataResponse, RetrieveEntitiesRequest, RetrieveEntitiesResponse,
SubscribeModelsRequest, SubscribeModelsResponse,
RetrieveEventsRequest, RetrieveEventsResponse, SubscribeModelsRequest, SubscribeModelsResponse,
};
use sqlx::sqlite::SqliteRow;
use sqlx::{Pool, Row, Sqlite};
Expand Down Expand Up @@ -126,6 +127,20 @@
self.entities_by_hashed_keys(None, limit, offset).await
}

async fn events_all(&self, limit: u32, offset: u32) -> Result<Vec<proto::types::Event>, Error> {
let query = r#"
SELECT keys, data, transaction_hash
FROM events
ORDER BY id DESC
LIMIT ? OFFSET ?
gianalarcon marked this conversation as resolved.
Show resolved Hide resolved
"#
.to_string();

Check warning on line 137 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L130-L137

Added lines #L130 - L137 were not covered by tests

let row_events: Vec<(String, String, String)> =
sqlx::query_as(&query).bind(limit).bind(offset).fetch_all(&self.pool).await?;
row_events.iter().map(map_row_to_event).collect()
}

Check warning on line 142 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L139-L142

Added lines #L139 - L142 were not covered by tests

async fn entities_by_hashed_keys(
&self,
hashed_keys: Option<proto::types::HashedKeysClause>,
Expand Down Expand Up @@ -245,6 +260,43 @@
db_entities.iter().map(|row| Self::map_row_to_entity(row, &schemas)).collect()
}

async fn events_by_keys(
&self,
keys_clause: proto::types::EventKeysClause,
limit: u32,
offset: u32,
) -> Result<Vec<proto::types::Event>, Error> {
let keys = keys_clause
.keys

Check warning on line 270 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L263-L270

Added lines #L263 - L270 were not covered by tests
.iter()
.map(|bytes| {
if bytes.is_empty() {

Check warning on line 273 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L272-L273

Added lines #L272 - L273 were not covered by tests
return Ok("%".to_string());
}
Ok(str::from_utf8(bytes).unwrap().to_string())
})
.collect::<Result<Vec<_>, Error>>()?;

Check warning on line 278 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L276-L278

Added lines #L276 - L278 were not covered by tests
let keys_pattern = keys.join("/") + "/%";

let events_query = r#"
SELECT keys, data, transaction_hash
FROM events
WHERE keys LIKE ?
ORDER BY id DESC

Check warning on line 285 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L280-L285

Added lines #L280 - L285 were not covered by tests
LIMIT ? OFFSET ?
gianalarcon marked this conversation as resolved.
Show resolved Hide resolved
"#
.to_string();

let row_events: Vec<(String, String, String)> = sqlx::query_as(&events_query)
.bind(&keys_pattern)
.bind(limit)

Check warning on line 292 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L288-L292

Added lines #L288 - L292 were not covered by tests
.bind(offset)
.fetch_all(&self.pool)

Check warning on line 294 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L294

Added line #L294 was not covered by tests
.await?;

row_events.iter().map(map_row_to_event).collect()
}

Check warning on line 299 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L296-L299

Added lines #L296 - L299 were not covered by tests
async fn entities_by_member(
&self,
member_clause: proto::types::MemberClause,
Expand Down Expand Up @@ -417,6 +469,17 @@
Ok(RetrieveEntitiesResponse { entities })
}

async fn retrieve_events(
&self,
query: proto::types::EventQuery,
) -> Result<proto::world::RetrieveEventsResponse, Error> {

Check warning on line 475 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L472-L475

Added lines #L472 - L475 were not covered by tests
let events = match query.keys {
None => self.events_all(query.limit, query.offset).await?,
Some(keys) => self.events_by_keys(keys, query.limit, query.offset).await?,
};
Ok(RetrieveEventsResponse { events })
}

Check warning on line 482 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L477-L482

Added lines #L477 - L482 were not covered by tests
fn map_row_to_entity(row: &SqliteRow, schemas: &[Ty]) -> Result<proto::types::Entity, Error> {
let hashed_keys =
FieldElement::from_str(&row.get::<String, _>("id")).map_err(ParseError::FromStr)?;
Expand All @@ -434,6 +497,18 @@
}
}

fn process_event_field(data: &str) -> Vec<Vec<u8>> {
data.trim_end_matches('/').split('/').map(|s| s.to_owned().into_bytes()).collect()

Check warning on line 501 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L500-L501

Added lines #L500 - L501 were not covered by tests
}

fn map_row_to_event(row: &(String, String, String)) -> Result<proto::types::Event, Error> {
let keys = process_event_field(&row.0);
let data = process_event_field(&row.1);
let transaction_hash = row.2.to_owned().into_bytes();

Check warning on line 508 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L507-L508

Added lines #L507 - L508 were not covered by tests
Ok(proto::types::Event { keys, data, transaction_hash })
}

Check warning on line 511 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L510-L511

Added lines #L510 - L511 were not covered by tests
type ServiceResult<T> = Result<Response<T>, Status>;
type SubscribeModelsResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeModelsResponse, Status>> + Send>>;
Expand Down Expand Up @@ -503,6 +578,21 @@

Ok(Response::new(entities))
}

async fn retrieve_events(
&self,

Check warning on line 583 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L581-L583

Added lines #L581 - L583 were not covered by tests
request: Request<RetrieveEventsRequest>,
) -> Result<Response<RetrieveEventsResponse>, Status> {
let query = request
.into_inner()
.query
.ok_or_else(|| Status::invalid_argument("Missing query argument"))?;

let events =
self.retrieve_events(query).await.map_err(|e| Status::internal(e.to_string()))?;

Ok(Response::new(events))
}

Check warning on line 595 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L585-L595

Added lines #L585 - L595 were not covered by tests
}

pub async fn new(
Expand Down
Loading