diff --git a/core-client/transports/src/transports/http.rs b/core-client/transports/src/transports/http.rs index 248b41890..00c15f219 100644 --- a/core-client/transports/src/transports/http.rs +++ b/core-client/transports/src/transports/http.rs @@ -139,12 +139,16 @@ mod tests { } fn io() -> IoHandler { + use jsonrpc_core::Result; + let mut io = IoHandler::default(); io.add_sync_method("hello", |params: Params| match params.parse::<(String,)>() { Ok((msg,)) => Ok(Value::String(format!("hello {}", msg))), _ => Ok(Value::String("world".into())), }); - io.add_sync_method("fail", |_: Params| Err(Error::new(ErrorCode::ServerError(-34)))); + io.add_sync_method("fail", |_: Params| -> Result { + Err(Error::new(ErrorCode::ServerError(-34))) + }); io.add_notification("notify", |params: Params| { let (value,) = params.parse::<(u64,)>().expect("expected one u64 as param"); assert_eq!(value, 12); diff --git a/core/examples/middlewares.rs b/core/examples/middlewares.rs index 6d25352af..ccd1b003f 100644 --- a/core/examples/middlewares.rs +++ b/core/examples/middlewares.rs @@ -17,7 +17,7 @@ impl Middleware for MyMiddleware { fn on_request(&self, request: Request, meta: Meta, next: F) -> Either where F: FnOnce(Request, Meta) -> X + Send, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { let start = Instant::now(); let request_number = self.0.fetch_add(1, atomic::Ordering::SeqCst); diff --git a/core/src/calls.rs b/core/src/calls.rs index dfc8dcb16..d516f4d8f 100644 --- a/core/src/calls.rs +++ b/core/src/calls.rs @@ -1,5 +1,7 @@ -use crate::types::{Error, Params, Value}; +use crate::types::{Error, Id, Params, SerializedOutput, Value, Version}; use crate::BoxFuture; +use futures_util::{self, future, FutureExt}; +use serde::Serialize; use std::fmt; use std::future::Future; use std::sync::Arc; @@ -30,23 +32,23 @@ impl WrapFuture for BoxFuture> { } /// A synchronous or asynchronous method. -pub trait RpcMethodSync: Send + Sync + 'static { +pub trait RpcMethodSync: Send + Sync + 'static { /// Call method - fn call(&self, params: Params) -> BoxFuture>; + fn call(&self, params: Params) -> BoxFuture>; } /// Asynchronous Method -pub trait RpcMethodSimple: Send + Sync + 'static { +pub trait RpcMethodSimple: Send + Sync + 'static { /// Output future - type Out: Future> + Send; + type Out: Future> + Send; /// Call method fn call(&self, params: Params) -> Self::Out; } /// Asynchronous Method with Metadata -pub trait RpcMethod: Send + Sync + 'static { +pub trait RpcMethod: Send + Sync + 'static { /// Call method - fn call(&self, params: Params, meta: T) -> BoxFuture>; + fn call(&self, params: Params, meta: T) -> BoxFuture>; } /// Notification @@ -61,11 +63,27 @@ pub trait RpcNotification: Send + Sync + 'static { fn execute(&self, params: Params, meta: T); } +/// Asynchronous Method with Metadata, returning a serialized JSON-RPC response. +pub trait RpcMethodWithSerializedOutput: Send + Sync + 'static { + /// Executes the RPC method and returns [`SerializedOutput`]. + fn call(&self, params: Params, meta: T, jsonrpc: Option, id: Id) -> BoxFuture>; +} + +/// Wraps an RpcMethod into an RpcMethodWithSerializedOutput +pub(crate) fn rpc_wrap>( + f: F, +) -> Arc> { + Arc::new(move |params: Params, meta: T, jsonrpc: Option, id: Id| { + let result = f.call(params, meta); + result.then(move |r| future::ready(Some(SerializedOutput::from(r, id, jsonrpc)))) + }) +} + /// Possible Remote Procedures with Metadata #[derive(Clone)] pub enum RemoteProcedure { /// A method call - Method(Arc>), + Method(Arc>), /// A notification Notification(Arc>), /// An alias to other method, @@ -83,10 +101,10 @@ impl fmt::Debug for RemoteProcedure { } } -impl RpcMethodSimple for F +impl RpcMethodSimple for F where F: Fn(Params) -> X, - X: Future>, + X: Future>, { type Out = X; fn call(&self, params: Params) -> Self::Out { @@ -94,12 +112,12 @@ where } } -impl RpcMethodSync for F +impl RpcMethodSync for F where F: Fn(Params) -> X, - X: WrapFuture, + X: WrapFuture, { - fn call(&self, params: Params) -> BoxFuture> { + fn call(&self, params: Params) -> BoxFuture> { self(params).into_future() } } @@ -113,13 +131,13 @@ where } } -impl RpcMethod for F +impl RpcMethod for F where T: Metadata, F: Fn(Params, T) -> X, - X: Future>, + X: Future>, { - fn call(&self, params: Params, meta: T) -> BoxFuture> { + fn call(&self, params: Params, meta: T) -> BoxFuture> { Box::pin(self(params, meta)) } } @@ -133,3 +151,14 @@ where self(params, meta) } } + +impl RpcMethodWithSerializedOutput for F +where + T: Metadata, + F: Fn(Params, T, Option, Id) -> X, + X: Future>, +{ + fn call(&self, params: Params, meta: T, jsonrpc: Option, id: Id) -> BoxFuture> { + Box::pin(self(params, meta, jsonrpc, id)) + } +} diff --git a/core/src/delegates.rs b/core/src/delegates.rs index 15b60e65a..b0ed47c05 100644 --- a/core/src/delegates.rs +++ b/core/src/delegates.rs @@ -1,11 +1,12 @@ //! Delegate rpc calls +use serde::Serialize; use std::collections::HashMap; use std::future::Future; use std::sync::Arc; -use crate::calls::{Metadata, RemoteProcedure, RpcMethod, RpcNotification}; -use crate::types::{Error, Params, Value}; +use crate::calls::{rpc_wrap, Metadata, RemoteProcedure, RpcMethod, RpcNotification}; +use crate::types::{Error, Params}; use crate::BoxFuture; struct DelegateAsyncMethod { @@ -13,15 +14,15 @@ struct DelegateAsyncMethod { closure: F, } -impl RpcMethod for DelegateAsyncMethod +impl RpcMethod for DelegateAsyncMethod where M: Metadata, F: Fn(&T, Params) -> I, - I: Future> + Send + 'static, + I: Future> + Send + 'static, T: Send + Sync + 'static, F: Send + Sync + 'static, { - fn call(&self, params: Params, _meta: M) -> BoxFuture> { + fn call(&self, params: Params, _meta: M) -> BoxFuture> { let closure = &self.closure; Box::pin(closure(&self.delegate, params)) } @@ -32,15 +33,15 @@ struct DelegateMethodWithMeta { closure: F, } -impl RpcMethod for DelegateMethodWithMeta +impl RpcMethod for DelegateMethodWithMeta where M: Metadata, F: Fn(&T, Params, M) -> I, - I: Future> + Send + 'static, + I: Future> + Send + 'static, T: Send + Sync + 'static, F: Send + Sync + 'static, { - fn call(&self, params: Params, meta: M) -> BoxFuture> { + fn call(&self, params: Params, meta: M) -> BoxFuture> { let closure = &self.closure; Box::pin(closure(&self.delegate, params, meta)) } @@ -112,15 +113,16 @@ where } /// Adds async method to the delegate. - pub fn add_method(&mut self, name: &str, method: F) + pub fn add_method(&mut self, name: &str, method: F) where F: Fn(&T, Params) -> I, - I: Future> + Send + 'static, + I: Future> + Send + 'static, F: Send + Sync + 'static, + R: Serialize + Send + 'static, { self.methods.insert( name.into(), - RemoteProcedure::Method(Arc::new(DelegateAsyncMethod { + RemoteProcedure::Method(rpc_wrap(DelegateAsyncMethod { delegate: self.delegate.clone(), closure: method, })), @@ -128,15 +130,16 @@ where } /// Adds async method with metadata to the delegate. - pub fn add_method_with_meta(&mut self, name: &str, method: F) + pub fn add_method_with_meta(&mut self, name: &str, method: F) where F: Fn(&T, Params, M) -> I, - I: Future> + Send + 'static, + I: Future> + Send + 'static, F: Send + Sync + 'static, + R: Serialize + Send + 'static, { self.methods.insert( name.into(), - RemoteProcedure::Method(Arc::new(DelegateMethodWithMeta { + RemoteProcedure::Method(rpc_wrap(DelegateMethodWithMeta { delegate: self.delegate.clone(), closure: method, })), diff --git a/core/src/io.rs b/core/src/io.rs index c67ff2620..297325e84 100644 --- a/core/src/io.rs +++ b/core/src/io.rs @@ -1,3 +1,4 @@ +use serde::Serialize; use std::collections::{ hash_map::{IntoIter, Iter}, HashMap, @@ -10,33 +11,37 @@ use std::sync::Arc; use futures_util::{self, future, FutureExt}; use crate::calls::{ - Metadata, RemoteProcedure, RpcMethod, RpcMethodSimple, RpcMethodSync, RpcNotification, RpcNotificationSimple, + rpc_wrap, Metadata, RemoteProcedure, RpcMethod, RpcMethodSimple, RpcMethodSync, RpcMethodWithSerializedOutput, + RpcNotification, RpcNotificationSimple, }; use crate::middleware::{self, Middleware}; -use crate::types::{Call, Output, Request, Response}; +use crate::types::{Call, Request, SerializedOutput, SerializedResponse}; use crate::types::{Error, ErrorCode, Version}; /// A type representing middleware or RPC response before serialization. -pub type FutureResponse = Pin> + Send>>; +pub type FutureResponse = Pin> + Send>>; /// A type representing middleware or RPC call output. -pub type FutureOutput = Pin> + Send>>; +pub type FutureOutput = Pin> + Send>>; /// A type representing future string response. pub type FutureResult = future::Map< - future::Either>, FutureRpcResult>, - fn(Option) -> Option, + future::Either>, FutureRpcResult>, + fn(Option) -> Option, >; /// A type representing a result of a single method call. -pub type FutureRpcOutput = future::Either>>>; +pub type FutureRpcOutput = future::Either>>>; -/// A type representing an optional `Response` for RPC `Request`. +/// A type representing an optional `SerializedResponse` for RPC `Request`. pub type FutureRpcResult = future::Either< F, future::Either< - future::Map, fn(Option) -> Option>, - future::Map>, fn(Vec>) -> Option>, + future::Map, fn(Option) -> Option>, + future::Map< + future::JoinAll>, + fn(Vec>) -> Option, + >, >, >; @@ -145,17 +150,19 @@ impl> MetaIoHandler { /// Adds new supported synchronous method. /// /// A backward-compatible wrapper. - pub fn add_sync_method(&mut self, name: &str, method: F) + pub fn add_sync_method(&mut self, name: &str, method: F) where - F: RpcMethodSync, + F: RpcMethodSync, + R: Serialize + Send + 'static, { self.add_method(name, move |params| method.call(params)) } /// Adds new supported asynchronous method. - pub fn add_method(&mut self, name: &str, method: F) + pub fn add_method(&mut self, name: &str, method: F) where - F: RpcMethodSimple, + F: RpcMethodSimple, + R: Serialize + Send + 'static, { self.add_method_with_meta(name, move |params, _meta| method.call(params)) } @@ -169,12 +176,13 @@ impl> MetaIoHandler { } /// Adds new supported asynchronous method with metadata support. - pub fn add_method_with_meta(&mut self, name: &str, method: F) + pub fn add_method_with_meta(&mut self, name: &str, method: F) where - F: RpcMethod, + F: RpcMethod, + R: Serialize + Send + 'static, { self.methods - .insert(name.into(), RemoteProcedure::Method(Arc::new(method))); + .insert(name.into(), RemoteProcedure::Method(rpc_wrap(method))); } /// Adds new supported notification with metadata support. @@ -205,7 +213,7 @@ impl> MetaIoHandler { /// Handle given request asynchronously. pub fn handle_request(&self, request: &str, meta: T) -> FutureResult { use self::future::Either::{Left, Right}; - fn as_string(response: Option) -> Option { + fn as_string(response: Option) -> Option { let res = response.map(write_response); debug!(target: "rpc", "Response: {}.", res.as_ref().unwrap_or(&"None".to_string())); res @@ -214,7 +222,7 @@ impl> MetaIoHandler { trace!(target: "rpc", "Request: {}.", request); let request = read_request(request); let result = match request { - Err(error) => Left(future::ready(Some(Response::from( + Err(error) => Left(future::ready(Some(SerializedResponse::from( error, self.compatibility.default_version(), )))), @@ -228,16 +236,16 @@ impl> MetaIoHandler { pub fn handle_rpc_request(&self, request: Request, meta: T) -> FutureRpcResult { use self::future::Either::{Left, Right}; - fn output_as_response(output: Option) -> Option { - output.map(Response::Single) + fn output_as_response(output: Option) -> Option { + output.map(SerializedResponse::Single) } - fn outputs_as_batch(outs: Vec>) -> Option { + fn outputs_as_batch(outs: Vec>) -> Option { let outs: Vec<_> = outs.into_iter().flatten().collect(); if outs.is_empty() { None } else { - Some(Response::Batch(outs)) + Some(SerializedResponse::Batch(outs)) } } @@ -245,7 +253,7 @@ impl> MetaIoHandler { .on_request(request, meta, |request, meta| match request { Request::Single(call) => Left( self.handle_call(call, meta) - .map(output_as_response as fn(Option) -> Option), + .map(output_as_response as fn(Option) -> Option), ), Request::Batch(calls) => { let futures: Vec<_> = calls @@ -253,7 +261,8 @@ impl> MetaIoHandler { .map(move |call| self.handle_call(call, meta.clone())) .collect(); Right( - future::join_all(futures).map(outputs_as_batch as fn(Vec>) -> Option), + future::join_all(futures) + .map(outputs_as_batch as fn(Vec>) -> Option), ) } }) @@ -270,7 +279,9 @@ impl> MetaIoHandler { let jsonrpc = method.jsonrpc; let valid_version = self.compatibility.is_version_valid(jsonrpc); - let call_method = |method: &Arc>| method.call(params, meta); + let method_id = id.clone(); + let call_method = + |method: &Arc>| method.call(params, meta, jsonrpc, method_id); let result = match (valid_version, self.methods.get(&method.method)) { (false, _) => Err(Error::invalid_version()), @@ -283,10 +294,8 @@ impl> MetaIoHandler { }; match result { - Ok(result) => Left(Box::pin( - result.then(move |result| future::ready(Some(Output::from(result, id, jsonrpc)))), - ) as _), - Err(err) => Right(future::ready(Some(Output::from(Err(err), id, jsonrpc)))), + Ok(result) => Left(result), + Err(err) => Right(future::ready(Some(SerializedOutput::from_error(err, id, jsonrpc)))), } } Call::Notification(notification) => { @@ -310,7 +319,7 @@ impl> MetaIoHandler { Right(future::ready(None)) } - Call::Invalid { id } => Right(future::ready(Some(Output::invalid_request( + Call::Invalid { id } => Right(future::ready(Some(SerializedOutput::invalid_request( id, self.compatibility.default_version(), )))), @@ -475,9 +484,32 @@ fn read_request(request_str: &str) -> Result { crate::serde_from_str(request_str).map_err(|_| Error::new(ErrorCode::ParseError)) } -fn write_response(response: Response) -> String { - // this should never fail - serde_json::to_string(&response).unwrap() +fn write_response(response: SerializedResponse) -> String { + match response { + SerializedResponse::Single(output) => output.response, + SerializedResponse::Batch(outputs) => { + if outputs.len() == 0 { + return String::from("[]"); + } + + // "[" and comma-separated outputs and "]" + let commas = outputs.len() - 1; + let brackets = 2; + let length = brackets + commas + outputs.iter().map(|output| output.response.len()).sum::(); + + // Not elegant, but guaranteed to only copy responses once + let mut response = String::with_capacity(length); + response.push('['); + for output in outputs { + response.push_str(&output.response); + response.push(','); + } + response.pop(); + response.push(']'); + + response + } + } } #[cfg(test)] diff --git a/core/src/middleware.rs b/core/src/middleware.rs index 308a787c4..32b0e5574 100644 --- a/core/src/middleware.rs +++ b/core/src/middleware.rs @@ -1,7 +1,7 @@ //! `IoHandler` middlewares use crate::calls::Metadata; -use crate::types::{Call, Output, Request, Response}; +use crate::types::{Call, Request, SerializedOutput, SerializedResponse}; use futures_util::future::Either; use std::future::Future; use std::pin::Pin; @@ -9,10 +9,10 @@ use std::pin::Pin; /// RPC middleware pub trait Middleware: Send + Sync + 'static { /// A returned request future. - type Future: Future> + Send + 'static; + type Future: Future> + Send + 'static; /// A returned call future. - type CallFuture: Future> + Send + 'static; + type CallFuture: Future> + Send + 'static; /// Method invoked on each request. /// Allows you to either respond directly (without executing RPC call) @@ -20,7 +20,7 @@ pub trait Middleware: Send + Sync + 'static { fn on_request(&self, request: Request, meta: M, next: F) -> Either where F: Fn(Request, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { Either::Right(next(request, meta)) } @@ -31,16 +31,16 @@ pub trait Middleware: Send + Sync + 'static { fn on_call(&self, call: Call, meta: M, next: F) -> Either where F: Fn(Call, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { Either::Right(next(call, meta)) } } /// Dummy future used as a noop result of middleware. -pub type NoopFuture = Pin> + Send>>; +pub type NoopFuture = Pin> + Send>>; /// Dummy future used as a noop call result of middleware. -pub type NoopCallFuture = Pin> + Send>>; +pub type NoopCallFuture = Pin> + Send>>; /// No-op middleware implementation #[derive(Clone, Debug, Default)] @@ -57,7 +57,7 @@ impl, B: Middleware> Middleware for (A, B) { fn on_request(&self, request: Request, meta: M, process: F) -> Either where F: Fn(Request, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack(self.0.on_request(request, meta, |request, meta| { self.1.on_request(request, meta, &process) @@ -67,7 +67,7 @@ impl, B: Middleware> Middleware for (A, B) { fn on_call(&self, call: Call, meta: M, process: F) -> Either where F: Fn(Call, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack( self.0 @@ -83,7 +83,7 @@ impl, B: Middleware, C: Middleware> Middlewa fn on_request(&self, request: Request, meta: M, process: F) -> Either where F: Fn(Request, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack(self.0.on_request(request, meta, |request, meta| { repack(self.1.on_request(request, meta, |request, meta| { @@ -95,7 +95,7 @@ impl, B: Middleware, C: Middleware> Middlewa fn on_call(&self, call: Call, meta: M, process: F) -> Either where F: Fn(Call, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack(self.0.on_call(call, meta, |call, meta| { repack( @@ -115,7 +115,7 @@ impl, B: Middleware, C: Middleware, D: Middl fn on_request(&self, request: Request, meta: M, process: F) -> Either where F: Fn(Request, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack(self.0.on_request(request, meta, |request, meta| { repack(self.1.on_request(request, meta, |request, meta| { @@ -129,7 +129,7 @@ impl, B: Middleware, C: Middleware, D: Middl fn on_call(&self, call: Call, meta: M, process: F) -> Either where F: Fn(Call, M) -> X + Send + Sync, - X: Future> + Send + 'static, + X: Future> + Send + 'static, { repack(self.0.on_call(call, meta, |call, meta| { repack(self.1.on_call(call, meta, |call, meta| { diff --git a/core/src/types/mod.rs b/core/src/types/mod.rs index cb4a28e95..6c4c52b3a 100644 --- a/core/src/types/mod.rs +++ b/core/src/types/mod.rs @@ -15,5 +15,5 @@ pub use self::error::{Error, ErrorCode}; pub use self::id::Id; pub use self::params::Params; pub use self::request::{Call, MethodCall, Notification, Request}; -pub use self::response::{Failure, Output, Response, Success}; +pub use self::response::{Failure, Output, Response, SerializedOutput, SerializedResponse, Success}; pub use self::version::Version; diff --git a/core/src/types/response.rs b/core/src/types/response.rs index 5c45e5c04..8f922cddf 100644 --- a/core/src/types/response.rs +++ b/core/src/types/response.rs @@ -1,16 +1,17 @@ //! jsonrpc response use super::{Error, ErrorCode, Id, Value, Version}; use crate::Result as CoreResult; +use serde::Serialize; /// Successful response #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields)] -pub struct Success { +pub struct Success { /// Protocol version #[serde(skip_serializing_if = "Option::is_none")] pub jsonrpc: Option, /// Result - pub result: Value, + pub result: T, /// Correlation id pub id: Id, } @@ -28,55 +29,38 @@ pub struct Failure { pub id: Id, } -/// Represents output - failure or success +/// Formatted JSON RPC response #[derive(Debug, PartialEq, Clone, Deserialize, Serialize)] #[serde(deny_unknown_fields)] #[serde(untagged)] -pub enum Output { +pub enum Output { /// Success - Success(Success), + Success(Success), /// Failure Failure(Failure), } -impl Output { - /// Creates new output given `Result`, `Id` and `Version`. - pub fn from(result: CoreResult, id: Id, jsonrpc: Option) -> Self { - match result { - Ok(result) => Output::Success(Success { jsonrpc, result, id }), - Err(error) => Output::Failure(Failure { jsonrpc, error, id }), - } - } - - /// Creates new failure output indicating malformed request. - pub fn invalid_request(id: Id, jsonrpc: Option) -> Self { - Output::Failure(Failure { - id, - jsonrpc, - error: Error::new(ErrorCode::InvalidRequest), - }) - } - +impl Output { /// Get the jsonrpc protocol version. pub fn version(&self) -> Option { match *self { - Output::Success(ref s) => s.jsonrpc, - Output::Failure(ref f) => f.jsonrpc, + Self::Success(ref s) => s.jsonrpc, + Self::Failure(ref f) => f.jsonrpc, } } /// Get the correlation id. pub fn id(&self) -> &Id { match *self { - Output::Success(ref s) => &s.id, - Output::Failure(ref f) => &f.id, + Self::Success(ref s) => &s.id, + Self::Failure(ref f) => &f.id, } } } -impl From for CoreResult { +impl From> for CoreResult { /// Convert into a result. Will be `Ok` if it is a `Success` and `Err` if `Failure`. - fn from(output: Output) -> CoreResult { + fn from(output: Output) -> CoreResult { match output { Output::Success(s) => Ok(s.result), Output::Failure(f) => Err(f.error), @@ -85,7 +69,7 @@ impl From for CoreResult { } /// Synchronous response -#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +#[derive(Debug, PartialEq, Clone, Deserialize, Serialize)] #[serde(deny_unknown_fields)] #[serde(untagged)] pub enum Response { @@ -130,6 +114,60 @@ impl From for Response { } } +/// Represents output, failure or success +/// +/// This contains the full response string, including the jsonrpc envelope. +#[derive(Debug, PartialEq, Clone)] +pub struct SerializedOutput { + /// Response jsonrpc json + pub response: String, +} + +impl SerializedOutput { + /// Creates new output given `Result`, `Id` and `Version`. + pub fn from(result: CoreResult, id: Id, jsonrpc: Option) -> Self + where + T: Serialize, + { + match result { + Ok(result) => { + let response = serde_json::to_string(&Success { jsonrpc, result, id }) + .expect("Expected always-serializable type; qed"); + SerializedOutput { response } + } + Err(error) => Self::from_error(error, id, jsonrpc), + } + } + + /// Create new output from an `Error`, `Id` and `Version`. + pub fn from_error(error: Error, id: Id, jsonrpc: Option) -> Self { + let response = + serde_json::to_string(&Failure { jsonrpc, error, id }).expect("Expected always-serializable type; qed"); + SerializedOutput { response } + } + + /// Creates new failure output indicating malformed request. + pub fn invalid_request(id: Id, jsonrpc: Option) -> Self { + Self::from_error(Error::new(ErrorCode::InvalidRequest), id, jsonrpc) + } +} + +/// Synchronous response, pre-serialized +#[derive(Clone, Debug, PartialEq)] +pub enum SerializedResponse { + /// Single response + Single(SerializedOutput), + /// Response to batch request (batch of responses) + Batch(Vec), +} + +impl SerializedResponse { + /// Creates new `Response` with given error and `Version` + pub fn from(error: Error, jsonrpc: Option) -> Self { + Self::Single(SerializedOutput::from_error(error, Id::Null, jsonrpc)) + } +} + #[test] fn success_output_serialize() { use serde_json; @@ -167,7 +205,7 @@ fn success_output_deserialize() { fn failure_output_serialize() { use serde_json; - let fo = Output::Failure(Failure { + let fo: Output = Output::Failure(Failure { jsonrpc: Some(Version::V2), error: Error::parse_error(), id: Id::Num(1), @@ -184,7 +222,7 @@ fn failure_output_serialize() { fn failure_output_serialize_jsonrpc_1() { use serde_json; - let fo = Output::Failure(Failure { + let fo: Output = Output::Failure(Failure { jsonrpc: None, error: Error::parse_error(), id: Id::Num(1), diff --git a/derive/src/to_delegate.rs b/derive/src/to_delegate.rs index 7e9148a82..e0ff02767 100644 --- a/derive/src/to_delegate.rs +++ b/derive/src/to_delegate.rs @@ -281,8 +281,6 @@ impl RpcMethod { Ok((#(#tuple_fields, )*)) => { use self::_futures::{FutureExt, TryFutureExt}; let fut = self::_jsonrpc_core::WrapFuture::into_future((method)#method_call) - .map_ok(|value| _jsonrpc_core::to_value(value) - .expect("Expected always-serializable type; qed")) .map_err(Into::into as fn(_) -> _jsonrpc_core::Error); _futures::future::Either::Left(fut) }, diff --git a/derive/tests/client.rs b/derive/tests/client.rs index 8ac820ab0..986ff9a48 100644 --- a/derive/tests/client.rs +++ b/derive/tests/client.rs @@ -74,7 +74,9 @@ mod named_params { }); let mut handler = IoHandler::new(); - handler.add_sync_method("call_with_named", |params: Params| Ok(params.into())); + handler.add_sync_method("call_with_named", |params: Params| { + Ok(jsonrpc_core::Value::from(params)) + }); let (client, rpc_client) = local::connect::(handler); let fut = client @@ -112,7 +114,7 @@ mod raw_params { }); let mut handler = IoHandler::new(); - handler.add_sync_method("call_raw", |params: Params| Ok(params.into())); + handler.add_sync_method("call_raw", |params: Params| Ok(jsonrpc_core::Value::from(params))); let (client, rpc_client) = local::connect::(handler); let fut = client diff --git a/http/src/handler.rs b/http/src/handler.rs index 0466ee844..0193cf95b 100644 --- a/http/src/handler.rs +++ b/http/src/handler.rs @@ -395,33 +395,41 @@ where } fn process_health(&self, method: String, metadata: M) -> Result, hyper::Error> { - use self::core::types::{Call, Failure, Id, MethodCall, Output, Params, Request, Success, Version}; + use self::core::types::{Call, Failure, Id, MethodCall, Output, Params, Success, Version}; // Create a request - let call = Request::Single(Call::MethodCall(MethodCall { + let call = Call::MethodCall(MethodCall { jsonrpc: Some(Version::V2), method, params: Params::None, id: Id::Num(1), - })); + }); let response = match self.jsonrpc_handler.upgrade() { - Some(h) => h.handler.handle_rpc_request(call, metadata), + Some(h) => h.handler.handle_call(call, metadata), None => return Ok(RpcPollState::Ready(RpcHandlerState::Writing(Response::closing()))), }; Ok(RpcPollState::Ready(RpcHandlerState::WaitingForResponse(Box::pin( async { match response.await { - Some(core::Response::Single(Output::Success(Success { result, .. }))) => { - let result = serde_json::to_string(&result).expect("Serialization of result is infallible;qed"); - - Response::ok(result) - } - Some(core::Response::Single(Output::Failure(Failure { error, .. }))) => { - let result = serde_json::to_string(&error).expect("Serialization of error is infallible;qed"); - - Response::service_unavailable(result) + Some(wrap_output) => { + // Extract the "response" or "error" field from the response json which has already been serialized. + // Could use serde_json's RawValue to avoid parsing inside data. + let output: Output = serde_json::from_str(&wrap_output.response) + .expect("Deserialization of serialized response is infallible;qed"); + match output { + Output::Success(Success { result, .. }) => { + let out = serde_json::to_string(&result) + .expect("Serialization of deserialized result is infallible;qed"); + Response::ok(out) + } + Output::Failure(Failure { error, .. }) => { + let result = serde_json::to_string(&error) + .expect("Serialization of deserialized result is infallible;qed"); + Response::service_unavailable(result) + } + } } e => Response::internal_error(format!("Invalid response for health request: {:?}", e)), } @@ -430,7 +438,7 @@ where } fn process_rest(&self, uri: hyper::Uri, metadata: M) -> Result, hyper::Error> { - use self::core::types::{Call, Id, MethodCall, Params, Request, Value, Version}; + use self::core::types::{Call, Id, MethodCall, Params, Value, Version}; // skip the initial / let mut it = uri.path().split('/').skip(1); @@ -446,22 +454,20 @@ where } // Create a request - let call = Request::Single(Call::MethodCall(MethodCall { + let call = Call::MethodCall(MethodCall { jsonrpc: Some(Version::V2), method: method.into(), params: Params::Array(params), id: Id::Num(1), - })); + }); let response = match self.jsonrpc_handler.upgrade() { - Some(h) => h.handler.handle_rpc_request(call, metadata), + Some(h) => h.handler.handle_call(call, metadata), None => return Ok(RpcPollState::Ready(RpcHandlerState::Writing(Response::closing()))), }; Ok(RpcPollState::Ready(RpcHandlerState::Waiting(Box::pin(async { - response - .await - .map(|x| serde_json::to_string(&x).expect("Serialization of response is infallible;qed")) + response.await.map(|x| x.response) })))) } diff --git a/http/src/tests.rs b/http/src/tests.rs index 302642599..23a053af1 100644 --- a/http/src/tests.rs +++ b/http/src/tests.rs @@ -1,6 +1,6 @@ use jsonrpc_core; -use self::jsonrpc_core::{Error, ErrorCode, IoHandler, Params, Value}; +use self::jsonrpc_core::{Error, ErrorCode, IoHandler, Params, Result, Value}; use std::io::{Read, Write}; use std::net::TcpStream; use std::str::Lines; @@ -58,7 +58,9 @@ fn io() -> IoHandler { Ok((num,)) => Ok(Value::String(format!("world: {}", num))), _ => Ok(Value::String("world".into())), }); - io.add_sync_method("fail", |_: Params| Err(Error::new(ErrorCode::ServerError(-34)))); + io.add_sync_method("fail", |_: Params| -> Result { + Err(Error::new(ErrorCode::ServerError(-34))) + }); io.add_method("hello_async", |_params: Params| { futures::future::ready(Ok(Value::String("world".into()))) }); diff --git a/pubsub/src/delegates.rs b/pubsub/src/delegates.rs index 7df77e079..b26ea3849 100644 --- a/pubsub/src/delegates.rs +++ b/pubsub/src/delegates.rs @@ -1,3 +1,4 @@ +use serde::Serialize; use std::marker::PhantomData; use std::sync::Arc; @@ -98,21 +99,23 @@ where // TODO [ToDr] Consider sync? /// Adds async method to the delegate. - pub fn add_method(&mut self, name: &str, method: F) + pub fn add_method(&mut self, name: &str, method: F) where F: Fn(&T, Params) -> I, - I: Future> + Send + 'static, + I: Future> + Send + 'static, F: Send + Sync + 'static, + R: Serialize + Send + 'static, { self.inner.add_method(name, method) } /// Adds async method with metadata to the delegate. - pub fn add_method_with_meta(&mut self, name: &str, method: F) + pub fn add_method_with_meta(&mut self, name: &str, method: F) where F: Fn(&T, Params, M) -> I, - I: Future> + Send + 'static, + I: Future> + Send + 'static, F: Send + Sync + 'static, + R: Serialize + Send + 'static, { self.inner.add_method_with_meta(name, method) } diff --git a/test/src/lib.rs b/test/src/lib.rs index 3143cc2c5..e0de55668 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -30,7 +30,7 @@ //! // You can also test RPC created without macros: //! let rpc = { //! let mut io = IoHandler::new(); -//! io.add_sync_method("rpc_test_method", |_| { +//! io.add_sync_method("rpc_test_method", |_| -> Result { //! Err(Error::internal_error()) //! }); //! test::Rpc::from(io) @@ -118,17 +118,18 @@ impl Rpc { .expect("We are sending a method call not notification."); // extract interesting part from the response - let extracted = match rpc::serde_from_str(&response).expect("We will always get a single output.") { - response::Output::Success(response::Success { result, .. }) => match encoding { - Encoding::Compact => serde_json::to_string(&result), - Encoding::Pretty => serde_json::to_string_pretty(&result), - }, - response::Output::Failure(response::Failure { error, .. }) => match encoding { - Encoding::Compact => serde_json::to_string(&error), - Encoding::Pretty => serde_json::to_string_pretty(&error), - }, - } - .expect("Serialization is infallible; qed"); + let extracted = + match rpc::serde_from_str::(&response).expect("We will always get a single output.") { + response::Output::Success(response::Success { result, .. }) => match encoding { + Encoding::Compact => serde_json::to_string(&result), + Encoding::Pretty => serde_json::to_string_pretty(&result), + }, + response::Output::Failure(response::Failure { error, .. }) => match encoding { + Encoding::Compact => serde_json::to_string(&error), + Encoding::Pretty => serde_json::to_string_pretty(&error), + }, + } + .expect("Serialization is infallible; qed"); println!("\n{}\n --> {}\n", request, extracted);