Skip to content

Commit

Permalink
improve http mapping (#100)
Browse files Browse the repository at this point in the history
* improve http mapping

* fix
  • Loading branch information
xlc authored Oct 1, 2023
1 parent 7f38290 commit 93f1d18
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 17 deletions.
2 changes: 1 addition & 1 deletion benches/bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ fn config() -> Config {
listen_address: SUBWAY_SERVER_ADDR.to_string(),
port: SUBWAY_SERVER_PORT,
max_connections: 1024 * 1024,
health: None,
http_methods: Vec::new(),
}),
..Default::default()
},
Expand Down
8 changes: 5 additions & 3 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ extensions:
port: 9944
listen_address: '0.0.0.0'
max_connections: 2000
health:
path: /health
method: system_health
http_methods:
- path: /health
method: system_health
- path: /liveness
method: chain_getBlockHash

middlewares:
methods:
Expand Down
32 changes: 21 additions & 11 deletions src/extensions/server.rs → src/extensions/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
use std::{future::Future, net::SocketAddr};

use async_trait::async_trait;
use jsonrpsee::server::{
middleware::ProxyGetRequestLayer,
{RandomStringIdProvider, RpcModule, ServerBuilder, ServerHandle},
};
use jsonrpsee::server::{RandomStringIdProvider, RpcModule, ServerBuilder, ServerHandle};
use serde::Deserialize;

use crate::{extension::Extension, middleware::ExtensionRegistry};
use proxy_get_request::ProxyGetRequestLayer;

use self::proxy_get_request::ProxyGetRequestMethod;

mod proxy_get_request;

pub struct Server {
config: ServerConfig,
}

#[derive(Deserialize, Debug, Clone)]
pub struct HealthConfig {
pub struct HttpMethodsConfig {
pub path: String,
pub method: String,
}
Expand All @@ -25,7 +27,7 @@ pub struct ServerConfig {
pub listen_address: String,
pub max_connections: u32,
#[serde(default)]
pub health: Option<HealthConfig>,
pub http_methods: Vec<HttpMethodsConfig>,
}

#[async_trait]
Expand All @@ -49,11 +51,19 @@ impl Server {
&self,
builder: impl FnOnce() -> Fut,
) -> anyhow::Result<(SocketAddr, ServerHandle)> {
let service_builder =
tower::ServiceBuilder::new().option_layer(self.config.health.as_ref().map(|h| {
ProxyGetRequestLayer::new(h.path.clone(), h.method.clone())
.expect("Invalid health config")
}));
let service_builder = tower::ServiceBuilder::new().layer(
ProxyGetRequestLayer::new(
self.config
.http_methods
.iter()
.map(|m| ProxyGetRequestMethod {
path: m.path.clone(),
method: m.method.clone(),
})
.collect(),
)
.expect("Invalid health config"),
);

let server = ServerBuilder::default()
.set_middleware(service_builder)
Expand Down
215 changes: 215 additions & 0 deletions src/extensions/server/proxy_get_request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Middleware that proxies requests at a specified URI to internal
//! RPC method calls.
use hyper::header::{ACCEPT, CONTENT_TYPE};
use hyper::http::HeaderValue;
use hyper::{Body, Method, Request, Response, Uri};
use jsonrpsee::{
core::Error as RpcError,
types::{Id, RequestSer},
};
use std::collections::HashMap;
use std::error::Error;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::{Layer, Service};

#[derive(Debug, Clone)]
pub struct ProxyGetRequestMethod {
pub path: String,
pub method: String,
}

#[derive(Debug, Clone)]
pub struct ProxyGetRequestLayer {
methods: Vec<ProxyGetRequestMethod>,
}

impl ProxyGetRequestLayer {
pub fn new(methods: Vec<ProxyGetRequestMethod>) -> Result<Self, RpcError> {
for method in &methods {
if !method.path.starts_with('/') {
return Err(RpcError::Custom(
"ProxyGetRequestLayer path must start with `/`".to_string(),
));
}
}

Ok(Self { methods })
}
}
impl<S> Layer<S> for ProxyGetRequestLayer {
type Service = ProxyGetRequest<S>;

fn layer(&self, inner: S) -> Self::Service {
ProxyGetRequest::new(inner, self.methods.clone())
.expect("Path already validated in ProxyGetRequestLayer; qed")
}
}

#[derive(Debug, Clone)]
pub struct ProxyGetRequest<S> {
inner: S,
methods: HashMap<String, String>,
}

impl<S> ProxyGetRequest<S> {
pub fn new(inner: S, methods: Vec<ProxyGetRequestMethod>) -> Result<Self, RpcError> {
let mut map = HashMap::with_capacity(methods.len());

for method in methods {
if !method.path.starts_with('/') {
return Err(RpcError::Custom(
"ProxyGetRequestLayer path must start with `/`".to_string(),
));
}

map.insert(method.path, method.method);
}

Ok(Self {
inner,
methods: map,
})
}
}

impl<S> Service<Request<Body>> for ProxyGetRequest<S>
where
S: Service<Request<Body>, Response = Response<Body>>,
S::Response: 'static,
S::Error: Into<Box<dyn Error + Send + Sync>> + 'static,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = Box<dyn Error + Send + Sync + 'static>;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, mut req: Request<Body>) -> Self::Future {
let method = self.methods.get(req.uri().path());
let modify = method.is_some() && req.method() == Method::GET;

// Proxy the request to the appropriate method call.
if modify {
// RPC methods are accessed with `POST`.
*req.method_mut() = Method::POST;
// Precautionary remove the URI.
*req.uri_mut() = Uri::from_static("/");

// Requests must have the following headers:
req.headers_mut()
.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
req.headers_mut()
.insert(ACCEPT, HeaderValue::from_static("application/json"));

// Adjust the body to reflect the method call.
let body = Body::from(
serde_json::to_string(&RequestSer::borrowed(&Id::Number(0), method.unwrap(), None))
.expect("Valid request; qed"),
);
req = req.map(|_| body);
}

// Call the inner service and get a future that resolves to the response.
let fut = self.inner.call(req);

// Adjust the response if needed.
let res_fut = async move {
let res = fut.await.map_err(|err| err.into())?;

// Nothing to modify: return the response as is.
if !modify {
return Ok(res);
}

let body = res.into_body();
let bytes = hyper::body::to_bytes(body).await?;

#[derive(serde::Deserialize, Debug)]
struct RpcPayload<'a> {
#[serde(borrow)]
result: &'a serde_json::value::RawValue,
}

let response = if let Ok(payload) = serde_json::from_slice::<RpcPayload>(&bytes) {
response::ok_response(payload.result.to_string())
} else {
response::internal_error()
};

Ok(response)
};

Box::pin(res_fut)
}
}

