From 07e4e95d20c01c4fe57e85624940dcd5a112a832 Mon Sep 17 00:00:00 2001 From: Bryan Chen Date: Fri, 29 Sep 2023 21:41:20 +1300 Subject: [PATCH] improve http mapping --- config.yml | 8 +- src/extensions/{server.rs => server/mod.rs} | 32 ++- src/extensions/server/proxy_get_request.rs | 215 ++++++++++++++++++++ 3 files changed, 241 insertions(+), 14 deletions(-) rename src/extensions/{server.rs => server/mod.rs} (66%) create mode 100644 src/extensions/server/proxy_get_request.rs diff --git a/config.yml b/config.yml index cdff9a7..5f2fe2e 100644 --- a/config.yml +++ b/config.yml @@ -17,9 +17,11 @@ extensions: port: 9944 listen_address: '0.0.0.0' max_connections: 2000 - health: - path: /health - method: system_health + http_methods: + - path: /health + method: system_health + - path: /liveness + method: chain_getBlockHash middlewares: methods: diff --git a/src/extensions/server.rs b/src/extensions/server/mod.rs similarity index 66% rename from src/extensions/server.rs rename to src/extensions/server/mod.rs index dde9ca8..bea92ea 100644 --- a/src/extensions/server.rs +++ b/src/extensions/server/mod.rs @@ -1,20 +1,22 @@ use std::{future::Future, net::SocketAddr}; use async_trait::async_trait; -use jsonrpsee::server::{ - middleware::ProxyGetRequestLayer, - {RandomStringIdProvider, RpcModule, ServerBuilder, ServerHandle}, -}; +use jsonrpsee::server::{RandomStringIdProvider, RpcModule, ServerBuilder, ServerHandle}; use serde::Deserialize; use crate::{extension::Extension, middleware::ExtensionRegistry}; +use proxy_get_request::ProxyGetRequestLayer; + +use self::proxy_get_request::ProxyGetRequestMethod; + +mod proxy_get_request; pub struct Server { config: ServerConfig, } #[derive(Deserialize, Debug, Clone)] -pub struct HealthConfig { +pub struct HttpMethodsConfig { pub path: String, pub method: String, } @@ -25,7 +27,7 @@ pub struct ServerConfig { pub listen_address: String, pub max_connections: u32, #[serde(default)] - pub health: Option, + pub http_methods: Vec, } #[async_trait] @@ -49,11 +51,19 @@ impl Server { &self, builder: impl FnOnce() -> Fut, ) -> anyhow::Result<(SocketAddr, ServerHandle)> { - let service_builder = - tower::ServiceBuilder::new().option_layer(self.config.health.as_ref().map(|h| { - ProxyGetRequestLayer::new(h.path.clone(), h.method.clone()) - .expect("Invalid health config") - })); + let service_builder = tower::ServiceBuilder::new().layer( + ProxyGetRequestLayer::new( + self.config + .http_methods + .iter() + .map(|m| ProxyGetRequestMethod { + path: m.path.clone(), + method: m.method.clone(), + }) + .collect(), + ) + .expect("Invalid health config"), + ); let server = ServerBuilder::default() .set_middleware(service_builder) diff --git a/src/extensions/server/proxy_get_request.rs b/src/extensions/server/proxy_get_request.rs new file mode 100644 index 0000000..7d1e959 --- /dev/null +++ b/src/extensions/server/proxy_get_request.rs @@ -0,0 +1,215 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Middleware that proxies requests at a specified URI to internal +//! RPC method calls. + +use hyper::header::{ACCEPT, CONTENT_TYPE}; +use hyper::http::HeaderValue; +use hyper::{Body, Method, Request, Response, Uri}; +use jsonrpsee::{ + core::Error as RpcError, + types::{Id, RequestSer}, +}; +use std::collections::HashMap; +use std::error::Error; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tower::{Layer, Service}; + +#[derive(Debug, Clone)] +pub struct ProxyGetRequestMethod { + pub path: String, + pub method: String, +} + +#[derive(Debug, Clone)] +pub struct ProxyGetRequestLayer { + methods: Vec, +} + +impl ProxyGetRequestLayer { + pub fn new(methods: Vec) -> Result { + for method in &methods { + if !method.path.starts_with('/') { + return Err(RpcError::Custom( + "ProxyGetRequestLayer path must start with `/`".to_string(), + )); + } + } + + Ok(Self { methods }) + } +} +impl Layer for ProxyGetRequestLayer { + type Service = ProxyGetRequest; + + fn layer(&self, inner: S) -> Self::Service { + ProxyGetRequest::new(inner, self.methods.clone()) + .expect("Path already validated in ProxyGetRequestLayer; qed") + } +} + +#[derive(Debug, Clone)] +pub struct ProxyGetRequest { + inner: S, + methods: HashMap, +} + +impl ProxyGetRequest { + pub fn new(inner: S, methods: Vec) -> Result { + let mut map = HashMap::with_capacity(methods.len()); + + for method in methods { + if !method.path.starts_with('/') { + return Err(RpcError::Custom( + "ProxyGetRequestLayer path must start with `/`".to_string(), + )); + } + + map.insert(method.path, method.method); + } + + Ok(Self { + inner, + methods: map, + }) + } +} + +impl Service> for ProxyGetRequest +where + S: Service, Response = Response>, + S::Response: 'static, + S::Error: Into> + 'static, + S::Future: Send + 'static, +{ + type Response = S::Response; + type Error = Box; + type Future = + Pin> + Send + 'static>>; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, mut req: Request) -> Self::Future { + let method = self.methods.get(req.uri().path()); + let modify = method.is_some() && req.method() == Method::GET; + + // Proxy the request to the appropriate method call. + if modify { + // RPC methods are accessed with `POST`. + *req.method_mut() = Method::POST; + // Precautionary remove the URI. + *req.uri_mut() = Uri::from_static("/"); + + // Requests must have the following headers: + req.headers_mut() + .insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + req.headers_mut() + .insert(ACCEPT, HeaderValue::from_static("application/json")); + + // Adjust the body to reflect the method call. + let body = Body::from( + serde_json::to_string(&RequestSer::borrowed(&Id::Number(0), method.unwrap(), None)) + .expect("Valid request; qed"), + ); + req = req.map(|_| body); + } + + // Call the inner service and get a future that resolves to the response. + let fut = self.inner.call(req); + + // Adjust the response if needed. + let res_fut = async move { + let res = fut.await.map_err(|err| err.into())?; + + // Nothing to modify: return the response as is. + if !modify { + return Ok(res); + } + + let body = res.into_body(); + let bytes = hyper::body::to_bytes(body).await?; + + #[derive(serde::Deserialize, Debug)] + struct RpcPayload<'a> { + #[serde(borrow)] + result: &'a serde_json::value::RawValue, + } + + let response = if let Ok(payload) = serde_json::from_slice::(&bytes) { + response::ok_response(payload.result.to_string()) + } else { + response::internal_error() + }; + + Ok(response) + }; + + Box::pin(res_fut) + } +} + +mod response { + use jsonrpsee::types::{error::ErrorCode, ErrorObjectOwned, Id, Response, ResponsePayload}; + + const JSON: &str = "application/json; charset=utf-8"; + + /// Create a response body. + fn from_template>( + status: hyper::StatusCode, + body: S, + content_type: &'static str, + ) -> hyper::Response { + hyper::Response::builder() + .status(status) + .header( + "content-type", + hyper::header::HeaderValue::from_static(content_type), + ) + .body(body.into()) + // Parsing `StatusCode` and `HeaderValue` is infalliable but + // parsing body content is not. + .expect("Unable to parse response body for type conversion") + } + + /// Create a valid JSON response. + pub(crate) fn ok_response(body: String) -> hyper::Response { + from_template(hyper::StatusCode::OK, body, JSON) + } + /// Create a response for json internal error. + pub(crate) fn internal_error() -> hyper::Response { + let err = ResponsePayload::error(ErrorObjectOwned::from(ErrorCode::InternalError)); + let rp = Response::new(err, Id::Null); + let error = serde_json::to_string(&rp).expect("built from known-good data; qed"); + + from_template(hyper::StatusCode::INTERNAL_SERVER_ERROR, error, JSON) + } +}