Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restart torii server on new model registration #971

Merged
merged 3 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ impl Sql {
let mut model_idx = 0_usize;
self.build_register_queries_recursive(&model, vec![model.name()], &mut model_idx);

self.execute().await?;

// Since previous query has not been executed, we have to make sure created_at exists
let created_at: DateTime<Utc> =
match sqlx::query("SELECT created_at FROM models WHERE id = ?")
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/graphql/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use self::connection::edge::EdgeObject;
use self::connection::ConnectionObject;
use crate::types::{TypeMapping, ValueMapping};

pub trait ObjectTrait {
pub trait ObjectTrait: Send + Sync {
// Name of the graphql object (eg "player")
fn name(&self) -> &str;

Expand Down
16 changes: 13 additions & 3 deletions crates/torii/server/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
mod server;

use std::net::SocketAddr;
use std::sync::Arc;

use clap::Parser;
use server::Server;
use sqlx::sqlite::SqlitePoolOptions;
use starknet::core::types::FieldElement;
use starknet::providers::jsonrpc::HttpTransport;
Expand All @@ -17,8 +20,6 @@ use tracing::error;
use tracing_subscriber::fmt;
use url::Url;

mod server;

/// Dojo World Indexer
#[derive(Parser, Debug)]
#[command(name = "torii", author, version, about, long_about = None)]
Expand Down Expand Up @@ -102,14 +103,23 @@ async fn main() -> anyhow::Result<()> {

let addr: SocketAddr = format!("{}:{}", args.host, args.port).parse()?;

let server = Server::new(
addr,
pool,
block_receiver,
args.world_address,
Arc::clone(&provider),
args.allowed_origins,
);

tokio::select! {
res = engine.start(cts) => {
if let Err(e) = res {
error!("Indexer failed with error: {e}");
}
}

res = server::spawn_server(&addr, &pool, args.world_address, block_receiver, Arc::clone(&provider), args.allowed_origins) => {
res = server.start() => {
if let Err(e) = res {
error!("Server failed with error: {e}");
}
Expand Down
92 changes: 77 additions & 15 deletions crates/torii/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,97 @@ use sqlx::{Pool, Sqlite};
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use starknet_crypto::FieldElement;
use tokio::sync::mpsc::Receiver as BoundedReceiver;
use tokio::sync::mpsc::Receiver;
use tokio::sync::Notify;
use tokio_stream::StreamExt;
use torii_core::simple_broker::SimpleBroker;
use torii_core::types::Model;
use torii_grpc::protos;
use torii_grpc::server::DojoWorld;
use warp::filters::cors::Builder;
use warp::Filter;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

pub struct Server {
addr: SocketAddr,
pool: Pool<Sqlite>,
world: DojoWorld,
allowed_origins: Vec<String>,
}

impl Server {
pub fn new(
addr: SocketAddr,
pool: Pool<Sqlite>,
block_rx: Receiver<u64>,
world_address: FieldElement,
provider: Arc<JsonRpcClient<HttpTransport>>,
allowed_origins: Vec<String>,
) -> Self {
let world =
torii_grpc::server::DojoWorld::new(pool.clone(), block_rx, world_address, provider);

Self { addr, pool, world, allowed_origins }
}

pub async fn start(&self) -> anyhow::Result<()> {
let notify_restart = Arc::new(Notify::new());

tokio::spawn(model_registered_listener(notify_restart.clone()));

loop {
let server_handle = tokio::spawn(spawn(
self.addr,
self.pool.clone(),
self.world.clone(),
notify_restart.clone(),
self.allowed_origins.clone(),
));

match server_handle.await {
Ok(Ok(_)) => {
// server graceful shutdown, restart
continue;
}
Ok(Err(e)) => {
return Err(e);
}
Err(e) => return Err(e.into()),
};
}
}
}

async fn model_registered_listener(notify_restart: Arc<Notify>) {
while (SimpleBroker::<Model>::subscribe().next().await).is_some() {
notify_restart.notify_one();
}
}

// TODO: check if there's a nicer way to implement this
pub async fn spawn_server(
addr: &SocketAddr,
pool: &Pool<Sqlite>,
world_address: FieldElement,
block_receiver: BoundedReceiver<u64>,
provider: Arc<JsonRpcClient<HttpTransport>>,
async fn spawn(
addr: SocketAddr,
pool: Pool<Sqlite>,
dojo_world: DojoWorld,
notify_restart: Arc<Notify>,
allowed_origins: Vec<String>,
) -> anyhow::Result<()> {
let world_server =
torii_grpc::server::DojoWorld::new(pool.clone(), block_receiver, world_address, provider);

let base_route =
warp::path::end().map(|| warp::reply::json(&serde_json::json!({ "success": true })));
let routes = torii_graphql::route::filter(pool)
let base_route = warp::path::end()
.and(warp::get())
.map(|| warp::reply::json(&serde_json::json!({ "success": true })));
let routes = torii_graphql::route::filter(&pool)
.await
.or(base_route)
.with(configure_cors(&allowed_origins));

let warp = warp::service(routes);

// TODO: apply allowed_origins to tonic grpc
let tonic = tonic_web::enable(protos::world::world_server::WorldServer::new(world_server));
let tonic =
tonic_web::enable(protos::world::world_server::WorldServer::new(dojo_world.clone()));

hyper::Server::bind(addr)
hyper::Server::bind(&addr)
.serve(make_service_fn(move |_| {
let mut tonic = tonic.clone();
let mut warp = warp.clone();
Expand Down Expand Up @@ -84,6 +143,9 @@ pub async fn spawn_server(
},
)))
}))
.with_graceful_shutdown(async {
notify_restart.notified().await;
})
.await?;

Ok(())
Expand Down