Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove BoxFuture from core and replace with manual future impl #30

Merged
merged 5 commits into from
Mar 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 84 additions & 37 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@

#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate futures;
extern crate jsonrpc_core;
#[macro_use]
Expand All @@ -65,6 +66,7 @@ extern crate serde;
#[cfg_attr(test, macro_use)]
extern crate serde_json;

use futures::Async;
use futures::future::Future;
use jsonrpc_core::types::{Id, MethodCall, Params, Version};
use serde_json::Value as JsonValue;
Expand Down Expand Up @@ -102,34 +104,91 @@ error_chain! {
}
}

/// A `Future` trait object.
pub type BoxFuture<T, E> = Box<Future<Item = T, Error = E> + Send>;

/// A lazy RPC call `Future`. The actual call has not been sent when an instance of this type
/// is returned from a client generated by the macro in this crate. This is a `Future` that, when
/// executed, performs the RPC call.
pub struct RpcRequest<T>(BoxFuture<T, Error>);
pub struct RpcRequest<T, F>(::std::result::Result<InnerRpcRequest<T, F>, Option<Error>>);

impl<T> RpcRequest<T> {
impl<T, E, F> RpcRequest<T, F>
where
T: serde::de::DeserializeOwned + Send + 'static,
E: ::std::error::Error + Send + 'static,
F: Future<Item = Vec<u8>, Error = E> + Send + 'static,
{
/// Consume this RPC request and run it synchronously. This blocks until the RPC call is done,
/// then the result of the call is returned.
pub fn call(self) -> Result<T> {
self.0.wait()
self.wait()
}
}

impl<T, E, F> Future for RpcRequest<T, F>
where
T: serde::de::DeserializeOwned + Send + 'static,
E: ::std::error::Error + Send + 'static,
F: Future<Item = Vec<u8>, Error = E> + Send + 'static,
{
type Item = T;
type Error = Error;

fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
match self.0 {
Ok(ref mut inner) => inner.poll(),
Err(ref mut error_option) => Err(error_option
.take()
.expect("Cannot call RpcRequest poll twice when in error state")),
}
}
}

struct InnerRpcRequest<T, F> {
transport_future: F,
id: Id,
_marker: ::std::marker::PhantomData<T>,
}

impl<T, F> InnerRpcRequest<T, F> {
fn new(transport_future: F, id: Id) -> Self {
Self {
transport_future,
id,
_marker: ::std::marker::PhantomData,
}
}
}

impl<T> Future for RpcRequest<T> {
impl<T, E, F> Future for InnerRpcRequest<T, F>
where
T: serde::de::DeserializeOwned + Send + 'static,
E: ::std::error::Error + Send + 'static,
F: Future<Item = Vec<u8>, Error = E> + Send + 'static,
{
type Item = T;
type Error = Error;

fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
self.0.poll()
let response_raw = try_ready!(
self.transport_future
.poll()
.chain_err(|| ErrorKind::TransportError)
);
trace!(
"Deserializing {} byte response to request with id {:?}",
response_raw.len(),
self.id
);
response::parse(&response_raw, &self.id).map(|t| Async::Ready(t))
}
}


