diff --git a/Cargo.lock b/Cargo.lock index 4713f01c86..611c847cbc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4940,11 +4940,11 @@ dependencies = [ "assert_fs", "async-trait", "camino", - "jsonrpsee 0.16.3", "katana-core", "katana-executor", "katana-node", "katana-primitives", + "katana-rpc", "scarb", "scarb-ui", "serde", diff --git a/crates/dojo/test-utils/Cargo.toml b/crates/dojo/test-utils/Cargo.toml index fbff82a6bf..f2e47f07e5 100644 --- a/crates/dojo/test-utils/Cargo.toml +++ b/crates/dojo/test-utils/Cargo.toml @@ -10,11 +10,11 @@ anyhow.workspace = true assert_fs.workspace = true async-trait.workspace = true camino.workspace = true -jsonrpsee = { workspace = true, features = [ "server" ] } katana-core = { workspace = true } katana-executor = { workspace = true, features = [ "blockifier" ] } katana-node.workspace = true katana-primitives = { workspace = true } +katana-rpc.workspace = true scarb.workspace = true scarb-ui.workspace = true serde.workspace = true diff --git a/crates/dojo/test-utils/src/sequencer.rs b/crates/dojo/test-utils/src/sequencer.rs index e2fd3d5ad9..c2723a4564 100644 --- a/crates/dojo/test-utils/src/sequencer.rs +++ b/crates/dojo/test-utils/src/sequencer.rs @@ -1,7 +1,6 @@ use std::collections::HashSet; use std::sync::Arc; -use jsonrpsee::core::Error; use katana_core::backend::Backend; use katana_core::constants::DEFAULT_SEQUENCER_ADDRESS; use katana_executor::implementation::blockifier::BlockifierFactory; @@ -11,6 +10,7 @@ pub use katana_node::config::*; use katana_node::LaunchedNode; use katana_primitives::chain::ChainId; use katana_primitives::chain_spec::ChainSpec; +use katana_rpc::Error; use starknet::accounts::{ExecutionEncoding, SingleOwnerAccount}; use starknet::core::chain_id; use starknet::core::types::{BlockId, BlockTag, Felt}; @@ -42,7 +42,8 @@ impl TestSequencer { .await .expect("Failed to launch node"); - let url = Url::parse(&format!("http://{}", handle.rpc.addr)).expect("Failed to parse URL"); + let url = + Url::parse(&format!("http://{}", handle.rpc.addr())).expect("Failed to parse URL"); let account = handle.node.backend.chain_spec.genesis.accounts().next().unwrap(); let account = TestAccount { @@ -104,7 +105,7 @@ impl TestSequencer { } pub fn stop(self) -> Result<(), Error> { - self.handle.rpc.handle.stop() + self.handle.rpc.stop() } pub fn url(&self) -> Url { diff --git a/crates/katana/node/src/exit.rs b/crates/katana/node/src/exit.rs index de1e42c9f3..54f659ee29 100644 --- a/crates/katana/node/src/exit.rs +++ b/crates/katana/node/src/exit.rs @@ -18,7 +18,7 @@ impl<'a> NodeStoppedFuture<'a> { pub(crate) fn new(handle: &'a LaunchedNode) -> Self { let fut = Box::pin(async { handle.node.task_manager.wait_for_shutdown().await; - handle.stop().await?; + handle.rpc.clone().stopped().await; Ok(()) }); Self { fut } diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index f876bcfb05..922586c8da 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -11,7 +11,7 @@ use std::future::IntoFuture; use std::sync::Arc; use anyhow::Result; -use config::rpc::{ApiKind, RpcConfig}; +use config::rpc::ApiKind; use config::Config; use dojo_metrics::exporters::prometheus::PrometheusRecorder; use dojo_metrics::{Report, Server as MetricsServer}; @@ -28,7 +28,7 @@ use katana_core::env::BlockContextGenerator; use katana_core::service::block_producer::BlockProducer; use katana_db::mdbx::DbEnv; use katana_executor::implementation::blockifier::BlockifierFactory; -use katana_executor::{ExecutionFlags, ExecutorFactory}; +use katana_executor::ExecutionFlags; use katana_pool::ordering::FiFo; use katana_pool::TxPool; use katana_primitives::block::GasPrices; @@ -64,7 +64,7 @@ impl LaunchedNode { /// This will instruct the node to stop and wait until it has actually stop. pub async fn stop(&self) -> Result<()> { // TODO: wait for the rpc server to stop instead of just stopping it. - self.rpc.handle.stop()?; + self.rpc.stop()?; self.node.task_manager.shutdown().await; Ok(()) } @@ -83,18 +83,18 @@ impl LaunchedNode { pub struct Node { pub pool: TxPool, pub db: Option, + pub rpc_server: RpcServer, pub task_manager: TaskManager, pub backend: Arc>, pub block_producer: BlockProducer, pub config: Arc, - forked_client: Option, } impl Node { /// Start the node. /// /// This method will start all the node process, running them until the node is stopped. - pub async fn launch(mut self) -> Result { + pub async fn launch(self) -> Result { let chain = self.backend.chain_spec.id; info!(%chain, "Starting node."); @@ -135,16 +135,18 @@ impl Node { .name("Sequencing") .spawn(sequencing.into_future()); - let node_components = (pool, backend, block_producer, self.forked_client.take()); - let rpc = spawn(node_components, self.config.rpc.clone()).await?; + // --- start the rpc server + + let rpc_handle = self.rpc_server.start(self.config.rpc.socket_addr()).await?; // --- start the gas oracle worker task + if let Some(ref url) = self.config.l1_provider_url { self.backend.gas_oracle.run_worker(self.task_manager.task_spawner()); info!(%url, "Gas Price Oracle started."); }; - Ok(LaunchedNode { node: self, rpc }) + Ok(LaunchedNode { node: self, rpc: rpc_handle }) } } @@ -240,36 +242,18 @@ pub async fn build(mut config: Config) -> Result { let validator = block_producer.validator(); let pool = TxPool::new(validator.clone(), FiFo::new()); - let node = Node { - db, - pool, - backend, - forked_client, - block_producer, - config: Arc::new(config), - task_manager: TaskManager::current(), - }; - - Ok(node) -} - -// Moved from `katana_rpc` crate -pub async fn spawn( - node_components: (TxPool, Arc>, BlockProducer, Option), - config: RpcConfig, -) -> Result { - let (pool, backend, block_producer, forked_client) = node_components; + // --- build rpc server - let mut modules = RpcModule::new(()); + let mut rpc_modules = RpcModule::new(()); let cors = Cors::new() - .allow_origins(config.cors_origins.clone()) - // Allow `POST` when accessing the resource - .allow_methods([Method::POST, Method::GET]) - .allow_headers([hyper::header::CONTENT_TYPE, "argent-client".parse().unwrap(), "argent-version".parse().unwrap()]); + .allow_origins(config.rpc.cors_origins.clone()) + // Allow `POST` when accessing the resource + .allow_methods([Method::POST, Method::GET]) + .allow_headers([hyper::header::CONTENT_TYPE, "argent-client".parse().unwrap(), "argent-version".parse().unwrap()]); - if config.apis.contains(&ApiKind::Starknet) { - let cfg = StarknetApiConfig { max_event_page_size: config.max_event_page_size }; + if config.rpc.apis.contains(&ApiKind::Starknet) { + let cfg = StarknetApiConfig { max_event_page_size: config.rpc.max_event_page_size }; let api = if let Some(client) = forked_client { StarknetApi::new_forked( @@ -283,28 +267,35 @@ pub async fn spawn( StarknetApi::new(backend.clone(), pool.clone(), Some(block_producer.clone()), cfg) }; - modules.merge(StarknetApiServer::into_rpc(api.clone()))?; - modules.merge(StarknetWriteApiServer::into_rpc(api.clone()))?; - modules.merge(StarknetTraceApiServer::into_rpc(api))?; + rpc_modules.merge(StarknetApiServer::into_rpc(api.clone()))?; + rpc_modules.merge(StarknetWriteApiServer::into_rpc(api.clone()))?; + rpc_modules.merge(StarknetTraceApiServer::into_rpc(api))?; } - if config.apis.contains(&ApiKind::Dev) { + if config.rpc.apis.contains(&ApiKind::Dev) { let api = DevApi::new(backend.clone(), block_producer.clone()); - modules.merge(api.into_rpc())?; + rpc_modules.merge(api.into_rpc())?; } - if config.apis.contains(&ApiKind::Torii) { + if config.rpc.apis.contains(&ApiKind::Torii) { let api = ToriiApi::new(backend.clone(), pool.clone(), block_producer.clone()); - modules.merge(api.into_rpc())?; + rpc_modules.merge(api.into_rpc())?; } - if config.apis.contains(&ApiKind::Saya) { + if config.rpc.apis.contains(&ApiKind::Saya) { let api = SayaApi::new(backend.clone(), block_producer.clone()); - modules.merge(api.into_rpc())?; + rpc_modules.merge(api.into_rpc())?; } - let server = RpcServer::new().metrics().health_check().cors(cors).module(modules); - let handle = server.start(config.socket_addr()).await?; + let rpc_server = RpcServer::new().metrics().health_check().cors(cors).module(rpc_modules); - Ok(handle) + Ok(Node { + db, + pool, + backend, + rpc_server, + block_producer, + config: Arc::new(config), + task_manager: TaskManager::current(), + }) } diff --git a/crates/katana/rpc/rpc/src/lib.rs b/crates/katana/rpc/rpc/src/lib.rs index 5a1230b39b..05811354a5 100644 --- a/crates/katana/rpc/rpc/src/lib.rs +++ b/crates/katana/rpc/rpc/src/lib.rs @@ -33,13 +33,17 @@ pub enum Error { AlreadyStopped, } -#[derive(Debug)] +/// The RPC server handle. +#[derive(Debug, Clone)] pub struct RpcServerHandle { - pub addr: SocketAddr, - pub handle: ServerHandle, + /// The actual address that the server is binded to. + addr: SocketAddr, + /// The handle to the spawned [`jsonrpsee::server::Server`]. + handle: ServerHandle, } impl RpcServerHandle { + /// Tell the server to stop without waiting for the server to stop. pub fn stop(&self) -> Result<(), Error> { self.handle.stop().map_err(|_| Error::AlreadyStopped) } @@ -48,6 +52,11 @@ impl RpcServerHandle { pub async fn stopped(self) { self.handle.stopped().await } + + /// Returns the socket address the server is listening on. + pub fn addr(&self) -> &SocketAddr { + &self.addr + } } #[derive(Debug)]