Skip to content

Commit

Permalink
flatten rpc server building logic
Browse files Browse the repository at this point in the history
  • Loading branch information
kariy committed Dec 10, 2024
1 parent 71db0b4 commit d281cb4
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 46 deletions.
2 changes: 1 addition & 1 deletion crates/katana/node/src/exit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl<'a> NodeStoppedFuture<'a> {
pub(crate) fn new(handle: &'a LaunchedNode) -> Self {
let fut = Box::pin(async {
handle.node.task_manager.wait_for_shutdown().await;
handle.stop().await?;
handle.rpc.handle.clone().stopped().await;

Check warning on line 21 in crates/katana/node/src/exit.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/node/src/exit.rs#L21

Added line #L21 was not covered by tests
Ok(())
});
Self { fut }
Expand Down
81 changes: 36 additions & 45 deletions crates/katana/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::future::IntoFuture;
use std::sync::Arc;

use anyhow::Result;
use config::rpc::{ApiKind, RpcConfig};
use config::rpc::ApiKind;
use config::Config;
use dojo_metrics::exporters::prometheus::PrometheusRecorder;
use dojo_metrics::{Report, Server as MetricsServer};
Expand All @@ -28,7 +28,7 @@ use katana_core::env::BlockContextGenerator;
use katana_core::service::block_producer::BlockProducer;
use katana_db::mdbx::DbEnv;
use katana_executor::implementation::blockifier::BlockifierFactory;
use katana_executor::{ExecutionFlags, ExecutorFactory};
use katana_executor::ExecutionFlags;
use katana_pool::ordering::FiFo;
use katana_pool::TxPool;
use katana_primitives::block::GasPrices;
Expand Down Expand Up @@ -83,18 +83,18 @@ impl LaunchedNode {
pub struct Node {
pub pool: TxPool,
pub db: Option<DbEnv>,
pub rpc_server: RpcServer,
pub task_manager: TaskManager,
pub backend: Arc<Backend<BlockifierFactory>>,
pub block_producer: BlockProducer<BlockifierFactory>,
pub config: Arc<Config>,
forked_client: Option<ForkedClient>,
}

impl Node {
/// Start the node.
///
/// This method will start all the node process, running them until the node is stopped.
pub async fn launch(mut self) -> Result<LaunchedNode> {
pub async fn launch(self) -> Result<LaunchedNode> {
let chain = self.backend.chain_spec.id;
info!(%chain, "Starting node.");

Expand Down Expand Up @@ -135,16 +135,18 @@ impl Node {
.name("Sequencing")
.spawn(sequencing.into_future());

let node_components = (pool, backend, block_producer, self.forked_client.take());
let rpc = spawn(node_components, self.config.rpc.clone()).await?;
// --- start the rpc server

let rpc_handle = self.rpc_server.start(self.config.rpc.socket_addr()).await?;

// --- start the gas oracle worker task

if let Some(ref url) = self.config.l1_provider_url {
self.backend.gas_oracle.run_worker(self.task_manager.task_spawner());
info!(%url, "Gas Price Oracle started.");
};

Ok(LaunchedNode { node: self, rpc })
Ok(LaunchedNode { node: self, rpc: rpc_handle })
}
}

Expand Down Expand Up @@ -240,36 +242,18 @@ pub async fn build(mut config: Config) -> Result<Node> {
let validator = block_producer.validator();
let pool = TxPool::new(validator.clone(), FiFo::new());

let node = Node {
db,
pool,
backend,
forked_client,
block_producer,
config: Arc::new(config),
task_manager: TaskManager::current(),
};

Ok(node)
}

// Moved from `katana_rpc` crate
pub async fn spawn<EF: ExecutorFactory>(
node_components: (TxPool, Arc<Backend<EF>>, BlockProducer<EF>, Option<ForkedClient>),
config: RpcConfig,
) -> Result<RpcServerHandle> {
let (pool, backend, block_producer, forked_client) = node_components;
// --- build rpc server

let mut modules = RpcModule::new(());
let mut rpc_modules = RpcModule::new(());

let cors = Cors::new()
.allow_origins(config.cors_origins.clone())
// Allow `POST` when accessing the resource
.allow_methods([Method::POST, Method::GET])
.allow_headers([hyper::header::CONTENT_TYPE, "argent-client".parse().unwrap(), "argent-version".parse().unwrap()]);
.allow_origins(config.rpc.cors_origins.clone())
// Allow `POST` when accessing the resource
.allow_methods([Method::POST, Method::GET])
.allow_headers([hyper::header::CONTENT_TYPE, "argent-client".parse().unwrap(), "argent-version".parse().unwrap()]);

if config.apis.contains(&ApiKind::Starknet) {
let cfg = StarknetApiConfig { max_event_page_size: config.max_event_page_size };
if config.rpc.apis.contains(&ApiKind::Starknet) {
let cfg = StarknetApiConfig { max_event_page_size: config.rpc.max_event_page_size };

let api = if let Some(client) = forked_client {
StarknetApi::new_forked(
Expand All @@ -283,28 +267,35 @@ pub async fn spawn<EF: ExecutorFactory>(
StarknetApi::new(backend.clone(), pool.clone(), Some(block_producer.clone()), cfg)
};

modules.merge(StarknetApiServer::into_rpc(api.clone()))?;
modules.merge(StarknetWriteApiServer::into_rpc(api.clone()))?;
modules.merge(StarknetTraceApiServer::into_rpc(api))?;
rpc_modules.merge(StarknetApiServer::into_rpc(api.clone()))?;
rpc_modules.merge(StarknetWriteApiServer::into_rpc(api.clone()))?;
rpc_modules.merge(StarknetTraceApiServer::into_rpc(api))?;
}

if config.apis.contains(&ApiKind::Dev) {
if config.rpc.apis.contains(&ApiKind::Dev) {
let api = DevApi::new(backend.clone(), block_producer.clone());
modules.merge(api.into_rpc())?;
rpc_modules.merge(api.into_rpc())?;
}

if config.apis.contains(&ApiKind::Torii) {
if config.rpc.apis.contains(&ApiKind::Torii) {
let api = ToriiApi::new(backend.clone(), pool.clone(), block_producer.clone());
modules.merge(api.into_rpc())?;
rpc_modules.merge(api.into_rpc())?;
}

if config.apis.contains(&ApiKind::Saya) {
if config.rpc.apis.contains(&ApiKind::Saya) {
let api = SayaApi::new(backend.clone(), block_producer.clone());
modules.merge(api.into_rpc())?;
rpc_modules.merge(api.into_rpc())?;
}

let server = RpcServer::new().metrics().health_check().cors(cors).module(modules);
let handle = server.start(config.socket_addr()).await?;
let rpc_server = RpcServer::new().metrics().health_check().cors(cors).module(rpc_modules);

Ok(handle)
Ok(Node {
db,
pool,
backend,
rpc_server,
block_producer,
config: Arc::new(config),
task_manager: TaskManager::current(),
})
}

0 comments on commit d281cb4

Please sign in to comment.