Skip to content

Commit

Permalink
Remove BoxFuture from core and replace with manual future impl
Browse files Browse the repository at this point in the history
  • Loading branch information
faern committed Mar 7, 2018
1 parent 177e232 commit 5ccea9a
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 44 deletions.
122 changes: 86 additions & 36 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ extern crate serde;
#[cfg_attr(test, macro_use)]
extern crate serde_json;

use futures::Async;
use futures::future::Future;
use jsonrpc_core::types::{Id, MethodCall, Params, Version};
use serde_json::Value as JsonValue;
Expand Down Expand Up @@ -102,34 +103,95 @@ error_chain! {
}
}

/// A `Future` trait object.
pub type BoxFuture<T, E> = Box<Future<Item = T, Error = E> + Send>;

/// A lazy RPC call `Future`. The actual call has not been sent when an instance of this type
/// is returned from a client generated by the macro in this crate. This is a `Future` that, when
/// executed, performs the RPC call.
pub struct RpcRequest<T>(BoxFuture<T, Error>);
pub struct RpcRequest<T, F>(::std::result::Result<InnerRpcRequest<T, F>, Option<Error>>);

impl<T> RpcRequest<T> {
impl<T, E, F> RpcRequest<T, F>
where
T: serde::de::DeserializeOwned + Send + 'static,
E: ::std::error::Error + Send + 'static,
F: Future<Item = Vec<u8>, Error = E> + Send + 'static,
{
/// Consume this RPC request and run it synchronously. This blocks until the RPC call is done,
/// then the result of the call is returned.
pub fn call(self) -> Result<T> {
self.0.wait()
self.wait()
}
}

impl<T> Future for RpcRequest<T> {
impl<T, E, F> Future for RpcRequest<T, F>
where
T: serde::de::DeserializeOwned + Send + 'static,
E: ::std::error::Error + Send + 'static,
F: Future<Item = Vec<u8>, Error = E> + Send + 'static,
{
type Item = T;
type Error = Error;

fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
self.0.poll()
match self.0 {
Ok(ref mut inner) => inner.poll(),
Err(ref mut error_option) => Err(error_option
.take()
.expect("Cannot call RpcRequest poll twice when in error state")),
}
}
}

struct InnerRpcRequest<T, F> {
transport_future: F,
method: String,
id: Id,
_marker: ::std::marker::PhantomData<T>,
}

impl<T, F> InnerRpcRequest<T, F> {
fn new(transport_future: F, method: String, id: Id) -> Self {
Self {
transport_future,
method,
id,
_marker: ::std::marker::PhantomData,
}
}
}

impl<T, E, F> Future for InnerRpcRequest<T, F>
where
T: serde::de::DeserializeOwned + Send + 'static,
E: ::std::error::Error + Send + 'static,
F: Future<Item = Vec<u8>, Error = E> + Send + 'static,
{
type Item = T;
type Error = Error;

fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
match self.transport_future.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(response_raw)) => {
trace!(
"Deserializing {} byte response to method \"{}\" with id {:?}",
response_raw.len(),
self.method,
self.id
);
response::parse::<T>(&response_raw, &self.id).map(|t| Async::Ready(t))
}
Err(e) => Err(Error::with_chain(e, ErrorKind::TransportError)),
}
}
}


