Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
xlc committed Oct 3, 2023
1 parent c55ab2d commit 0ab3ce7
Show file tree
Hide file tree
Showing 31 changed files with 181 additions and 551 deletions.
2 changes: 1 addition & 1 deletion .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
reorder_imports = true
max_width = 100
max_width = 120

ignore = [
"vendor/*",
Expand Down
12 changes: 3 additions & 9 deletions benches/bench/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ pub const MIB: usize = 1024 * KIB;
pub const SLOW_CALL: Duration = Duration::from_millis(1);

/// Run jsonrpsee WebSocket server for benchmarks.
pub async fn ws_server(
handle: tokio::runtime::Handle,
url: &str,
) -> (String, jsonrpsee::server::ServerHandle) {
pub async fn ws_server(handle: tokio::runtime::Handle, url: &str) -> (String, jsonrpsee::server::ServerHandle) {
use jsonrpsee::{core::server::SubscriptionMessage, server::ServerBuilder};

let server = ServerBuilder::default()
Expand Down Expand Up @@ -58,8 +55,7 @@ pub async fn ws_server(
"chain_unsubscribeNewHeads",
|_params, pending, _ctx| async move {
let sink = pending.accept().await?;
let msg =
SubscriptionMessage::from_json(&serde_json::json!({ "number": "0x4321" }))?;
let msg = SubscriptionMessage::from_json(&serde_json::json!({ "number": "0x4321" }))?;
sink.send(msg).await?;
Ok(())
},
Expand All @@ -84,9 +80,7 @@ fn gen_rpc_module() -> jsonrpsee::RpcModule<()> {
.unwrap();

module
.register_method(SYNC_MEM_CALL, |_, _| {
Ok::<_, ErrorObjectOwned>("A".repeat(MIB))
})
.register_method(SYNC_MEM_CALL, |_, _| Ok::<_, ErrorObjectOwned>("A".repeat(MIB)))
.unwrap();

module
Expand Down
112 changes: 24 additions & 88 deletions benches/bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ use helpers::{
};

use subway::{
config::{
Config, MergeStrategy, MethodParam, MiddlewaresConfig, RpcDefinitions, RpcMethod,
RpcSubscription,
},
config::{Config, MergeStrategy, MethodParam, MiddlewaresConfig, RpcDefinitions, RpcMethod, RpcSubscription},
extensions::{client::ClientConfig, server::ServerConfig, ExtensionsConfig},
server::start_server,
};
Expand Down Expand Up @@ -119,18 +116,10 @@ trait RequestBencher {

fn websocket_benches(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let (_url1, _server1) =
rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_ONE_ENDPOINT));
let (_url2, _server2) =
rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_TWO_ENDPOINT));
let (_url1, _server1) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_ONE_ENDPOINT));
let (_url2, _server2) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_TWO_ENDPOINT));
let (url, _server) = rt.block_on(server());
ws_custom_headers_handshake(
&rt,
crit,
&url,
"ws_custom_headers_handshake",
Self::REQUEST_TYPE,
);
ws_custom_headers_handshake(&rt, crit, &url, "ws_custom_headers_handshake", Self::REQUEST_TYPE);
ws_concurrent_conn_calls(
&rt,
crit,
Expand All @@ -140,22 +129,14 @@ trait RequestBencher {
&[2, 4, 8],
);
let client = Arc::new(rt.block_on(ws_client(&url)));
round_trip(
&rt,
crit,
client.clone(),
"ws_round_trip",
Self::REQUEST_TYPE,
);
round_trip(&rt, crit, client.clone(), "ws_round_trip", Self::REQUEST_TYPE);
batch_round_trip(&rt, crit, client, "ws_batch_requests", Self::REQUEST_TYPE);
}