mod response {
use jsonrpsee::types::{error::ErrorCode, ErrorObjectOwned, Id, Response, ResponsePayload};

const JSON: &str = "application/json; charset=utf-8";

/// Create a response body.
fn from_template<S: Into<hyper::Body>>(
status: hyper::StatusCode,
body: S,
content_type: &'static str,
) -> hyper::Response<hyper::Body> {
hyper::Response::builder()
.status(status)
.header(
"content-type",
hyper::header::HeaderValue::from_static(content_type),
)
.body(body.into())
// Parsing `StatusCode` and `HeaderValue` is infalliable but
// parsing body content is not.
.expect("Unable to parse response body for type conversion")
}

/// Create a valid JSON response.
pub(crate) fn ok_response(body: String) -> hyper::Response<hyper::Body> {
from_template(hyper::StatusCode::OK, body, JSON)
}
/// Create a response for json internal error.
pub(crate) fn internal_error() -> hyper::Response<hyper::Body> {
let err = ResponsePayload::error(ErrorObjectOwned::from(ErrorCode::InternalError));
let rp = Response::new(err, Id::Null);
let error = serde_json::to_string(&rp).expect("built from known-good data; qed");

from_template(hyper::StatusCode::INTERNAL_SERVER_ERROR, error, JSON)
}
}
2 changes: 1 addition & 1 deletion src/integration_tests/merge_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async fn merge_subscription_works() {
listen_address: "0.0.0.0".to_string(),
port: 0,
max_connections: 10,
health: None,
http_methods: Vec::new(),
}),
merge_subscription: Some(MergeSubscriptionConfig {
keep_alive_seconds: Some(1),
Expand Down
2 changes: 1 addition & 1 deletion src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ mod tests {
listen_address: "127.0.0.1".to_string(),
port: 9944,
max_connections: 1024,
health: None,
http_methods: Vec::new(),
}),
..Default::default()
},
Expand Down

0 comments on commit 93f1d18

Please sign in to comment.