/// Trait for types acting as a transport layer for the JSON-RPC 2.0 clients generated by the
/// `jsonrpc_client` macro.
pub trait Transport {
/// The future type this transport returns on send operations.
type Future: Future<Item = Vec<u8>, Error = Self::Error> + Send + 'static;

/// The type of error that this transport emits if it fails.
type Error: ::std::error::Error + Send + 'static;

Expand All @@ -139,7 +201,7 @@ pub trait Transport {

/// Sends the given data over the transport and returns a future that will complete with the
/// response to the request, or the transport specific error if something went wrong.
fn send(&self, json_data: Vec<u8>) -> BoxFuture<Vec<u8>, Self::Error>;
fn send(&self, json_data: Vec<u8>) -> Self::Future;
}


Expand All @@ -150,41 +212,25 @@ pub trait Transport {
/// # Not intended for direct use
/// This is being called from the client structs generated by the `jsonrpc_client` macro. This
/// function is not intended to be used directly, only the generated structs should call this.
pub fn call_method<T, P, R>(transport: &mut T, method: String, params: P) -> RpcRequest<R>
pub fn call_method<T, P, R>(
transport: &mut T,
method: String,
params: P,
) -> RpcRequest<R, T::Future>
where
T: Transport,
P: serde::Serialize,
R: serde::de::DeserializeOwned + Send + 'static,
{
let raw_id = transport.get_next_id();
let id = Id::Num(raw_id);
trace!(
"Serializing call to method \"{}\" with id {}",
method,
raw_id
);
let id = Id::Num(transport.get_next_id());
trace!("Serializing call to method \"{}\" with id {:?}", method, id);
let request_serialization_result = serialize_request(id.clone(), method.clone(), params)
.chain_err(|| ErrorKind::SerializeError);
match request_serialization_result {
Err(e) => RpcRequest(Box::new(futures::future::err(e))),
Err(e) => RpcRequest(Err(Some(e))),
Ok(request_raw) => {
trace!(
"Sending call to method \"{}\" with id {} to transport",
method,
raw_id
);
let future = transport
.send(request_raw)
.map_err(|e| Error::with_chain(e, ErrorKind::TransportError))
.and_then(move |response_raw: Vec<u8>| {
trace!(
"Deserializing response to method \"{}\" with id {}",
method,
raw_id
);
response::parse::<R>(&response_raw, id)
});
RpcRequest(Box::new(future))
let transport_future = transport.send(request_raw);
RpcRequest(Ok(InnerRpcRequest::new(transport_future, method, id)))
}
}
}
Expand Down Expand Up @@ -220,19 +266,22 @@ mod tests {
use super::*;
use std::io;

pub type BoxFuture<T, E> = Box<Future<Item = T, Error = E> + Send>;

/// A test transport that just echoes back a response containing the entire request as the
/// result.
#[derive(Clone)]
struct EchoTransport;

impl Transport for EchoTransport {
type Future = BoxFuture<Vec<u8>, io::Error>;
type Error = io::Error;

fn get_next_id(&mut self) -> u64 {
1
}

fn send(&self, json_data: Vec<u8>) -> BoxFuture<Vec<u8>, io::Error> {
fn send(&self, json_data: Vec<u8>) -> Self::Future {
let json = json!({
"jsonrpc": "2.0",
"id": 1,
Expand All @@ -247,13 +296,14 @@ mod tests {
struct ErrorTransport;

impl Transport for ErrorTransport {
type Future = BoxFuture<Vec<u8>, io::Error>;
type Error = io::Error;

fn get_next_id(&mut self) -> u64 {
1
}

fn send(&self, _json_data: Vec<u8>) -> BoxFuture<Vec<u8>, io::Error> {
fn send(&self, _json_data: Vec<u8>) -> Self::Future {
let json = json!({
"jsonrpc": "2.0",
"id": 1,
Expand Down
2 changes: 1 addition & 1 deletion core/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ macro_rules! jsonrpc_client {
$(
$(#[$doc])*
pub fn $method(&mut $selff $(, $arg_name: $arg_ty)*)
-> $crate::RpcRequest<$return_ty>
-> $crate::RpcRequest<$return_ty, T::Future>
{
let method = String::from(stringify!($method));
let params = expand_params!($($arg_name,)*);
Expand Down
8 changes: 3 additions & 5 deletions core/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use serde_json;

/// Parses a binary response into json, extracts the "result" field and tries to deserialize that
/// to the desired type.
pub fn parse<R>(response_raw: &[u8], expected_id: Id) -> Result<R>
pub fn parse<R>(response_raw: &[u8], expected_id: &Id) -> Result<R>
where
R: serde::de::DeserializeOwned,
{
Expand All @@ -24,7 +24,7 @@ where
ErrorKind::ResponseError("Not JSON-RPC 2.0 compatible")
);
ensure!(
response.id() == &expected_id,
response.id() == expected_id,
ErrorKind::ResponseError("Response id not equal to request id")
);
match response {
Expand All @@ -33,8 +33,6 @@ where
serde_json::from_value::<R>(success.result)
.chain_err(|| ErrorKind::ResponseError("Not valid for target type"))
}
Output::Failure(failure) => {
Err(ErrorKind::JsonRpcError(failure.error).into())
}
Output::Failure(failure) => Err(ErrorKind::JsonRpcError(failure.error).into()),
}
}
8 changes: 6 additions & 2 deletions http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ extern crate native_tls;
use futures::{future, Future, Stream};
use futures::sync::{mpsc, oneshot};
use hyper::{Client, Request, StatusCode, Uri};
use jsonrpc_client_core::{BoxFuture, Transport};
use jsonrpc_client_core::Transport;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -119,6 +119,9 @@ error_chain! {
type CoreSender = mpsc::UnboundedSender<(Request, oneshot::Sender<Result<Vec<u8>>>)>;
type CoreReceiver = mpsc::UnboundedReceiver<(Request, oneshot::Sender<Result<Vec<u8>>>)>;

/// Future type returned from `HttpTransport`.
pub type HttpFuture<T, E> = Box<Future<Item = T, Error = E> + Send>;

/// The main struct of the HTTP transport implementation for
/// [`jsonrpc_client_core`](../jsonrpc_client_core).
///
Expand Down Expand Up @@ -336,13 +339,14 @@ impl HttpHandle {
}

impl Transport for HttpHandle {
type Future = HttpFuture<Vec<u8>, Error>;
type Error = Error;

fn get_next_id(&mut self) -> u64 {
self.id.fetch_add(1, Ordering::SeqCst) as u64
}

fn send(&self, json_data: Vec<u8>) -> BoxFuture<Vec<u8>, Error> {
fn send(&self, json_data: Vec<u8>) -> Self::Future {
let request = self.create_request(json_data.clone());
let (response_tx, response_rx) = oneshot::channel();
let future = future::result(self.request_tx.unbounded_send((request, response_tx)))
Expand Down

0 comments on commit 5ccea9a

Please sign in to comment.