/// Trait for types acting as a transport layer for the JSON-RPC 2.0 clients generated by the
/// `jsonrpc_client` macro.
pub trait Transport {
/// The future type this transport returns on send operations.
type Future: Future<Item = Vec<u8>, Error = Self::Error> + Send + 'static;

/// The type of error that this transport emits if it fails.
type Error: ::std::error::Error + Send + 'static;

Expand All @@ -139,7 +198,7 @@ pub trait Transport {

/// Sends the given data over the transport and returns a future that will complete with the
/// response to the request, or the transport specific error if something went wrong.
fn send(&self, json_data: Vec<u8>) -> BoxFuture<Vec<u8>, Self::Error>;
fn send(&self, json_data: Vec<u8>) -> Self::Future;
}


Expand All @@ -150,41 +209,25 @@ pub trait Transport {
/// # Not intended for direct use
/// This is being called from the client structs generated by the `jsonrpc_client` macro. This
/// function is not intended to be used directly, only the generated structs should call this.
pub fn call_method<T, P, R>(transport: &mut T, method: String, params: P) -> RpcRequest<R>
pub fn call_method<T, P, R>(
transport: &mut T,
method: String,
params: P,
) -> RpcRequest<R, T::Future>
where
T: Transport,
P: serde::Serialize,
R: serde::de::DeserializeOwned + Send + 'static,
{
let raw_id = transport.get_next_id();
let id = Id::Num(raw_id);
trace!(
"Serializing call to method \"{}\" with id {}",
method,
raw_id
);
let request_serialization_result = serialize_request(id.clone(), method.clone(), params)
let id = Id::Num(transport.get_next_id());
trace!("Serializing call to method \"{}\" with id {:?}", method, id);
let request_serialization_result = serialize_request(id.clone(), method, params)
.chain_err(|| ErrorKind::SerializeError);
match request_serialization_result {
Err(e) => RpcRequest(Box::new(futures::future::err(e))),
Err(e) => RpcRequest(Err(Some(e))),
Ok(request_raw) => {
trace!(
"Sending call to method \"{}\" with id {} to transport",
method,
raw_id
);
let future = transport
.send(request_raw)
.map_err(|e| Error::with_chain(e, ErrorKind::TransportError))
.and_then(move |response_raw: Vec<u8>| {
trace!(
"Deserializing response to method \"{}\" with id {}",
method,
raw_id
);
response::parse::<R>(&response_raw, id)
});
RpcRequest(Box::new(future))
let transport_future = transport.send(request_raw);
RpcRequest(Ok(InnerRpcRequest::new(transport_future, id)))
}
}
}
Expand Down Expand Up @@ -220,19 +263,22 @@ mod tests {
use super::*;
use std::io;

pub type BoxFuture<T, E> = Box<Future<Item = T, Error = E> + Send>;

/// A test transport that just echoes back a response containing the entire request as the
/// result.
#[derive(Clone)]
struct EchoTransport;

impl Transport for EchoTransport {
type Future = BoxFuture<Vec<u8>, io::Error>;
type Error = io::Error;

fn get_next_id(&mut self) -> u64 {
1
}

fn send(&self, json_data: Vec<u8>) -> BoxFuture<Vec<u8>, io::Error> {
fn send(&self, json_data: Vec<u8>) -> Self::Future {
let json = json!({
"jsonrpc": "2.0",
"id": 1,
Expand All @@ -247,13 +293,14 @@ mod tests {
struct ErrorTransport;

impl Transport for ErrorTransport {
type Future = BoxFuture<Vec<u8>, io::Error>;
type Error = io::Error;

fn get_next_id(&mut self) -> u64 {
1
}

fn send(&self, _json_data: Vec<u8>) -> BoxFuture<Vec<u8>, io::Error> {
fn send(&self, _json_data: Vec<u8>) -> Self::Future {
let json = json!({
"jsonrpc": "2.0",
"id": 1,
Expand Down
10 changes: 5 additions & 5 deletions core/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
#[macro_export]
macro_rules! jsonrpc_client {
(
$(#[$struct_doc:meta])*
$(#[$struct_attr:meta])*
pub struct $struct_name:ident {$(
$(#[$doc:meta])*
$(#[$attr:meta])*
pub fn $method:ident(&mut $selff:ident $(, $arg_name:ident: $arg_ty:ty)*)
-> RpcRequest<$return_ty:ty>;
)*}
) => (
$(#[$struct_doc])*
$(#[$struct_attr])*
pub struct $struct_name<T: $crate::Transport> {
transport: T,
}
Expand All @@ -30,9 +30,9 @@ macro_rules! jsonrpc_client {
}

$(
$(#[$doc])*
$(#[$attr])*
pub fn $method(&mut $selff $(, $arg_name: $arg_ty)*)
-> $crate::RpcRequest<$return_ty>
-> $crate::RpcRequest<$return_ty, T::Future>
{
let method = String::from(stringify!($method));
let params = expand_params!($($arg_name,)*);
Expand Down
10 changes: 4 additions & 6 deletions core/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use serde_json;

/// Parses a binary response into json, extracts the "result" field and tries to deserialize that
/// to the desired type.
pub fn parse<R>(response_raw: &[u8], expected_id: Id) -> Result<R>
pub fn parse<R>(response_raw: &[u8], expected_id: &Id) -> Result<R>
where
R: serde::de::DeserializeOwned,
{
Expand All @@ -24,17 +24,15 @@ where
ErrorKind::ResponseError("Not JSON-RPC 2.0 compatible")
);
ensure!(
response.id() == &expected_id,
response.id() == expected_id,
ErrorKind::ResponseError("Response id not equal to request id")
);
match response {
Output::Success(success) => {
trace!("Received json result: {}", success.result);
serde_json::from_value::<R>(success.result)
serde_json::from_value(success.result)
.chain_err(|| ErrorKind::ResponseError("Not valid for target type"))
}
Output::Failure(failure) => {
Err(ErrorKind::JsonRpcError(failure.error).into())
}
Output::Failure(failure) => bail!(ErrorKind::JsonRpcError(failure.error)),
}
}
8 changes: 5 additions & 3 deletions http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ extern crate native_tls;
use futures::{future, Future, Stream};
use futures::sync::{mpsc, oneshot};
use hyper::{Client, Request, StatusCode, Uri};
use jsonrpc_client_core::{BoxFuture, Transport};
use jsonrpc_client_core::Transport;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -119,6 +119,7 @@ error_chain! {
type CoreSender = mpsc::UnboundedSender<(Request, oneshot::Sender<Result<Vec<u8>>>)>;
type CoreReceiver = mpsc::UnboundedReceiver<(Request, oneshot::Sender<Result<Vec<u8>>>)>;


/// The main struct of the HTTP transport implementation for
/// [`jsonrpc_client_core`](../jsonrpc_client_core).
///
Expand Down Expand Up @@ -336,14 +337,15 @@ impl HttpHandle {
}

impl Transport for HttpHandle {
type Future = Box<Future<Item = Vec<u8>, Error = Self::Error> + Send>;
type Error = Error;

fn get_next_id(&mut self) -> u64 {
self.id.fetch_add(1, Ordering::SeqCst) as u64
}

fn send(&self, json_data: Vec<u8>) -> BoxFuture<Vec<u8>, Error> {
let request = self.create_request(json_data.clone());
fn send(&self, json_data: Vec<u8>) -> Self::Future {
let request = self.create_request(json_data);
let (response_tx, response_rx) = oneshot::channel();
let future = future::result(self.request_tx.unbounded_send((request, response_tx)))
.map_err(|e| {
Expand Down