Skip to content

Commit

Permalink
feat: adding main module
Browse files Browse the repository at this point in the history
commit-id:8df0f3c6
  • Loading branch information
lev-starkware committed Jul 1, 2024
1 parent ba88c86 commit 72c8507
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 13 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions crates/gateway/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl Gateway {
Gateway { config, app_state }
}

pub async fn run(self) -> Result<(), GatewayRunError> {
pub async fn run(&mut self) -> Result<(), GatewayRunError> {
// Parses the bind address from GatewayConfig, returning an error for invalid addresses.
let GatewayNetworkConfig { ip, port } = self.config.network_config;
let addr = SocketAddr::new(ip, port);
Expand All @@ -76,11 +76,11 @@ impl Gateway {
Ok(axum::Server::bind(&addr).serve(app.into_make_service()).await?)
}

pub fn app(self) -> Router {
pub fn app(&self) -> Router {
Router::new()
.route("/is_alive", get(is_alive))
.route("/add_tx", post(add_tx))
.with_state(self.app_state)
.with_state(self.app_state.clone())
}
}

Expand Down Expand Up @@ -201,8 +201,10 @@ pub fn create_gateway_server(gateway: Gateway) -> GatewayCommunicationServer {
#[async_trait]
impl ComponentRunner for Gateway {
async fn start(&mut self) -> Result<(), ComponentStartError> {
// TODO(Lev, 23/07/2024): Implement the real logic.
println!("Gateway::start()");
Ok(())
match self.run().await {
Ok(_) => Ok(()),
Err(_) => Err(ComponentStartError::InternalComponentError),
}
}
}
2 changes: 2 additions & 0 deletions crates/mempool_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ license.workspace = true
workspace = true

[dependencies]
anyhow.workspace = true
clap.workspace = true
const_format.workspace = true
futures.workspace = true
starknet_gateway = { path = "../gateway", version = "0.0" }
starknet_mempool_infra = { path = "../mempool_infra", version = "0.0" }
starknet_mempool = { path = "../mempool", version = "0.0" }
Expand Down
34 changes: 31 additions & 3 deletions crates/mempool_node/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,33 @@
use std::env::args;
use std::process::exit;

use papyrus_config::validators::config_validate;
use papyrus_config::ConfigError;
use starknet_mempool_node::communication::{create_node_channels, create_node_clients};
use starknet_mempool_node::components::create_components;
use starknet_mempool_node::config::MempoolNodeConfig;
use starknet_mempool_node::servers::{create_servers, run_server_components};

#[tokio::main]
async fn main() {
let my_string = "Main function placeholder";
println!("{}", my_string);
async fn main() -> anyhow::Result<()> {
let config = MempoolNodeConfig::load_and_process(args().collect());
if let Err(ConfigError::CommandInput(clap_err)) = config {
clap_err.exit();
}

let config = config?;
if let Err(error) = config_validate(&config) {
println!("Error: {}", error);
exit(1);
}

let channels = create_node_channels();
let clients = create_node_clients(&config, &channels);
let components = create_components(&config, &clients);
let servers = create_servers(&config, channels, components);

println!("Info: Starting components!");
run_server_components(&config, servers).await?;

Ok(())
}
55 changes: 51 additions & 4 deletions crates/mempool_node/src/servers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// use std::future::pending;
// use std::pin::Pin;
use std::future::pending;
use std::pin::Pin;

// use futures::{Future, FutureExt};
use futures::{Future, FutureExt};
use starknet_gateway::gateway::create_gateway_server;
use starknet_mempool::communication::create_mempool_server;
use starknet_mempool_infra::component_server::ComponentServerStarter;
Expand Down Expand Up @@ -38,4 +38,51 @@ pub fn create_servers(
servers
}

// TODO (Lev): Implement the run server components function.
pub async fn run_server_components(
config: &MempoolNodeConfig,
servers: Servers,
) -> anyhow::Result<()> {
// Gateway component.
// let gateway: Gateway;
let gateway_future =
get_server_future("Gateway", config.components.gateway_component.execute, servers.gateway);

// Mempool component.
let mempool_future =
get_server_future("Mempool", config.components.mempool_component.execute, servers.mempool);

let gateway_handle = tokio::spawn(gateway_future);
let mempool_handle = tokio::spawn(mempool_future);

tokio::select! {
res = gateway_handle => {
println!("Error: Gateway Server stopped.");
res?
}
res = mempool_handle => {
println!("Error: Mempool Server stopped.");
res?
}
};
println!("Error: Servers ended with unexpected Ok.");

Ok(())
}

pub fn get_server_future(
name: &str,
execute_flag: bool,
server: Option<Box<dyn ComponentServerStarter>>,
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
let server_future = match execute_flag {
true => {
let mut server = match server {
Some(server) => server,
_ => panic!("{} component is not initialized.", name),
};
async move { server.start().await }.boxed()
}
false => pending().boxed(),
};
server_future
}
2 changes: 1 addition & 1 deletion crates/tests-integration/src/integration_test_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl IntegrationTestSetup {
channel::<MempoolRequestAndResponseSender>(MEMPOOL_INVOCATIONS_QUEUE_SIZE);
// Build and run gateway; initialize a gateway client.
let gateway_mempool_client = MempoolClientImpl::new(tx_mempool.clone());
let gateway =
let mut gateway =
create_gateway(Arc::new(gateway_mempool_client), n_initialized_account_contracts).await;
let GatewayNetworkConfig { ip, port } = gateway.config.network_config;
let gateway_client = GatewayClient::new(SocketAddr::from((ip, port)));
Expand Down

0 comments on commit 72c8507

Please sign in to comment.