fn websocket_benches_mid(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let (_url1, _server1) =
rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_ONE_ENDPOINT));
let (_url2, _server2) =
rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_TWO_ENDPOINT));
let (_url1, _server1) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_ONE_ENDPOINT));
let (_url2, _server2) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_TWO_ENDPOINT));
let (url, _server) = rt.block_on(server());
ws_concurrent_conn_calls(
&rt,
Expand All @@ -177,10 +158,8 @@ trait RequestBencher {

fn websocket_benches_slow(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let (_url1, _server1) =
rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_ONE_ENDPOINT));
let (_url2, _server2) =
rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_TWO_ENDPOINT));
let (_url1, _server1) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_ONE_ENDPOINT));
let (_url2, _server2) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_TWO_ENDPOINT));
let (url, _server) = rt.block_on(server());
ws_concurrent_conn_calls(
&rt,
Expand All @@ -194,21 +173,17 @@ trait RequestBencher {

fn subscriptions(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let (_url1, _server1) =
rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_ONE_ENDPOINT));
let (_url2, _server2) =
rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_TWO_ENDPOINT));
let (_url1, _server1) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_ONE_ENDPOINT));
let (_url2, _server2) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_TWO_ENDPOINT));
let (url, _server) = rt.block_on(server());
let client = Arc::new(rt.block_on(ws_client(&url)));
sub_round_trip(&rt, crit, client, "subscriptions");
}

