From 1839c50c24b42db3deec196c4ee320edf2dd51a8 Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Thu, 23 Sep 2021 08:59:32 +0200 Subject: [PATCH 1/5] Performance: Avoid converting responses to serde_json::Value Previously, all responses were converted to serde_json::Value before being serialized to string. Since jsonrpc does not inspect the result object in any way, this step could be skipped. Now, result objects are serialized to string much earlier and the Value step is skipped. This patch has large performance benefits for huge responses: In tests with 4.5 GB responses, the jsonrpc serialization overhead after returning from an rpc function was reduced by around 35%. Which means several seconds of speed-up in response times. To accomplish this while mostly retaining API compatibility, the traits RpcMethod, RpcMethodSync, RpcMethodSimple are now generic in their return type and are wrapped when added to an io handler. There's now a distinction between the parsed representation of jsonrpc responses (Output/Response) and result of rpc functions calls (WrapOutput/WrapResponse) to allow the latter to carry the pre-serialized strings. Review notes: - I'm not happy with the WrapOutput / WrapResponse names, glad for suggestions. - Similarly, rpc_wrap must be renamed and moved. - The http handler health request is now awkward, and must extract the result/error from the already-serialized response. Probably worth it. - The largest API breakage I could see is in the middleware, where the futures now return WrapOutput/WrapResult instead of Output/Result. - One could make WrapOutput just be String, but having a separate type is likely easier to read. See #212 --- core-client/transports/src/transports/http.rs | 6 +- core/examples/middlewares.rs | 2 +- core/src/calls.rs | 56 +++++++--- core/src/delegates.rs | 31 +++--- core/src/io.rs | 96 +++++++++++------ core/src/middleware.rs | 26 ++--- core/src/types/mod.rs | 2 +- core/src/types/response.rs | 100 ++++++++++++------ derive/src/to_delegate.rs | 2 - derive/tests/client.rs | 6 +- http/src/handler.rs | 46 ++++---- http/src/tests.rs | 6 +- pubsub/src/delegates.rs | 11 +- test/src/lib.rs | 25 ++--- 14 files changed, 261 insertions(+), 154 deletions(-) diff --git a/core-client/transports/src/transports/http.rs b/core-client/transports/src/transports/http.rs index 248b41890..00c15f219 100644 --- a/core-client/transports/src/transports/http.rs +++ b/core-client/transports/src/transports/http.rs @@ -139,12 +139,16 @@ mod tests { } fn io() -> IoHandler { + use jsonrpc_core::Result; + let mut io = IoHandler::default(); io.add_sync_method("hello", |params: Params| match params.parse::<(String,)>() { Ok((msg,)) => Ok(Value::String(format!("hello {}", msg))), _ => Ok(Value::String("world".into())), }); - io.add_sync_method("fail", |_: Params| Err(Error::new(ErrorCode::ServerError(-34)))); + io.add_sync_method("fail", |_: Params| -> Result { + Err(Error::new(ErrorCode::ServerError(-34))) + }); io.add_notification("notify", |params: Params| { let (value,) = params.parse::<(u64,)>().expect("expected one u64 as param"); assert_eq!(value, 12); diff --git a/core/examples/middlewares.rs b/core/examples/middlewares.rs index 6d25352af..54b5ea7a9 100644 --- a/core/examples/middlewares.rs +++ b/core/examples/middlewares.rs @@ -17,7 +17,7 @@ impl Middleware for MyMiddleware { fn on_request(&self, request: Request, meta: Meta, next: F) -> Either where F: FnOnce(Request, Meta) -> X + Send, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { let start = Instant::now(); let request_number = self.0.fetch_add(1, atomic::Ordering::SeqCst); diff --git a/core/src/calls.rs b/core/src/calls.rs index dfc8dcb16..f31a24993 100644 --- a/core/src/calls.rs +++ b/core/src/calls.rs @@ -1,5 +1,7 @@ -use crate::types::{Error, Params, Value}; +use crate::types::{Error, Id, Params, Value, Version, WrapOutput}; use crate::BoxFuture; +use futures_util::{self, future, FutureExt}; +use serde::Serialize; use std::fmt; use std::future::Future; use std::sync::Arc; @@ -30,23 +32,23 @@ impl WrapFuture for BoxFuture> { } /// A synchronous or asynchronous method. -pub trait RpcMethodSync: Send + Sync + 'static { +pub trait RpcMethodSync: Send + Sync + 'static { /// Call method - fn call(&self, params: Params) -> BoxFuture>; + fn call(&self, params: Params) -> BoxFuture>; } /// Asynchronous Method -pub trait RpcMethodSimple: Send + Sync + 'static { +pub trait RpcMethodSimple: Send + Sync + 'static { /// Output future - type Out: Future> + Send; + type Out: Future> + Send; /// Call method fn call(&self, params: Params) -> Self::Out; } /// Asynchronous Method with Metadata -pub trait RpcMethod: Send + Sync + 'static { +pub trait RpcMethod: Send + Sync + 'static { /// Call method - fn call(&self, params: Params, meta: T) -> BoxFuture>; + fn call(&self, params: Params, meta: T) -> BoxFuture>; } /// Notification @@ -61,11 +63,22 @@ pub trait RpcNotification: Send + Sync + 'static { fn execute(&self, params: Params, meta: T); } +pub trait RpcMethodWrapped: Send + Sync + 'static { + fn call(&self, params: Params, meta: T, jsonrpc: Option, id: Id) -> BoxFuture>; +} + +pub fn rpc_wrap>(f: F) -> Arc> { + Arc::new(move |params: Params, meta: T, jsonrpc: Option, id: Id| { + let result = f.call(params, meta); + result.then(move |r| future::ready(Some(WrapOutput::from(r, id, jsonrpc)))) + }) +} + /// Possible Remote Procedures with Metadata #[derive(Clone)] pub enum RemoteProcedure { /// A method call - Method(Arc>), + Method(Arc>), /// A notification Notification(Arc>), /// An alias to other method, @@ -83,10 +96,10 @@ impl fmt::Debug for RemoteProcedure { } } -impl RpcMethodSimple for F +impl RpcMethodSimple for F where F: Fn(Params) -> X, - X: Future>, + X: Future>, { type Out = X; fn call(&self, params: Params) -> Self::Out { @@ -94,12 +107,12 @@ where } } -impl RpcMethodSync for F +impl RpcMethodSync for F where F: Fn(Params) -> X, - X: WrapFuture, + X: WrapFuture, { - fn call(&self, params: Params) -> BoxFuture> { + fn call(&self, params: Params) -> BoxFuture> { self(params).into_future() } } @@ -113,13 +126,13 @@ where } } -impl RpcMethod for F +impl RpcMethod for F where T: Metadata, F: Fn(Params, T) -> X, - X: Future>, + X: Future>, { - fn call(&self, params: Params, meta: T) -> BoxFuture> { + fn call(&self, params: Params, meta: T) -> BoxFuture> { Box::pin(self(params, meta)) } } @@ -133,3 +146,14 @@ where self(params, meta) } } + +impl RpcMethodWrapped for F +where + T: Metadata, + F: Fn(Params, T, Option, Id) -> X, + X: Future>, +{ + fn call(&self, params: Params, meta: T, jsonrpc: Option, id: Id) -> BoxFuture> { + Box::pin(self(params, meta, jsonrpc, id)) + } +} diff --git a/core/src/delegates.rs b/core/src/delegates.rs index 15b60e65a..b0ed47c05 100644 --- a/core/src/delegates.rs +++ b/core/src/delegates.rs @@ -1,11 +1,12 @@ //! Delegate rpc calls +use serde::Serialize; use std::collections::HashMap; use std::future::Future; use std::sync::Arc; -use crate::calls::{Metadata, RemoteProcedure, RpcMethod, RpcNotification}; -use crate::types::{Error, Params, Value}; +use crate::calls::{rpc_wrap, Metadata, RemoteProcedure, RpcMethod, RpcNotification}; +use crate::types::{Error, Params}; use crate::BoxFuture; struct DelegateAsyncMethod { @@ -13,15 +14,15 @@ struct DelegateAsyncMethod { closure: F, } -impl RpcMethod for DelegateAsyncMethod +impl RpcMethod for DelegateAsyncMethod where M: Metadata, F: Fn(&T, Params) -> I, - I: Future> + Send + 'static, + I: Future> + Send + 'static, T: Send + Sync + 'static, F: Send + Sync + 'static, { - fn call(&self, params: Params, _meta: M) -> BoxFuture> { + fn call(&self, params: Params, _meta: M) -> BoxFuture> { let closure = &self.closure; Box::pin(closure(&self.delegate, params)) } @@ -32,15 +33,15 @@ struct DelegateMethodWithMeta { closure: F, } -impl RpcMethod for DelegateMethodWithMeta +impl RpcMethod for DelegateMethodWithMeta where M: Metadata, F: Fn(&T, Params, M) -> I, - I: Future> + Send + 'static, + I: Future> + Send + 'static, T: Send + Sync + 'static, F: Send + Sync + 'static, { - fn call(&self, params: Params, meta: M) -> BoxFuture> { + fn call(&self, params: Params, meta: M) -> BoxFuture> { let closure = &self.closure; Box::pin(closure(&self.delegate, params, meta)) } @@ -112,15 +113,16 @@ where } /// Adds async method to the delegate. - pub fn add_method(&mut self, name: &str, method: F) + pub fn add_method(&mut self, name: &str, method: F) where F: Fn(&T, Params) -> I, - I: Future> + Send + 'static, + I: Future> + Send + 'static, F: Send + Sync + 'static, + R: Serialize + Send + 'static, { self.methods.insert( name.into(), - RemoteProcedure::Method(Arc::new(DelegateAsyncMethod { + RemoteProcedure::Method(rpc_wrap(DelegateAsyncMethod { delegate: self.delegate.clone(), closure: method, })), @@ -128,15 +130,16 @@ where } /// Adds async method with metadata to the delegate. - pub fn add_method_with_meta(&mut self, name: &str, method: F) + pub fn add_method_with_meta(&mut self, name: &str, method: F) where F: Fn(&T, Params, M) -> I, - I: Future> + Send + 'static, + I: Future> + Send + 'static, F: Send + Sync + 'static, + R: Serialize + Send + 'static, { self.methods.insert( name.into(), - RemoteProcedure::Method(Arc::new(DelegateMethodWithMeta { + RemoteProcedure::Method(rpc_wrap(DelegateMethodWithMeta { delegate: self.delegate.clone(), closure: method, })), diff --git a/core/src/io.rs b/core/src/io.rs index c67ff2620..d8b234ade 100644 --- a/core/src/io.rs +++ b/core/src/io.rs @@ -1,3 +1,4 @@ +use serde::Serialize; use std::collections::{ hash_map::{IntoIter, Iter}, HashMap, @@ -10,33 +11,34 @@ use std::sync::Arc; use futures_util::{self, future, FutureExt}; use crate::calls::{ - Metadata, RemoteProcedure, RpcMethod, RpcMethodSimple, RpcMethodSync, RpcNotification, RpcNotificationSimple, + rpc_wrap, Metadata, RemoteProcedure, RpcMethod, RpcMethodSimple, RpcMethodSync, RpcMethodWrapped, RpcNotification, + RpcNotificationSimple, }; use crate::middleware::{self, Middleware}; -use crate::types::{Call, Output, Request, Response}; +use crate::types::{Call, Request, WrapOutput, WrapResponse}; use crate::types::{Error, ErrorCode, Version}; /// A type representing middleware or RPC response before serialization. -pub type FutureResponse = Pin> + Send>>; +pub type FutureResponse = Pin> + Send>>; /// A type representing middleware or RPC call output. -pub type FutureOutput = Pin> + Send>>; +pub type FutureOutput = Pin> + Send>>; /// A type representing future string response. pub type FutureResult = future::Map< - future::Either>, FutureRpcResult>, - fn(Option) -> Option, + future::Either>, FutureRpcResult>, + fn(Option) -> Option, >; /// A type representing a result of a single method call. -pub type FutureRpcOutput = future::Either>>>; +pub type FutureRpcOutput = future::Either>>>; -/// A type representing an optional `Response` for RPC `Request`. +/// A type representing an optional `WrapResponse` for RPC `Request`. pub type FutureRpcResult = future::Either< F, future::Either< - future::Map, fn(Option) -> Option>, - future::Map>, fn(Vec>) -> Option>, + future::Map, fn(Option) -> Option>, + future::Map>, fn(Vec>) -> Option>, >, >; @@ -145,17 +147,19 @@ impl> MetaIoHandler { /// Adds new supported synchronous method. /// /// A backward-compatible wrapper. - pub fn add_sync_method(&mut self, name: &str, method: F) + pub fn add_sync_method(&mut self, name: &str, method: F) where - F: RpcMethodSync, + F: RpcMethodSync, + R: Serialize + Send + 'static, { self.add_method(name, move |params| method.call(params)) } /// Adds new supported asynchronous method. - pub fn add_method(&mut self, name: &str, method: F) + pub fn add_method(&mut self, name: &str, method: F) where - F: RpcMethodSimple, + F: RpcMethodSimple, + R: Serialize + Send + 'static, { self.add_method_with_meta(name, move |params, _meta| method.call(params)) } @@ -169,12 +173,13 @@ impl> MetaIoHandler { } /// Adds new supported asynchronous method with metadata support. - pub fn add_method_with_meta(&mut self, name: &str, method: F) + pub fn add_method_with_meta(&mut self, name: &str, method: F) where - F: RpcMethod, + F: RpcMethod, + R: Serialize + Send + 'static, { self.methods - .insert(name.into(), RemoteProcedure::Method(Arc::new(method))); + .insert(name.into(), RemoteProcedure::Method(rpc_wrap(method))); } /// Adds new supported notification with metadata support. @@ -205,7 +210,7 @@ impl> MetaIoHandler { /// Handle given request asynchronously. pub fn handle_request(&self, request: &str, meta: T) -> FutureResult { use self::future::Either::{Left, Right}; - fn as_string(response: Option) -> Option { + fn as_string(response: Option) -> Option { let res = response.map(write_response); debug!(target: "rpc", "Response: {}.", res.as_ref().unwrap_or(&"None".to_string())); res @@ -214,7 +219,7 @@ impl> MetaIoHandler { trace!(target: "rpc", "Request: {}.", request); let request = read_request(request); let result = match request { - Err(error) => Left(future::ready(Some(Response::from( + Err(error) => Left(future::ready(Some(WrapResponse::from( error, self.compatibility.default_version(), )))), @@ -228,16 +233,16 @@ impl> MetaIoHandler { pub fn handle_rpc_request(&self, request: Request, meta: T) -> FutureRpcResult { use self::future::Either::{Left, Right}; - fn output_as_response(output: Option) -> Option { - output.map(Response::Single) + fn output_as_response(output: Option) -> Option { + output.map(WrapResponse::Single) } - fn outputs_as_batch(outs: Vec>) -> Option { + fn outputs_as_batch(outs: Vec>) -> Option { let outs: Vec<_> = outs.into_iter().flatten().collect(); if outs.is_empty() { None } else { - Some(Response::Batch(outs)) + Some(WrapResponse::Batch(outs)) } } @@ -245,7 +250,7 @@ impl> MetaIoHandler { .on_request(request, meta, |request, meta| match request { Request::Single(call) => Left( self.handle_call(call, meta) - .map(output_as_response as fn(Option) -> Option), + .map(output_as_response as fn(Option) -> Option), ), Request::Batch(calls) => { let futures: Vec<_> = calls @@ -253,7 +258,8 @@ impl> MetaIoHandler { .map(move |call| self.handle_call(call, meta.clone())) .collect(); Right( - future::join_all(futures).map(outputs_as_batch as fn(Vec>) -> Option), + future::join_all(futures) + .map(outputs_as_batch as fn(Vec>) -> Option), ) } }) @@ -270,7 +276,8 @@ impl> MetaIoHandler { let jsonrpc = method.jsonrpc; let valid_version = self.compatibility.is_version_valid(jsonrpc); - let call_method = |method: &Arc>| method.call(params, meta); + let method_id = id.clone(); + let call_method = |method: &Arc>| method.call(params, meta, jsonrpc, method_id); let result = match (valid_version, self.methods.get(&method.method)) { (false, _) => Err(Error::invalid_version()), @@ -283,10 +290,8 @@ impl> MetaIoHandler { }; match result { - Ok(result) => Left(Box::pin( - result.then(move |result| future::ready(Some(Output::from(result, id, jsonrpc)))), - ) as _), - Err(err) => Right(future::ready(Some(Output::from(Err(err), id, jsonrpc)))), + Ok(result) => Left(result), + Err(err) => Right(future::ready(Some(WrapOutput::from_error(err, id, jsonrpc)))), } } Call::Notification(notification) => { @@ -310,7 +315,7 @@ impl> MetaIoHandler { Right(future::ready(None)) } - Call::Invalid { id } => Right(future::ready(Some(Output::invalid_request( + Call::Invalid { id } => Right(future::ready(Some(WrapOutput::invalid_request( id, self.compatibility.default_version(), )))), @@ -475,9 +480,32 @@ fn read_request(request_str: &str) -> Result { crate::serde_from_str(request_str).map_err(|_| Error::new(ErrorCode::ParseError)) } -fn write_response(response: Response) -> String { - // this should never fail - serde_json::to_string(&response).unwrap() +fn write_response(response: WrapResponse) -> String { + match response { + WrapResponse::Single(output) => output.response, + WrapResponse::Batch(outputs) => { + if outputs.len() == 0 { + return String::from("[]"); + } + + // "[" and comma-separated outputs and "]" + let commas = outputs.len() - 1; + let brackets = 2; + let length = brackets + commas + outputs.iter().map(|output| output.response.len()).sum::(); + + // Not elegant, but guaranteed to only copy responses once + let mut response = String::with_capacity(length); + response.push('['); + for output in outputs { + response.push_str(&output.response); + response.push(','); + } + response.pop(); + response.push(']'); + + response + } + } } #[cfg(test)] diff --git a/core/src/middleware.rs b/core/src/middleware.rs index 308a787c4..2792b3768 100644 --- a/core/src/middleware.rs +++ b/core/src/middleware.rs @@ -1,7 +1,7 @@ //! `IoHandler` middlewares use crate::calls::Metadata; -use crate::types::{Call, Output, Request, Response}; +use crate::types::{Call, Request, WrapOutput, WrapResponse}; use futures_util::future::Either; use std::future::Future; use std::pin::Pin; @@ -9,10 +9,10 @@ use std::pin::Pin; /// RPC middleware pub trait Middleware: Send + Sync + 'static { /// A returned request future. - type Future: Future> + Send + 'static; + type Future: Future> + Send + 'static; /// A returned call future. - type CallFuture: Future> + Send + 'static; + type CallFuture: Future> + Send + 'static; /// Method invoked on each request. /// Allows you to either respond directly (without executing RPC call) @@ -20,7 +20,7 @@ pub trait Middleware: Send + Sync + 'static { fn on_request(&self, request: Request, meta: M, next: F) -> Either where F: Fn(Request, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { Either::Right(next(request, meta)) } @@ -31,16 +31,16 @@ pub trait Middleware: Send + Sync + 'static { fn on_call(&self, call: Call, meta: M, next: F) -> Either where F: Fn(Call, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { Either::Right(next(call, meta)) } } /// Dummy future used as a noop result of middleware. -pub type NoopFuture = Pin> + Send>>; +pub type NoopFuture = Pin> + Send>>; /// Dummy future used as a noop call result of middleware. -pub type NoopCallFuture = Pin> + Send>>; +pub type NoopCallFuture = Pin> + Send>>; /// No-op middleware implementation #[derive(Clone, Debug, Default)] @@ -57,7 +57,7 @@ impl, B: Middleware> Middleware for (A, B) { fn on_request(&self, request: Request, meta: M, process: F) -> Either where F: Fn(Request, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack(self.0.on_request(request, meta, |request, meta| { self.1.on_request(request, meta, &process) @@ -67,7 +67,7 @@ impl, B: Middleware> Middleware for (A, B) { fn on_call(&self, call: Call, meta: M, process: F) -> Either where F: Fn(Call, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack( self.0 @@ -83,7 +83,7 @@ impl, B: Middleware, C: Middleware> Middlewa fn on_request(&self, request: Request, meta: M, process: F) -> Either where F: Fn(Request, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack(self.0.on_request(request, meta, |request, meta| { repack(self.1.on_request(request, meta, |request, meta| { @@ -95,7 +95,7 @@ impl, B: Middleware, C: Middleware> Middlewa fn on_call(&self, call: Call, meta: M, process: F) -> Either where F: Fn(Call, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack(self.0.on_call(call, meta, |call, meta| { repack( @@ -115,7 +115,7 @@ impl, B: Middleware, C: Middleware, D: Middl fn on_request(&self, request: Request, meta: M, process: F) -> Either where F: Fn(Request, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack(self.0.on_request(request, meta, |request, meta| { repack(self.1.on_request(request, meta, |request, meta| { @@ -129,7 +129,7 @@ impl, B: Middleware, C: Middleware, D: Middl fn on_call(&self, call: Call, meta: M, process: F) -> Either where F: Fn(Call, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack(self.0.on_call(call, meta, |call, meta| { repack(self.1.on_call(call, meta, |call, meta| { diff --git a/core/src/types/mod.rs b/core/src/types/mod.rs index cb4a28e95..9e947f921 100644 --- a/core/src/types/mod.rs +++ b/core/src/types/mod.rs @@ -15,5 +15,5 @@ pub use self::error::{Error, ErrorCode}; pub use self::id::Id; pub use self::params::Params; pub use self::request::{Call, MethodCall, Notification, Request}; -pub use self::response::{Failure, Output, Response, Success}; +pub use self::response::{Failure, Output, Response, Success, WrapOutput, WrapResponse}; pub use self::version::Version; diff --git a/core/src/types/response.rs b/core/src/types/response.rs index 5c45e5c04..b54602967 100644 --- a/core/src/types/response.rs +++ b/core/src/types/response.rs @@ -1,16 +1,17 @@ //! jsonrpc response use super::{Error, ErrorCode, Id, Value, Version}; use crate::Result as CoreResult; +use serde::Serialize; /// Successful response #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields)] -pub struct Success { +pub struct Success { /// Protocol version #[serde(skip_serializing_if = "Option::is_none")] pub jsonrpc: Option, /// Result - pub result: Value, + pub result: T, /// Correlation id pub id: Id, } @@ -28,55 +29,38 @@ pub struct Failure { pub id: Id, } -/// Represents output - failure or success +/// Formatted JSON RPC response #[derive(Debug, PartialEq, Clone, Deserialize, Serialize)] #[serde(deny_unknown_fields)] #[serde(untagged)] -pub enum Output { +pub enum Output { /// Success - Success(Success), + Success(Success), /// Failure Failure(Failure), } -impl Output { - /// Creates new output given `Result`, `Id` and `Version`. - pub fn from(result: CoreResult, id: Id, jsonrpc: Option) -> Self { - match result { - Ok(result) => Output::Success(Success { jsonrpc, result, id }), - Err(error) => Output::Failure(Failure { jsonrpc, error, id }), - } - } - - /// Creates new failure output indicating malformed request. - pub fn invalid_request(id: Id, jsonrpc: Option) -> Self { - Output::Failure(Failure { - id, - jsonrpc, - error: Error::new(ErrorCode::InvalidRequest), - }) - } - +impl Output { /// Get the jsonrpc protocol version. pub fn version(&self) -> Option { match *self { - Output::Success(ref s) => s.jsonrpc, - Output::Failure(ref f) => f.jsonrpc, + Self::Success(ref s) => s.jsonrpc, + Self::Failure(ref f) => f.jsonrpc, } } /// Get the correlation id. pub fn id(&self) -> &Id { match *self { - Output::Success(ref s) => &s.id, - Output::Failure(ref f) => &f.id, + Self::Success(ref s) => &s.id, + Self::Failure(ref f) => &f.id, } } } -impl From for CoreResult { +impl From> for CoreResult { /// Convert into a result. Will be `Ok` if it is a `Success` and `Err` if `Failure`. - fn from(output: Output) -> CoreResult { + fn from(output: Output) -> CoreResult { match output { Output::Success(s) => Ok(s.result), Output::Failure(f) => Err(f.error), @@ -85,7 +69,7 @@ impl From for CoreResult { } /// Synchronous response -#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +#[derive(Debug, PartialEq, Clone, Deserialize, Serialize)] #[serde(deny_unknown_fields)] #[serde(untagged)] pub enum Response { @@ -130,6 +114,58 @@ impl From for Response { } } +/// Represents output - failure or success +#[derive(Debug, PartialEq, Clone)] +pub struct WrapOutput { + /// Response jsonrpc json + pub response: String, +} + +impl WrapOutput { + /// Creates new output given `Result`, `Id` and `Version`. + pub fn from(result: CoreResult, id: Id, jsonrpc: Option) -> Self + where + T: Serialize, + { + match result { + Ok(result) => { + let response = serde_json::to_string(&Success { jsonrpc, result, id }) + .expect("Expected always-serializable type; qed"); + WrapOutput { response } + } + Err(error) => Self::from_error(error, id, jsonrpc), + } + } + + /// Create new output from an `Error`, `Id` and `Version`. + pub fn from_error(error: Error, id: Id, jsonrpc: Option) -> Self { + let response = + serde_json::to_string(&Failure { jsonrpc, error, id }).expect("Expected always-serializable type; qed"); + WrapOutput { response } + } + + /// Creates new failure output indicating malformed request. + pub fn invalid_request(id: Id, jsonrpc: Option) -> Self { + Self::from_error(Error::new(ErrorCode::InvalidRequest), id, jsonrpc) + } +} + +/// Synchronous response +#[derive(Clone, Debug, PartialEq)] +pub enum WrapResponse { + /// Single response + Single(WrapOutput), + /// Response to batch request (batch of responses) + Batch(Vec), +} + +impl WrapResponse { + /// Creates new `Response` with given error and `Version` + pub fn from(error: Error, jsonrpc: Option) -> Self { + Self::Single(WrapOutput::from_error(error, Id::Null, jsonrpc)) + } +} + #[test] fn success_output_serialize() { use serde_json; @@ -167,7 +203,7 @@ fn success_output_deserialize() { fn failure_output_serialize() { use serde_json; - let fo = Output::Failure(Failure { + let fo: Output = Output::Failure(Failure { jsonrpc: Some(Version::V2), error: Error::parse_error(), id: Id::Num(1), @@ -184,7 +220,7 @@ fn failure_output_serialize() { fn failure_output_serialize_jsonrpc_1() { use serde_json; - let fo = Output::Failure(Failure { + let fo: Output = Output::Failure(Failure { jsonrpc: None, error: Error::parse_error(), id: Id::Num(1), diff --git a/derive/src/to_delegate.rs b/derive/src/to_delegate.rs index 7e9148a82..e0ff02767 100644 --- a/derive/src/to_delegate.rs +++ b/derive/src/to_delegate.rs @@ -281,8 +281,6 @@ impl RpcMethod { Ok((#(#tuple_fields, )*)) => { use self::_futures::{FutureExt, TryFutureExt}; let fut = self::_jsonrpc_core::WrapFuture::into_future((method)#method_call) - .map_ok(|value| _jsonrpc_core::to_value(value) - .expect("Expected always-serializable type; qed")) .map_err(Into::into as fn(_) -> _jsonrpc_core::Error); _futures::future::Either::Left(fut) }, diff --git a/derive/tests/client.rs b/derive/tests/client.rs index 8ac820ab0..986ff9a48 100644 --- a/derive/tests/client.rs +++ b/derive/tests/client.rs @@ -74,7 +74,9 @@ mod named_params { }); let mut handler = IoHandler::new(); - handler.add_sync_method("call_with_named", |params: Params| Ok(params.into())); + handler.add_sync_method("call_with_named", |params: Params| { + Ok(jsonrpc_core::Value::from(params)) + }); let (client, rpc_client) = local::connect::(handler); let fut = client @@ -112,7 +114,7 @@ mod raw_params { }); let mut handler = IoHandler::new(); - handler.add_sync_method("call_raw", |params: Params| Ok(params.into())); + handler.add_sync_method("call_raw", |params: Params| Ok(jsonrpc_core::Value::from(params))); let (client, rpc_client) = local::connect::(handler); let fut = client diff --git a/http/src/handler.rs b/http/src/handler.rs index 0466ee844..0193cf95b 100644 --- a/http/src/handler.rs +++ b/http/src/handler.rs @@ -395,33 +395,41 @@ where } fn process_health(&self, method: String, metadata: M) -> Result, hyper::Error> { - use self::core::types::{Call, Failure, Id, MethodCall, Output, Params, Request, Success, Version}; + use self::core::types::{Call, Failure, Id, MethodCall, Output, Params, Success, Version}; // Create a request - let call = Request::Single(Call::MethodCall(MethodCall { + let call = Call::MethodCall(MethodCall { jsonrpc: Some(Version::V2), method, params: Params::None, id: Id::Num(1), - })); + }); let response = match self.jsonrpc_handler.upgrade() { - Some(h) => h.handler.handle_rpc_request(call, metadata), + Some(h) => h.handler.handle_call(call, metadata), None => return Ok(RpcPollState::Ready(RpcHandlerState::Writing(Response::closing()))), }; Ok(RpcPollState::Ready(RpcHandlerState::WaitingForResponse(Box::pin( async { match response.await { - Some(core::Response::Single(Output::Success(Success { result, .. }))) => { - let result = serde_json::to_string(&result).expect("Serialization of result is infallible;qed"); - - Response::ok(result) - } - Some(core::Response::Single(Output::Failure(Failure { error, .. }))) => { - let result = serde_json::to_string(&error).expect("Serialization of error is infallible;qed"); - - Response::service_unavailable(result) + Some(wrap_output) => { + // Extract the "response" or "error" field from the response json which has already been serialized. + // Could use serde_json's RawValue to avoid parsing inside data. + let output: Output = serde_json::from_str(&wrap_output.response) + .expect("Deserialization of serialized response is infallible;qed"); + match output { + Output::Success(Success { result, .. }) => { + let out = serde_json::to_string(&result) + .expect("Serialization of deserialized result is infallible;qed"); + Response::ok(out) + } + Output::Failure(Failure { error, .. }) => { + let result = serde_json::to_string(&error) + .expect("Serialization of deserialized result is infallible;qed"); + Response::service_unavailable(result) + } + } } e => Response::internal_error(format!("Invalid response for health request: {:?}", e)), } @@ -430,7 +438,7 @@ where } fn process_rest(&self, uri: hyper::Uri, metadata: M) -> Result, hyper::Error> { - use self::core::types::{Call, Id, MethodCall, Params, Request, Value, Version}; + use self::core::types::{Call, Id, MethodCall, Params, Value, Version}; // skip the initial / let mut it = uri.path().split('/').skip(1); @@ -446,22 +454,20 @@ where } // Create a request - let call = Request::Single(Call::MethodCall(MethodCall { + let call = Call::MethodCall(MethodCall { jsonrpc: Some(Version::V2), method: method.into(), params: Params::Array(params), id: Id::Num(1), - })); + }); let response = match self.jsonrpc_handler.upgrade() { - Some(h) => h.handler.handle_rpc_request(call, metadata), + Some(h) => h.handler.handle_call(call, metadata), None => return Ok(RpcPollState::Ready(RpcHandlerState::Writing(Response::closing()))), }; Ok(RpcPollState::Ready(RpcHandlerState::Waiting(Box::pin(async { - response - .await - .map(|x| serde_json::to_string(&x).expect("Serialization of response is infallible;qed")) + response.await.map(|x| x.response) })))) } diff --git a/http/src/tests.rs b/http/src/tests.rs index 302642599..23a053af1 100644 --- a/http/src/tests.rs +++ b/http/src/tests.rs @@ -1,6 +1,6 @@ use jsonrpc_core; -use self::jsonrpc_core::{Error, ErrorCode, IoHandler, Params, Value}; +use self::jsonrpc_core::{Error, ErrorCode, IoHandler, Params, Result, Value}; use std::io::{Read, Write}; use std::net::TcpStream; use std::str::Lines; @@ -58,7 +58,9 @@ fn io() -> IoHandler { Ok((num,)) => Ok(Value::String(format!("world: {}", num))), _ => Ok(Value::String("world".into())), }); - io.add_sync_method("fail", |_: Params| Err(Error::new(ErrorCode::ServerError(-34)))); + io.add_sync_method("fail", |_: Params| -> Result { + Err(Error::new(ErrorCode::ServerError(-34))) + }); io.add_method("hello_async", |_params: Params| { futures::future::ready(Ok(Value::String("world".into()))) }); diff --git a/pubsub/src/delegates.rs b/pubsub/src/delegates.rs index 7df77e079..b26ea3849 100644 --- a/pubsub/src/delegates.rs +++ b/pubsub/src/delegates.rs @@ -1,3 +1,4 @@ +use serde::Serialize; use std::marker::PhantomData; use std::sync::Arc; @@ -98,21 +99,23 @@ where // TODO [ToDr] Consider sync? /// Adds async method to the delegate. - pub fn add_method(&mut self, name: &str, method: F) + pub fn add_method(&mut self, name: &str, method: F) where F: Fn(&T, Params) -> I, - I: Future> + Send + 'static, + I: Future> + Send + 'static, F: Send + Sync + 'static, + R: Serialize + Send + 'static, { self.inner.add_method(name, method) } /// Adds async method with metadata to the delegate. - pub fn add_method_with_meta(&mut self, name: &str, method: F) + pub fn add_method_with_meta(&mut self, name: &str, method: F) where F: Fn(&T, Params, M) -> I, - I: Future> + Send + 'static, + I: Future> + Send + 'static, F: Send + Sync + 'static, + R: Serialize + Send + 'static, { self.inner.add_method_with_meta(name, method) } diff --git a/test/src/lib.rs b/test/src/lib.rs index 3143cc2c5..e0de55668 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -30,7 +30,7 @@ //! // You can also test RPC created without macros: //! let rpc = { //! let mut io = IoHandler::new(); -//! io.add_sync_method("rpc_test_method", |_| { +//! io.add_sync_method("rpc_test_method", |_| -> Result { //! Err(Error::internal_error()) //! }); //! test::Rpc::from(io) @@ -118,17 +118,18 @@ impl Rpc { .expect("We are sending a method call not notification."); // extract interesting part from the response - let extracted = match rpc::serde_from_str(&response).expect("We will always get a single output.") { - response::Output::Success(response::Success { result, .. }) => match encoding { - Encoding::Compact => serde_json::to_string(&result), - Encoding::Pretty => serde_json::to_string_pretty(&result), - }, - response::Output::Failure(response::Failure { error, .. }) => match encoding { - Encoding::Compact => serde_json::to_string(&error), - Encoding::Pretty => serde_json::to_string_pretty(&error), - }, - } - .expect("Serialization is infallible; qed"); + let extracted = + match rpc::serde_from_str::(&response).expect("We will always get a single output.") { + response::Output::Success(response::Success { result, .. }) => match encoding { + Encoding::Compact => serde_json::to_string(&result), + Encoding::Pretty => serde_json::to_string_pretty(&result), + }, + response::Output::Failure(response::Failure { error, .. }) => match encoding { + Encoding::Compact => serde_json::to_string(&error), + Encoding::Pretty => serde_json::to_string_pretty(&error), + }, + } + .expect("Serialization is infallible; qed"); println!("\n{}\n --> {}\n", request, extracted); From e2d86df45bd757149ba1684b29b2affb4b2e5021 Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Mon, 1 Nov 2021 09:54:43 +0100 Subject: [PATCH 2/5] review: renames - WrapOutput -> SerializedOutput - WrapResponse -> SerializedResponse - RpcMethodWrapped -> RpcMethodWithSerializedOutput --- core/examples/middlewares.rs | 2 +- core/src/calls.rs | 20 +++++++------ core/src/io.rs | 54 +++++++++++++++++++----------------- core/src/middleware.rs | 26 ++++++++--------- core/src/types/mod.rs | 2 +- core/src/types/response.rs | 24 ++++++++-------- 6 files changed, 68 insertions(+), 60 deletions(-) diff --git a/core/examples/middlewares.rs b/core/examples/middlewares.rs index 54b5ea7a9..ccd1b003f 100644 --- a/core/examples/middlewares.rs +++ b/core/examples/middlewares.rs @@ -17,7 +17,7 @@ impl Middleware for MyMiddleware { fn on_request(&self, request: Request, meta: Meta, next: F) -> Either where F: FnOnce(Request, Meta) -> X + Send, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { let start = Instant::now(); let request_number = self.0.fetch_add(1, atomic::Ordering::SeqCst); diff --git a/core/src/calls.rs b/core/src/calls.rs index f31a24993..0ed6f682f 100644 --- a/core/src/calls.rs +++ b/core/src/calls.rs @@ -1,4 +1,4 @@ -use crate::types::{Error, Id, Params, Value, Version, WrapOutput}; +use crate::types::{Error, Id, Params, SerializedOutput, Value, Version}; use crate::BoxFuture; use futures_util::{self, future, FutureExt}; use serde::Serialize; @@ -63,14 +63,16 @@ pub trait RpcNotification: Send + Sync + 'static { fn execute(&self, params: Params, meta: T); } -pub trait RpcMethodWrapped: Send + Sync + 'static { - fn call(&self, params: Params, meta: T, jsonrpc: Option, id: Id) -> BoxFuture>; +pub trait RpcMethodWithSerializedOutput: Send + Sync + 'static { + fn call(&self, params: Params, meta: T, jsonrpc: Option, id: Id) -> BoxFuture>; } -pub fn rpc_wrap>(f: F) -> Arc> { +pub fn rpc_wrap>( + f: F, +) -> Arc> { Arc::new(move |params: Params, meta: T, jsonrpc: Option, id: Id| { let result = f.call(params, meta); - result.then(move |r| future::ready(Some(WrapOutput::from(r, id, jsonrpc)))) + result.then(move |r| future::ready(Some(SerializedOutput::from(r, id, jsonrpc)))) }) } @@ -78,7 +80,7 @@ pub fn rpc_wrap>( #[derive(Clone)] pub enum RemoteProcedure { /// A method call - Method(Arc>), + Method(Arc>), /// A notification Notification(Arc>), /// An alias to other method, @@ -147,13 +149,13 @@ where } } -impl RpcMethodWrapped for F +impl RpcMethodWithSerializedOutput for F where T: Metadata, F: Fn(Params, T, Option, Id) -> X, - X: Future>, + X: Future>, { - fn call(&self, params: Params, meta: T, jsonrpc: Option, id: Id) -> BoxFuture> { + fn call(&self, params: Params, meta: T, jsonrpc: Option, id: Id) -> BoxFuture> { Box::pin(self(params, meta, jsonrpc, id)) } } diff --git a/core/src/io.rs b/core/src/io.rs index d8b234ade..297325e84 100644 --- a/core/src/io.rs +++ b/core/src/io.rs @@ -11,34 +11,37 @@ use std::sync::Arc; use futures_util::{self, future, FutureExt}; use crate::calls::{ - rpc_wrap, Metadata, RemoteProcedure, RpcMethod, RpcMethodSimple, RpcMethodSync, RpcMethodWrapped, RpcNotification, - RpcNotificationSimple, + rpc_wrap, Metadata, RemoteProcedure, RpcMethod, RpcMethodSimple, RpcMethodSync, RpcMethodWithSerializedOutput, + RpcNotification, RpcNotificationSimple, }; use crate::middleware::{self, Middleware}; -use crate::types::{Call, Request, WrapOutput, WrapResponse}; +use crate::types::{Call, Request, SerializedOutput, SerializedResponse}; use crate::types::{Error, ErrorCode, Version}; /// A type representing middleware or RPC response before serialization. -pub type FutureResponse = Pin> + Send>>; +pub type FutureResponse = Pin> + Send>>; /// A type representing middleware or RPC call output. -pub type FutureOutput = Pin> + Send>>; +pub type FutureOutput = Pin> + Send>>; /// A type representing future string response. pub type FutureResult = future::Map< - future::Either>, FutureRpcResult>, - fn(Option) -> Option, + future::Either>, FutureRpcResult>, + fn(Option) -> Option, >; /// A type representing a result of a single method call. -pub type FutureRpcOutput = future::Either>>>; +pub type FutureRpcOutput = future::Either>>>; -/// A type representing an optional `WrapResponse` for RPC `Request`. +/// A type representing an optional `SerializedResponse` for RPC `Request`. pub type FutureRpcResult = future::Either< F, future::Either< - future::Map, fn(Option) -> Option>, - future::Map>, fn(Vec>) -> Option>, + future::Map, fn(Option) -> Option>, + future::Map< + future::JoinAll>, + fn(Vec>) -> Option, + >, >, >; @@ -210,7 +213,7 @@ impl> MetaIoHandler { /// Handle given request asynchronously. pub fn handle_request(&self, request: &str, meta: T) -> FutureResult { use self::future::Either::{Left, Right}; - fn as_string(response: Option) -> Option { + fn as_string(response: Option) -> Option { let res = response.map(write_response); debug!(target: "rpc", "Response: {}.", res.as_ref().unwrap_or(&"None".to_string())); res @@ -219,7 +222,7 @@ impl> MetaIoHandler { trace!(target: "rpc", "Request: {}.", request); let request = read_request(request); let result = match request { - Err(error) => Left(future::ready(Some(WrapResponse::from( + Err(error) => Left(future::ready(Some(SerializedResponse::from( error, self.compatibility.default_version(), )))), @@ -233,16 +236,16 @@ impl> MetaIoHandler { pub fn handle_rpc_request(&self, request: Request, meta: T) -> FutureRpcResult { use self::future::Either::{Left, Right}; - fn output_as_response(output: Option) -> Option { - output.map(WrapResponse::Single) + fn output_as_response(output: Option) -> Option { + output.map(SerializedResponse::Single) } - fn outputs_as_batch(outs: Vec>) -> Option { + fn outputs_as_batch(outs: Vec>) -> Option { let outs: Vec<_> = outs.into_iter().flatten().collect(); if outs.is_empty() { None } else { - Some(WrapResponse::Batch(outs)) + Some(SerializedResponse::Batch(outs)) } } @@ -250,7 +253,7 @@ impl> MetaIoHandler { .on_request(request, meta, |request, meta| match request { Request::Single(call) => Left( self.handle_call(call, meta) - .map(output_as_response as fn(Option) -> Option), + .map(output_as_response as fn(Option) -> Option), ), Request::Batch(calls) => { let futures: Vec<_> = calls @@ -259,7 +262,7 @@ impl> MetaIoHandler { .collect(); Right( future::join_all(futures) - .map(outputs_as_batch as fn(Vec>) -> Option), + .map(outputs_as_batch as fn(Vec>) -> Option), ) } }) @@ -277,7 +280,8 @@ impl> MetaIoHandler { let valid_version = self.compatibility.is_version_valid(jsonrpc); let method_id = id.clone(); - let call_method = |method: &Arc>| method.call(params, meta, jsonrpc, method_id); + let call_method = + |method: &Arc>| method.call(params, meta, jsonrpc, method_id); let result = match (valid_version, self.methods.get(&method.method)) { (false, _) => Err(Error::invalid_version()), @@ -291,7 +295,7 @@ impl> MetaIoHandler { match result { Ok(result) => Left(result), - Err(err) => Right(future::ready(Some(WrapOutput::from_error(err, id, jsonrpc)))), + Err(err) => Right(future::ready(Some(SerializedOutput::from_error(err, id, jsonrpc)))), } } Call::Notification(notification) => { @@ -315,7 +319,7 @@ impl> MetaIoHandler { Right(future::ready(None)) } - Call::Invalid { id } => Right(future::ready(Some(WrapOutput::invalid_request( + Call::Invalid { id } => Right(future::ready(Some(SerializedOutput::invalid_request( id, self.compatibility.default_version(), )))), @@ -480,10 +484,10 @@ fn read_request(request_str: &str) -> Result { crate::serde_from_str(request_str).map_err(|_| Error::new(ErrorCode::ParseError)) } -fn write_response(response: WrapResponse) -> String { +fn write_response(response: SerializedResponse) -> String { match response { - WrapResponse::Single(output) => output.response, - WrapResponse::Batch(outputs) => { + SerializedResponse::Single(output) => output.response, + SerializedResponse::Batch(outputs) => { if outputs.len() == 0 { return String::from("[]"); } diff --git a/core/src/middleware.rs b/core/src/middleware.rs index 2792b3768..32b0e5574 100644 --- a/core/src/middleware.rs +++ b/core/src/middleware.rs @@ -1,7 +1,7 @@ //! `IoHandler` middlewares use crate::calls::Metadata; -use crate::types::{Call, Request, WrapOutput, WrapResponse}; +use crate::types::{Call, Request, SerializedOutput, SerializedResponse}; use futures_util::future::Either; use std::future::Future; use std::pin::Pin; @@ -9,10 +9,10 @@ use std::pin::Pin; /// RPC middleware pub trait Middleware: Send + Sync + 'static { /// A returned request future. - type Future: Future> + Send + 'static; + type Future: Future> + Send + 'static; /// A returned call future. - type CallFuture: Future> + Send + 'static; + type CallFuture: Future> + Send + 'static; /// Method invoked on each request. /// Allows you to either respond directly (without executing RPC call) @@ -20,7 +20,7 @@ pub trait Middleware: Send + Sync + 'static { fn on_request(&self, request: Request, meta: M, next: F) -> Either where F: Fn(Request, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { Either::Right(next(request, meta)) } @@ -31,16 +31,16 @@ pub trait Middleware: Send + Sync + 'static { fn on_call(&self, call: Call, meta: M, next: F) -> Either where F: Fn(Call, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { Either::Right(next(call, meta)) } } /// Dummy future used as a noop result of middleware. -pub type NoopFuture = Pin> + Send>>; +pub type NoopFuture = Pin> + Send>>; /// Dummy future used as a noop call result of middleware. -pub type NoopCallFuture = Pin> + Send>>; +pub type NoopCallFuture = Pin> + Send>>; /// No-op middleware implementation #[derive(Clone, Debug, Default)] @@ -57,7 +57,7 @@ impl, B: Middleware> Middleware for (A, B) { fn on_request(&self, request: Request, meta: M, process: F) -> Either where F: Fn(Request, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack(self.0.on_request(request, meta, |request, meta| { self.1.on_request(request, meta, &process) @@ -67,7 +67,7 @@ impl, B: Middleware> Middleware for (A, B) { fn on_call(&self, call: Call, meta: M, process: F) -> Either where F: Fn(Call, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack( self.0 @@ -83,7 +83,7 @@ impl, B: Middleware, C: Middleware> Middlewa fn on_request(&self, request: Request, meta: M, process: F) -> Either where F: Fn(Request, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack(self.0.on_request(request, meta, |request, meta| { repack(self.1.on_request(request, meta, |request, meta| { @@ -95,7 +95,7 @@ impl, B: Middleware, C: Middleware> Middlewa fn on_call(&self, call: Call, meta: M, process: F) -> Either where F: Fn(Call, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack(self.0.on_call(call, meta, |call, meta| { repack( @@ -115,7 +115,7 @@ impl, B: Middleware, C: Middleware, D: Middl fn on_request(&self, request: Request, meta: M, process: F) -> Either where F: Fn(Request, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack(self.0.on_request(request, meta, |request, meta| { repack(self.1.on_request(request, meta, |request, meta| { @@ -129,7 +129,7 @@ impl, B: Middleware, C: Middleware, D: Middl fn on_call(&self, call: Call, meta: M, process: F) -> Either where F: Fn(Call, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack(self.0.on_call(call, meta, |call, meta| { repack(self.1.on_call(call, meta, |call, meta| { diff --git a/core/src/types/mod.rs b/core/src/types/mod.rs index 9e947f921..6c4c52b3a 100644 --- a/core/src/types/mod.rs +++ b/core/src/types/mod.rs @@ -15,5 +15,5 @@ pub use self::error::{Error, ErrorCode}; pub use self::id::Id; pub use self::params::Params; pub use self::request::{Call, MethodCall, Notification, Request}; -pub use self::response::{Failure, Output, Response, Success, WrapOutput, WrapResponse}; +pub use self::response::{Failure, Output, Response, SerializedOutput, SerializedResponse, Success}; pub use self::version::Version; diff --git a/core/src/types/response.rs b/core/src/types/response.rs index b54602967..8f922cddf 100644 --- a/core/src/types/response.rs +++ b/core/src/types/response.rs @@ -114,14 +114,16 @@ impl From for Response { } } -/// Represents output - failure or success +/// Represents output, failure or success +/// +/// This contains the full response string, including the jsonrpc envelope. #[derive(Debug, PartialEq, Clone)] -pub struct WrapOutput { +pub struct SerializedOutput { /// Response jsonrpc json pub response: String, } -impl WrapOutput { +impl SerializedOutput { /// Creates new output given `Result`, `Id` and `Version`. pub fn from(result: CoreResult, id: Id, jsonrpc: Option) -> Self where @@ -131,7 +133,7 @@ impl WrapOutput { Ok(result) => { let response = serde_json::to_string(&Success { jsonrpc, result, id }) .expect("Expected always-serializable type; qed"); - WrapOutput { response } + SerializedOutput { response } } Err(error) => Self::from_error(error, id, jsonrpc), } @@ -141,7 +143,7 @@ impl WrapOutput { pub fn from_error(error: Error, id: Id, jsonrpc: Option) -> Self { let response = serde_json::to_string(&Failure { jsonrpc, error, id }).expect("Expected always-serializable type; qed"); - WrapOutput { response } + SerializedOutput { response } } /// Creates new failure output indicating malformed request. @@ -150,19 +152,19 @@ impl WrapOutput { } } -/// Synchronous response +/// Synchronous response, pre-serialized #[derive(Clone, Debug, PartialEq)] -pub enum WrapResponse { +pub enum SerializedResponse { /// Single response - Single(WrapOutput), + Single(SerializedOutput), /// Response to batch request (batch of responses) - Batch(Vec), + Batch(Vec), } -impl WrapResponse { +impl SerializedResponse { /// Creates new `Response` with given error and `Version` pub fn from(error: Error, jsonrpc: Option) -> Self { - Self::Single(WrapOutput::from_error(error, Id::Null, jsonrpc)) + Self::Single(SerializedOutput::from_error(error, Id::Null, jsonrpc)) } } From aeeac66332631b1ec491d012516e132404a8f21b Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Tue, 2 Nov 2021 12:45:09 +0100 Subject: [PATCH 3/5] review: visibility and comments --- core/src/calls.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/calls.rs b/core/src/calls.rs index 0ed6f682f..d8cb02ba2 100644 --- a/core/src/calls.rs +++ b/core/src/calls.rs @@ -63,11 +63,13 @@ pub trait RpcNotification: Send + Sync + 'static { fn execute(&self, params: Params, meta: T); } +/// Asynchronous Method with Metadata, returning a serialized JSONRPC response pub trait RpcMethodWithSerializedOutput: Send + Sync + 'static { fn call(&self, params: Params, meta: T, jsonrpc: Option, id: Id) -> BoxFuture>; } -pub fn rpc_wrap>( +/// Wraps an RpcMethod into an RpcMethodWithSerializedOutput +pub(crate) fn rpc_wrap>( f: F, ) -> Arc> { Arc::new(move |params: Params, meta: T, jsonrpc: Option, id: Id| { From 9770c0ba2618f4c7e77205324a5dfd4f80c09fb8 Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Thu, 4 Nov 2021 07:11:09 +0100 Subject: [PATCH 4/5] Update core/src/calls.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Tomasz Drwięga --- core/src/calls.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/calls.rs b/core/src/calls.rs index d8cb02ba2..928598fd9 100644 --- a/core/src/calls.rs +++ b/core/src/calls.rs @@ -63,7 +63,7 @@ pub trait RpcNotification: Send + Sync + 'static { fn execute(&self, params: Params, meta: T); } -/// Asynchronous Method with Metadata, returning a serialized JSONRPC response +/// Asynchronous Method with Metadata, returning a serialized JSON-RPC response. pub trait RpcMethodWithSerializedOutput: Send + Sync + 'static { fn call(&self, params: Params, meta: T, jsonrpc: Option, id: Id) -> BoxFuture>; } From c3f9a2beb69a72ce7ad1e7787cf27de5d952bf26 Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Thu, 4 Nov 2021 07:11:30 +0100 Subject: [PATCH 5/5] Update core/src/calls.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Tomasz Drwięga --- core/src/calls.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/calls.rs b/core/src/calls.rs index 928598fd9..d516f4d8f 100644 --- a/core/src/calls.rs +++ b/core/src/calls.rs @@ -65,6 +65,7 @@ pub trait RpcNotification: Send + Sync + 'static { /// Asynchronous Method with Metadata, returning a serialized JSON-RPC response. pub trait RpcMethodWithSerializedOutput: Send + Sync + 'static { + /// Executes the RPC method and returns [`SerializedOutput`]. fn call(&self, params: Params, meta: T, jsonrpc: Option, id: Id) -> BoxFuture>; }