diff --git a/src/extensions/client/http.rs b/src/extensions/client/http.rs new file mode 100644 index 0000000..f8497b2 --- /dev/null +++ b/src/extensions/client/http.rs @@ -0,0 +1,61 @@ +use crate::middlewares::CallResult; +use jsonrpsee::{ + core::{ + client::{ClientT, Error}, + JsonValue, + }, + http_client::HttpClient as RpcClient, + types::{error::INTERNAL_ERROR_CODE, ErrorObject}, +}; +use std::sync::atomic::{AtomicUsize, Ordering}; + +/// Very simple struct to have a set of JsonRpsee HTTP clients and send requests to them +pub struct HttpClient { + clients: Vec, + last_sent: AtomicUsize, +} + +impl HttpClient { + pub fn new(endpoints: Vec) -> Result<(Option, Vec), Error> { + let mut other_urls = vec![]; + let clients = endpoints + .into_iter() + .filter_map(|url| { + let t_url = url.to_lowercase(); + if t_url.starts_with("http://") || t_url.starts_with("https://") { + Some(RpcClient::builder().build(url)) + } else { + other_urls.push(url); + None + } + }) + .collect::, _>>()?; + + if clients.is_empty() { + Ok((None, other_urls)) + } else { + Ok(( + Some(Self { + clients, + last_sent: AtomicUsize::new(0), + }), + other_urls, + )) + } + } + + /// Sends a request to one of the clients + /// + /// The client is selected in a round-robin fashion as fair as possible + pub async fn request(&self, method: &str, params: Vec) -> CallResult { + let client_id = self.last_sent.fetch_add(1, Ordering::Relaxed) % self.clients.len(); + + self.clients[client_id] + .request(method, params) + .await + .map_err(|e| match e { + jsonrpsee::core::client::Error::Call(e) => e, + e => ErrorObject::owned(INTERNAL_ERROR_CODE, e.to_string(), None::), + }) + } +} diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs index 2e03ea7..063214c 100644 --- a/src/extensions/client/mod.rs +++ b/src/extensions/client/mod.rs @@ -29,6 +29,7 @@ use crate::{ utils::{self, errors}, }; +mod http; #[cfg(test)] pub mod mock; #[cfg(test)] @@ -38,15 +39,18 @@ const TRACER: utils::telemetry::Tracer = utils::telemetry::Tracer::new("client") pub struct Client { endpoints: Vec, - sender: tokio::sync::mpsc::Sender, - rotation_notify: Arc, + http_client: Option, + sender: Option>, + rotation_notify: Option>, retries: u32, - background_task: tokio::task::JoinHandle<()>, + background_task: Option>, } impl Drop for Client { fn drop(&mut self) { - self.background_task.abort(); + if let Some(background_task) = self.background_task.take() { + background_task.abort(); + } } } @@ -152,12 +156,26 @@ impl Client { retries: Option, ) -> Result { let endpoints: Vec<_> = endpoints.into_iter().map(|e| e.as_ref().to_string()).collect(); + let endpoints_ = endpoints.clone(); if endpoints.is_empty() { return Err(anyhow!("No endpoints provided")); } - tracing::debug!("New client with endpoints: {:?}", endpoints); + let (http_client, ws_endpoints) = http::HttpClient::new(endpoints)?; + + if ws_endpoints.is_empty() { + return Ok(Self { + http_client, + endpoints: endpoints_, + sender: None, // No websocket + rotation_notify: None, + retries: retries.unwrap_or(3), + background_task: None, + }); + } + + tracing::debug!("New client with endpoints: {:?}", ws_endpoints); let (message_tx, mut message_rx) = tokio::sync::mpsc::channel::(100); @@ -165,7 +183,6 @@ impl Client { let rotation_notify = Arc::new(Notify::new()); let rotation_notify_bg = rotation_notify.clone(); - let endpoints_ = endpoints.clone(); let background_task = tokio::spawn(async move { let connect_backoff_counter = Arc::new(AtomicU32::new(0)); @@ -177,7 +194,7 @@ impl Client { let build_ws = || async { let build = || { let current_endpoint = current_endpoint.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let url = &endpoints[current_endpoint % endpoints.len()]; + let url = &ws_endpoints[current_endpoint % ws_endpoints.len()]; tracing::info!("Connecting to endpoint: {}", url); @@ -414,11 +431,12 @@ impl Client { } Ok(Self { + http_client, endpoints: endpoints_, - sender: message_tx, - rotation_notify, + sender: Some(message_tx), + rotation_notify: Some(rotation_notify), retries: retries.unwrap_or(3), - background_task, + background_task: Some(background_task), }) } @@ -431,22 +449,30 @@ impl Client { } pub async fn request(&self, method: &str, params: Vec) -> CallResult { - async move { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.sender - .send(Message::Request { - method: method.into(), - params, - response: tx, - retries: self.retries, - }) - .await - .map_err(errors::internal_error)?; + if let Some(http_client) = &self.http_client { + return http_client.request(method, params).await; + } - rx.await.map_err(errors::internal_error)?.map_err(errors::map_error) + if let Some(sender) = self.sender.as_ref() { + async move { + let (tx, rx) = tokio::sync::oneshot::channel(); + sender + .send(Message::Request { + method: method.into(), + params, + response: tx, + retries: self.retries, + }) + .await + .map_err(errors::internal_error)?; + + rx.await.map_err(errors::internal_error)?.map_err(errors::map_error) + } + .with_context(TRACER.context(method.to_string())) + .await + } else { + Err(errors::internal_error("No sender")) } - .with_context(TRACER.context(method.to_string())) - .await } pub async fn subscribe( @@ -455,35 +481,43 @@ impl Client { params: Vec, unsubscribe: &str, ) -> Result, Error> { - async move { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.sender - .send(Message::Subscribe { - subscribe: subscribe.into(), - params, - unsubscribe: unsubscribe.into(), - response: tx, - retries: self.retries, - }) - .await - .map_err(errors::internal_error)?; - - rx.await.map_err(errors::internal_error)? + if let Some(sender) = self.sender.as_ref() { + async move { + let (tx, rx) = tokio::sync::oneshot::channel(); + sender + .send(Message::Subscribe { + subscribe: subscribe.into(), + params, + unsubscribe: unsubscribe.into(), + response: tx, + retries: self.retries, + }) + .await + .map_err(errors::internal_error)?; + + rx.await.map_err(errors::internal_error)? + } + .with_context(TRACER.context(subscribe.to_string())) + .await + } else { + Err(Error::Call(errors::internal_error("No websocket connection"))) } - .with_context(TRACER.context(subscribe.to_string())) - .await } pub async fn rotate_endpoint(&self) { - self.sender - .send(Message::RotateEndpoint) - .await - .expect("Failed to rotate endpoint"); + if let Some(sender) = self.sender.as_ref() { + sender + .send(Message::RotateEndpoint) + .await + .expect("Failed to rotate endpoint"); + } } /// Returns a future that resolves when the endpoint is rotated. pub async fn on_rotation(&self) { - self.rotation_notify.notified().await + if let Some(rotation_notify) = self.rotation_notify.as_ref() { + rotation_notify.notified().await + } } }