From 68b5faab716c36152f9326d124d0be11b35ef464 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 26 Sep 2023 20:39:02 +0700 Subject: [PATCH] refactor(torii-server): combine `tonic` and `warp` (#926) --- Cargo.lock | 72 ++++++++++++------- Cargo.toml | 9 +++ crates/torii/grpc/Cargo.toml | 7 +- crates/torii/grpc/src/lib.rs | 56 ++++++++++++++- crates/torii/grpc/src/route.rs | 90 ----------------------- crates/torii/server/Cargo.toml | 10 ++- crates/torii/server/src/cli.rs | 32 ++++----- crates/torii/server/src/server.rs | 116 ++++++++++++++++++++++++++++++ 8 files changed, 254 insertions(+), 138 deletions(-) delete mode 100644 crates/torii/grpc/src/route.rs create mode 100644 crates/torii/server/src/server.rs diff --git a/Cargo.lock b/Cargo.lock index 06c761edf2..cd2a793654 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5013,12 +5013,12 @@ dependencies = [ [[package]] name = "prettyplease" -version = "0.1.25" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86" +checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ "proc-macro2", - "syn 1.0.109", + "syn 2.0.32", ] [[package]] @@ -5089,9 +5089,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.11.9" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +checksum = "f4fdd22f3b9c31b53c060df4a0613a1c7f062d4115a2b984dd15b1858f7e340d" dependencies = [ "bytes", "prost-derive", @@ -5099,44 +5099,44 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.11.9" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" +checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac" dependencies = [ "bytes", "heck 0.4.1", - "itertools 0.10.5", - "lazy_static", + "itertools 0.11.0", "log", "multimap", + "once_cell", "petgraph", "prettyplease", "prost", "prost-types", "regex", - "syn 1.0.109", + "syn 2.0.32", "tempfile", "which", ] [[package]] name = "prost-derive" -version = "0.11.9" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.11.0", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.32", ] [[package]] name = "prost-types" -version = "0.11.9" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" +checksum = "e081b29f63d83a4bc75cfc9f3fe424f9156cf92d8a4f0c9407cce9a1b67327cf" dependencies = [ "prost", ] @@ -6802,16 +6802,15 @@ dependencies = [ [[package]] name = "tonic" -version = "0.9.2" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" +checksum = "14c00bc15e49625f3d2f20b17082601e5e17cf27ead69e805174026c194b6664" dependencies = [ + "async-stream", "async-trait", "axum", "base64 0.21.4", "bytes", - "futures-core", - "futures-util", "h2", "http", "http-body", @@ -6830,15 +6829,35 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.9.2" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6fdaae4c2c638bb70fe42803a26fbd6fc6ac8c72f5c59f67ecc2a2dcabf4b07" +checksum = "c9d37bb15da06ae9bb945963066baca6561b505af93a52e949a85d28558459a2" dependencies = [ "prettyplease", "proc-macro2", "prost-build", "quote", - "syn 1.0.109", + "syn 2.0.32", +] + +[[package]] +name = "tonic-web" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2953fe95664e86519e0d1c4bdd65007d93bc47a59c9af512280977aa9e46b871" +dependencies = [ + "base64 0.21.4", + "bytes", + "http", + "http-body", + "hyper", + "pin-project", + "tokio-stream", + "tonic", + "tower-http", + "tower-layer", + "tower-service", + "tracing", ] [[package]] @@ -6924,7 +6943,6 @@ dependencies = [ name = "torii-grpc" version = "0.2.1" dependencies = [ - "bytes", "prost", "sqlx", "tonic", @@ -6946,6 +6964,10 @@ dependencies = [ "ctrlc", "dojo-types", "dojo-world", + "either", + "http", + "http-body", + "hyper", "indexmap 1.9.3", "poem", "scarb", @@ -6957,10 +6979,12 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", + "tonic-web", "torii-client", "torii-core", "torii-graphql", "torii-grpc", + "tower", "tracing", "tracing-subscriber", "url", diff --git a/Cargo.toml b/Cargo.toml index 5f4afc9399..60515d099e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,8 +85,17 @@ toml = "0.7.4" tracing = "0.1.34" tracing-subscriber = { version = "0.3.16", features = [ "env-filter" ] } url = "2.4.0" + +# server +hyper = "0.14.27" warp = "0.3" +# gRPC +prost = "0.12" +tonic = "0.10" +tonic-build = "0.10" +tonic-web = "0.10.1" + [patch."https://github.com/starkware-libs/blockifier"] blockifier = { git = "https://github.com/dojoengine/blockifier", rev = "f7df9ba" } diff --git a/crates/torii/grpc/Cargo.toml b/crates/torii/grpc/Cargo.toml index 8b25f0694c..7c86677292 100644 --- a/crates/torii/grpc/Cargo.toml +++ b/crates/torii/grpc/Cargo.toml @@ -6,11 +6,10 @@ repository.workspace = true version.workspace = true [dependencies] -bytes = "1.0" -prost = "0.11" +prost.workspace = true sqlx = { version = "0.6.2", features = [ "chrono", "macros", "offline", "runtime-actix-rustls", "sqlite", "uuid" ] } -tonic = "0.9" +tonic.workspace = true warp.workspace = true [build-dependencies] -tonic-build = "0.9" +tonic-build.workspace = true diff --git a/crates/torii/grpc/src/lib.rs b/crates/torii/grpc/src/lib.rs index b9bb57fef0..1b956ac6ee 100644 --- a/crates/torii/grpc/src/lib.rs +++ b/crates/torii/grpc/src/lib.rs @@ -1 +1,55 @@ -pub mod route; +use sqlx::{Pool, Sqlite}; +use tonic::{Request, Response, Status}; +use world::world_server::World; +use world::{MetaReply, MetaRequest}; + +pub mod world { + tonic::include_proto!("world"); +} + +#[derive(Clone, Debug)] +pub struct DojoWorld { + pool: Pool, +} + +impl DojoWorld { + pub fn new(pool: Pool) -> Self { + Self { pool } + } +} + +#[tonic::async_trait] +impl World for DojoWorld { + async fn meta( + &self, + request: Request, // Accept request of type MetaRequest + ) -> Result, Status> { + let id = request.into_inner().id; + + let (world_address, world_class_hash, executor_address, executor_class_hash): ( + String, + String, + String, + String, + ) = sqlx::query_as( + "SELECT world_address, world_class_hash, executor_address, executor_class_hash FROM \ + worlds WHERE id = ?", + ) + .bind(id) + .fetch_one(&self.pool) + .await + .map_err(|e| match e { + sqlx::Error::RowNotFound => Status::not_found("World not found"), + _ => Status::internal("Internal error"), + })?; + + let reply = world::MetaReply { + world_address, + world_class_hash, + executor_address, + executor_class_hash, + }; + + Ok(Response::new(reply)) // Send back our formatted greeting + } +} diff --git a/crates/torii/grpc/src/route.rs b/crates/torii/grpc/src/route.rs deleted file mode 100644 index e7ddf7de16..0000000000 --- a/crates/torii/grpc/src/route.rs +++ /dev/null @@ -1,90 +0,0 @@ -use bytes::{Bytes, BytesMut}; -use prost::Message; -use sqlx::{Pool, Sqlite}; -use tonic::{Request, Response, Status}; -use warp::Filter; -use world::world_server::World; -use world::{MetaReply, MetaRequest}; - -pub mod world { - tonic::include_proto!("world"); -} - -#[derive(Clone, Debug)] -pub struct DojoWorld { - pool: Pool, -} - -impl DojoWorld { - fn new(pool: Pool) -> Self { - Self { pool } - } -} - -#[tonic::async_trait] -impl World for DojoWorld { - async fn meta( - &self, - request: Request, // Accept request of type MetaRequest - ) -> Result, Status> { - let id = request.into_inner().id; - - let (world_address, world_class_hash, executor_address, executor_class_hash): ( - String, - String, - String, - String, - ) = sqlx::query_as( - "SELECT world_address, world_class_hash, executor_address, executor_class_hash FROM \ - worlds WHERE id = ?", - ) - .bind(id) - .fetch_one(&self.pool) - .await - .map_err(|e| match e { - sqlx::Error::RowNotFound => Status::not_found("World not found"), - _ => Status::internal("Internal error"), - })?; - - let reply = world::MetaReply { - world_address, - world_class_hash, - executor_address, - executor_class_hash, - }; - - Ok(Response::new(reply)) // Send back our formatted greeting - } -} - -#[derive(Debug)] -struct InvalidProtobufError; - -impl warp::reject::Reject for InvalidProtobufError {} - -pub fn filter( - pool: &Pool, -) -> impl Filter + Clone { - let world = DojoWorld::new(pool.clone()); - - warp::path("grpc").and(warp::post()).and(warp::body::bytes()).and_then(move |body: Bytes| { - let world = world.clone(); - async move { - let request = match MetaRequest::decode(body) { - Ok(req) => req, - Err(_) => return Err(warp::reject::custom(InvalidProtobufError)), - }; - let response = world.meta(tonic::Request::new(request)).await.unwrap(); - let meta_reply = response.into_inner(); - - let mut bytes = BytesMut::new(); - meta_reply.encode(&mut bytes).unwrap(); - - Ok::<_, warp::reject::Rejection>(warp::reply::with_header( - bytes.to_vec(), - "content-type", - "application/octet-stream", - )) - } - }) -} diff --git a/crates/torii/server/Cargo.toml b/crates/torii/server/Cargo.toml index c19c43946a..7ecd3f7e45 100644 --- a/crates/torii/server/Cargo.toml +++ b/crates/torii/server/Cargo.toml @@ -16,6 +16,8 @@ clap.workspace = true ctrlc = "3.2.5" dojo-types = { path = "../../dojo-types" } dojo-world = { path = "../../dojo-world" } +http = "0.2.9" +hyper.workspace = true indexmap = "1.9.3" poem = "1.3.48" scarb.workspace = true @@ -27,21 +29,25 @@ starknet.workspace = true tokio-stream = "0.1.11" tokio-util = "0.7.7" tokio.workspace = true +tonic-web.workspace = true torii-client = { path = "../client" } torii-core = { path = "../core" } torii-graphql = { path = "../graphql" } torii-grpc = { path = "../grpc" } +tower = "0.4.13" tracing-subscriber.workspace = true tracing.workspace = true url.workspace = true warp.workspace = true +http-body = "0.4.5" +either = "1.9.0" [dev-dependencies] camino.workspace = true [features] -default = [ "sqlite" ] -sqlite = [ "sqlx/sqlite" ] +default = ["sqlite"] +sqlite = ["sqlx/sqlite"] [[bin]] name = "torii" diff --git a/crates/torii/server/src/cli.rs b/crates/torii/server/src/cli.rs index 70461c4b07..9388e7ca43 100644 --- a/crates/torii/server/src/cli.rs +++ b/crates/torii/server/src/cli.rs @@ -1,4 +1,5 @@ use std::env; +use std::net::SocketAddr; use std::str::FromStr; use anyhow::{anyhow, Context}; @@ -21,13 +22,13 @@ use torii_core::sql::Sql; use tracing::error; use tracing_subscriber::fmt; use url::Url; -use warp::Filter; use crate::engine::Processors; use crate::indexer::Indexer; mod engine; mod indexer; +mod server; /// Dojo World Indexer #[derive(Parser, Debug)] @@ -51,12 +52,9 @@ struct Args { /// Host address for GraphQL/gRPC endpoints #[arg(long, default_value = "0.0.0.0")] host: String, - /// Port number for GraphQL endpoint + /// Port number for GraphQL/gRPC endpoints #[arg(long, default_value = "8080")] - graphql_port: u16, - /// Port number for gRPC endpoint - #[arg(long, default_value = "50051")] - grpc_port: u16, + port: u16, } #[tokio::main] @@ -108,23 +106,23 @@ async fn main() -> anyhow::Result<()> { let indexer = Indexer::new(&world, &db, &provider, processors, manifest, world_address, args.start_block); - let base_route = warp::path::end() - .and(warp::get()) - .map(|| warp::reply::json(&serde_json::json!({ "success": true }))); - let routes = torii_graphql::route::filter(&pool) - .await - .or(torii_grpc::route::filter(&pool)) - .or(base_route); - let server = warp::serve(routes); - let server = server.run((args.host.parse::()?, args.graphql_port)); + let addr = format!("{}:{}", args.host, args.port) + .parse::() + .expect("able to parse address"); tokio::select! { res = indexer.start() => { if let Err(e) = res { - error!("Indexer failed with error: {:?}", e); + error!("Indexer failed with error: {e}"); } } - _ = server => {} + + res = server::spawn_server(&addr, &pool) => { + if let Err(e) = res { + error!("Server failed with error: {e}"); + } + } + _ = tokio::signal::ctrl_c() => { println!("Received Ctrl+C, shutting down"); } diff --git a/crates/torii/server/src/server.rs b/crates/torii/server/src/server.rs new file mode 100644 index 0000000000..bfe61bab4c --- /dev/null +++ b/crates/torii/server/src/server.rs @@ -0,0 +1,116 @@ +use std::convert::Infallible; +use std::net::SocketAddr; +use std::pin::Pin; +use std::str::FromStr; +use std::task::Poll; + +use either::Either; +use hyper::service::{make_service_fn, Service}; +use hyper::Uri; +use sqlx::{Pool, Sqlite}; +use warp::Filter; + +type Error = Box; + +// TODO: check if there's a nicer way to implement this +pub async fn spawn_server(addr: &SocketAddr, pool: &Pool) -> anyhow::Result<()> { + let world_server = torii_grpc::DojoWorld::new(pool.clone()); + + let base_route = warp::path::end() + .and(warp::get()) + .map(|| warp::reply::json(&serde_json::json!({ "success": true }))); + let routes = torii_graphql::route::filter(pool).await.or(base_route); + + let warp = warp::service(routes); + let tonic = tonic_web::enable(torii_grpc::world::world_server::WorldServer::new(world_server)); + + hyper::Server::bind(addr) + .serve(make_service_fn(move |_| { + let mut tonic = tonic.clone(); + let mut warp = warp.clone(); + + std::future::ready(Ok::<_, Infallible>(tower::service_fn( + move |mut req: hyper::Request| { + let mut path_iter = req.uri().path().split('/').skip(1); + + // check the base path + match path_iter.next() { + // There's a bug in tonic client where the URI path is not respected in + // `Endpoint`, but this issue doesn't exist if `torii-client` is compiled to + // `wasm32-unknown-unknown`. See: https://github.com/hyperium/tonic/issues/1314 + Some("grpc") => { + let grpc_method = path_iter.collect::>().join("/"); + *req.uri_mut() = + Uri::from_str(&format!("/{grpc_method}")).expect("valid uri"); + + Either::Right({ + let res = tonic.call(req); + Box::pin(async move { + let res = res.await.map(|res| res.map(EitherBody::Right))?; + Ok::<_, Error>(res) + }) + }) + } + + _ => Either::Left({ + let res = warp.call(req); + Box::pin(async move { + let res = res.await.map(|res| res.map(EitherBody::Left))?; + Ok::<_, Error>(res) + }) + }), + } + }, + ))) + })) + .await?; + + Ok(()) +} + +enum EitherBody { + Left(A), + Right(B), +} + +impl http_body::Body for EitherBody +where + A: http_body::Body + Send + Unpin, + B: http_body::Body + Send + Unpin, + A::Error: Into, + B::Error: Into, +{ + type Data = A::Data; + type Error = Box; + + fn is_end_stream(&self) -> bool { + match self { + EitherBody::Left(b) => b.is_end_stream(), + EitherBody::Right(b) => b.is_end_stream(), + } + } + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll>> { + match self.get_mut() { + EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err), + EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err), + } + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll, Self::Error>> { + match self.get_mut() { + EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into), + EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into), + } + } +} + +fn map_option_err>(err: Option>) -> Option> { + err.map(|e| e.map_err(Into::into)) +}