fn websocket_benches_inject(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let (_url1, _server1) =
rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_ONE_ENDPOINT));
let (_url2, _server2) =
rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_TWO_ENDPOINT));
let (_url1, _server1) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_ONE_ENDPOINT));
let (_url2, _server2) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_TWO_ENDPOINT));
let (url, _server) = rt.block_on(server());
let client = Arc::new(rt.block_on(ws_client(&url)));
ws_inject_calls(&rt, crit, client, "ws_inject_calls", Self::REQUEST_TYPE);
Expand Down Expand Up @@ -406,11 +381,7 @@ fn ws_concurrent_conn_subs(

for _ in 0..10 {
let fut = client
.subscribe::<String, _>(
SUB_METHOD_NAME,
rpc_params![],
UNSUB_METHOD_NAME,
)
.subscribe::<String, _>(SUB_METHOD_NAME, rpc_params![], UNSUB_METHOD_NAME)
.then(|sub| async move {
let mut s = sub.unwrap();

Expand All @@ -432,13 +403,7 @@ fn ws_concurrent_conn_subs(
}

/// Bench WS handshake with different header sizes.
fn ws_custom_headers_handshake(
rt: &TokioRuntime,
crit: &mut Criterion,
url: &str,
name: &str,
request: RequestType,
) {
fn ws_custom_headers_handshake(rt: &TokioRuntime, crit: &mut Criterion, url: &str, name: &str, request: RequestType) {
let mut group = crit.benchmark_group(request.group_name(name));
for header_size in [0, KIB, 2 * KIB, 4 * KIB] {
group.bench_function(format!("{}kb", header_size / KIB), |b| {
Expand All @@ -455,23 +420,12 @@ fn ws_custom_headers_handshake(
group.finish();
}

fn round_trip(
rt: &TokioRuntime,
crit: &mut Criterion,
client: Arc<impl ClientT>,
name: &str,
request: RequestType,
) {
fn round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl ClientT>, name: &str, request: RequestType) {
for method in request.methods() {
let bench_name = format!("{}/{}", name, method);
crit.bench_function(&request.group_name(&bench_name), |b| {
b.to_async(rt).iter(|| async {
black_box(
client
.request::<String, _>(method, rpc_params![])
.await
.unwrap(),
);
black_box(client.request::<String, _>(method, rpc_params![]).await.unwrap());
})
});
}
Expand All @@ -497,24 +451,15 @@ fn batch_round_trip(
}

group.throughput(Throughput::Elements(*batch_size as u64));
group.bench_with_input(
BenchmarkId::from_parameter(batch_size),
batch_size,
|b, _| {
b.to_async(rt)
.iter(|| async { client.batch_request::<String>(batch.clone()).await.unwrap() })
},
);
group.bench_with_input(BenchmarkId::from_parameter(batch_size), batch_size, |b, _| {
b.to_async(rt)
.iter(|| async { client.batch_request::<String>(batch.clone()).await.unwrap() })
});
}
group.finish();
}

fn sub_round_trip(
rt: &TokioRuntime,
crit: &mut Criterion,
client: Arc<impl SubscriptionClientT>,
name: &str,
) {
fn sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl SubscriptionClientT>, name: &str) {
let mut group = crit.benchmark_group(name);
group.bench_function("subscribe", |b| {
b.to_async(rt).iter_with_large_drop(|| async {
Expand All @@ -534,11 +479,7 @@ fn sub_round_trip(
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
client
.subscribe::<String, _>(
SUB_METHOD_NAME,
rpc_params![],
UNSUB_METHOD_NAME,
)
.subscribe::<String, _>(SUB_METHOD_NAME, rpc_params![], UNSUB_METHOD_NAME)
.await
.unwrap()
})
Expand Down Expand Up @@ -584,12 +525,7 @@ fn ws_inject_calls(
let bench_name = format!("{}/{}", name, method);
crit.bench_function(&request.group_name(&bench_name), |b| {
b.to_async(rt).iter(|| async {
black_box(
client
.request::<String, _>(method, rpc_params![0_u64])
.await
.unwrap(),
);
black_box(client.request::<String, _>(method, rpc_params![0_u64]).await.unwrap());
})
});
}
20 changes: 6 additions & 14 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ impl From<RpcOptions> for RpcDefinitions {
"eth" | "ethereum" => serde_yaml::from_str(ETHEREUM_CONFIG).unwrap(),
_ => {
let file = fs::File::open(path).expect("Invalid rpc config path");
let defs: RpcDefinitionsWithBase =
serde_yaml::from_reader(file).expect("Invalid rpc config file");
let defs: RpcDefinitionsWithBase = serde_yaml::from_reader(file).expect("Invalid rpc config file");
defs.into()
}
},
Expand Down Expand Up @@ -150,10 +149,9 @@ impl From<ParseConfig> for Config {
pub fn read_config() -> Result<Config, String> {
let cmd = Command::parse();

let config =
fs::File::open(cmd.config).map_err(|e| format!("Unable to open config file: {e}"))?;
let config: ParseConfig = serde_yaml::from_reader(&config)
.map_err(|e| format!("Unable to parse config file: {e}"))?;
let config = fs::File::open(cmd.config).map_err(|e| format!("Unable to open config file: {e}"))?;
let config: ParseConfig =
serde_yaml::from_reader(&config).map_err(|e| format!("Unable to parse config file: {e}"))?;
let mut config: Config = config.into();

if let Ok(endpoints) = std::env::var("ENDPOINTS") {
Expand Down Expand Up @@ -196,21 +194,15 @@ fn validate_config(config: &Config) -> Result<(), String> {
// TODO: validate logic should be in each individual extensions
// validate endpoints
for endpoint in &config.extensions.client.as_ref().unwrap().endpoints {
if endpoint
.parse::<jsonrpsee::client_transport::ws::Uri>()
.is_err()
{
if endpoint.parse::<jsonrpsee::client_transport::ws::Uri>().is_err() {
return Err(format!("Invalid endpoint {}", endpoint));
}
}

// ensure each method has only one param with inject=true
for method in &config.rpcs.methods {
if method.params.iter().filter(|x| x.inject).count() > 1 {
return Err(format!(
"Method {} has more than one inject param",
method.method
));
return Err(format!("Method {} has more than one inject param", method.method));
}
}

Expand Down
16 changes: 3 additions & 13 deletions src/extension/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,13 @@ use crate::utils::TypeRegistryRef;
pub trait Extension: Sized {
type Config: serde::Deserialize<'static>;

async fn from_config(
config: &Self::Config,
registry: &ExtensionRegistry,
) -> Result<Self, anyhow::Error>;
async fn from_config(config: &Self::Config, registry: &ExtensionRegistry) -> Result<Self, anyhow::Error>;
}

#[async_trait]
pub trait ExtensionBuilder {
fn has(&self, type_id: TypeId) -> bool;
async fn build(
&self,
type_id: TypeId,
registry: &ExtensionRegistry,
) -> anyhow::Result<Arc<dyn Any + Send + Sync>>;
async fn build(&self, type_id: TypeId, registry: &ExtensionRegistry) -> anyhow::Result<Arc<dyn Any + Send + Sync>>;
}

pub struct ExtensionRegistry {
Expand All @@ -33,10 +26,7 @@ pub struct ExtensionRegistry {
}

impl ExtensionRegistry {
pub fn new(
registry: TypeRegistryRef,
builder: Arc<dyn ExtensionBuilder + Send + Sync>,
) -> Self {
pub fn new(registry: TypeRegistryRef, builder: Arc<dyn ExtensionBuilder + Send + Sync>) -> Self {
Self { registry, builder }
}

Expand Down
Loading

0 comments on commit 0ab3ce7

Please sign in to comment.