Skip to content

Commit

Permalink
Http Upstream client
Browse files Browse the repository at this point in the history
This pull request introduces the HTTP Upstream client. The main goal is to be
backward compatible and introduce an HTTP upstream client.

If the config has any HTTP/s server to connect to, the HttpClient struct will
handle them. Any WebSocket will use the existing upstream client code.

If one or more HTTP clients are configured, they route all requests, and
Websocket clients will be used for subscriptions.

If no HTTP clients are configured, the fallback behavior is used, and all
requests and subscriptions are routed through the WebSocket upstream client.

If no WebSocket upstream clients are configured, then subscriptions are not
enabled, only method requests.
  • Loading branch information
cesar-startale committed Jul 15, 2024
1 parent a97673b commit dbbd473
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 45 deletions.
61 changes: 61 additions & 0 deletions src/extensions/client/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use crate::middlewares::CallResult;
use jsonrpsee::{
core::{
client::{ClientT, Error},
JsonValue,
},
http_client::HttpClient as RpcClient,
types::{error::INTERNAL_ERROR_CODE, ErrorObject},
};
use std::sync::atomic::{AtomicUsize, Ordering};

/// Very simple struct to have a set of JsonRpsee HTTP clients and send requests to them
pub struct HttpClient {
clients: Vec<RpcClient>,
last_sent: AtomicUsize,
}

impl HttpClient {
pub fn new(endpoints: Vec<String>) -> Result<(Option<Self>, Vec<String>), Error> {
let mut other_urls = vec![];
let clients = endpoints
.into_iter()
.filter_map(|url| {
let t_url = url.to_lowercase();
if t_url.starts_with("http://") || t_url.starts_with("https://") {
Some(RpcClient::builder().build(url))
} else {
other_urls.push(url);
None
}
})
.collect::<Result<Vec<_>, _>>()?;

if clients.is_empty() {
Ok((None, other_urls))
} else {
Ok((
Some(Self {
clients,
last_sent: AtomicUsize::new(0),
}),
other_urls,
))
}
}

/// Sends a request to one of the clients
///
/// The client is selected in a round-robin fashion as fair as possible
pub async fn request(&self, method: &str, params: Vec<JsonValue>) -> CallResult {
let client_id = self.last_sent.fetch_add(1, Ordering::Relaxed) % self.clients.len();

self.clients[client_id]
.request(method, params)
.await
.map_err(|e| match e {
jsonrpsee::core::client::Error::Call(e) => e,
e => ErrorObject::owned(INTERNAL_ERROR_CODE, e.to_string(), None::<String>),
})
}
}
124 changes: 79 additions & 45 deletions src/extensions/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::{
utils::{self, errors},
};

mod http;
#[cfg(test)]
pub mod mock;
#[cfg(test)]
Expand All @@ -38,15 +39,18 @@ const TRACER: utils::telemetry::Tracer = utils::telemetry::Tracer::new("client")

pub struct Client {
endpoints: Vec<String>,
sender: tokio::sync::mpsc::Sender<Message>,
rotation_notify: Arc<Notify>,
http_client: Option<http::HttpClient>,
sender: Option<tokio::sync::mpsc::Sender<Message>>,
rotation_notify: Option<Arc<Notify>>,
retries: u32,
background_task: tokio::task::JoinHandle<()>,
background_task: Option<tokio::task::JoinHandle<()>>,
}

impl Drop for Client {
fn drop(&mut self) {
self.background_task.abort();
if let Some(background_task) = self.background_task.take() {
background_task.abort();
}
}
}

Expand Down Expand Up @@ -152,20 +156,33 @@ impl Client {
retries: Option<u32>,
) -> Result<Self, anyhow::Error> {
let endpoints: Vec<_> = endpoints.into_iter().map(|e| e.as_ref().to_string()).collect();
let endpoints_ = endpoints.clone();

if endpoints.is_empty() {
return Err(anyhow!("No endpoints provided"));
}

tracing::debug!("New client with endpoints: {:?}", endpoints);
let (http_client, ws_endpoints) = http::HttpClient::new(endpoints)?;

if ws_endpoints.is_empty() {
return Ok(Self {
http_client,
endpoints: endpoints_,
sender: None, // No websocket
rotation_notify: None,
retries: retries.unwrap_or(3),
background_task: None,
});
}

tracing::debug!("New client with endpoints: {:?}", ws_endpoints);

let (message_tx, mut message_rx) = tokio::sync::mpsc::channel::<Message>(100);

let message_tx_bg = message_tx.clone();

let rotation_notify = Arc::new(Notify::new());
let rotation_notify_bg = rotation_notify.clone();
let endpoints_ = endpoints.clone();

let background_task = tokio::spawn(async move {
let connect_backoff_counter = Arc::new(AtomicU32::new(0));
Expand All @@ -177,7 +194,7 @@ impl Client {
let build_ws = || async {
let build = || {
let current_endpoint = current_endpoint.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let url = &endpoints[current_endpoint % endpoints.len()];
let url = &ws_endpoints[current_endpoint % ws_endpoints.len()];

tracing::info!("Connecting to endpoint: {}", url);

Expand Down Expand Up @@ -414,11 +431,12 @@ impl Client {
}

Ok(Self {
http_client,
endpoints: endpoints_,
sender: message_tx,
rotation_notify,
sender: Some(message_tx),
rotation_notify: Some(rotation_notify),
retries: retries.unwrap_or(3),
background_task,
background_task: Some(background_task),
})
}

Expand All @@ -431,22 +449,30 @@ impl Client {
}

pub async fn request(&self, method: &str, params: Vec<JsonValue>) -> CallResult {
async move {
let (tx, rx) = tokio::sync::oneshot::channel();
self.sender
.send(Message::Request {
method: method.into(),
params,
response: tx,
retries: self.retries,
})
.await
.map_err(errors::internal_error)?;
if let Some(http_client) = &self.http_client {
return http_client.request(method, params).await;
}

rx.await.map_err(errors::internal_error)?.map_err(errors::map_error)
if let Some(sender) = self.sender.as_ref() {
async move {
let (tx, rx) = tokio::sync::oneshot::channel();
sender
.send(Message::Request {
method: method.into(),
params,
response: tx,
retries: self.retries,
})
.await
.map_err(errors::internal_error)?;

rx.await.map_err(errors::internal_error)?.map_err(errors::map_error)
}
.with_context(TRACER.context(method.to_string()))
.await
} else {
Err(errors::internal_error("No sender"))
}
.with_context(TRACER.context(method.to_string()))
.await
}

pub async fn subscribe(
Expand All @@ -455,35 +481,43 @@ impl Client {
params: Vec<JsonValue>,
unsubscribe: &str,
) -> Result<Subscription<JsonValue>, Error> {
async move {
let (tx, rx) = tokio::sync::oneshot::channel();
self.sender
.send(Message::Subscribe {
subscribe: subscribe.into(),
params,
unsubscribe: unsubscribe.into(),
response: tx,
retries: self.retries,
})
.await
.map_err(errors::internal_error)?;

rx.await.map_err(errors::internal_error)?
if let Some(sender) = self.sender.as_ref() {
async move {
let (tx, rx) = tokio::sync::oneshot::channel();
sender
.send(Message::Subscribe {
subscribe: subscribe.into(),
params,
unsubscribe: unsubscribe.into(),
response: tx,
retries: self.retries,
})
.await
.map_err(errors::internal_error)?;

rx.await.map_err(errors::internal_error)?
}
.with_context(TRACER.context(subscribe.to_string()))
.await
} else {
Err(Error::Call(errors::internal_error("No websocket connection")))
}
.with_context(TRACER.context(subscribe.to_string()))
.await
}

pub async fn rotate_endpoint(&self) {
self.sender
.send(Message::RotateEndpoint)
.await
.expect("Failed to rotate endpoint");
if let Some(sender) = self.sender.as_ref() {
sender
.send(Message::RotateEndpoint)
.await
.expect("Failed to rotate endpoint");
}
}

/// Returns a future that resolves when the endpoint is rotated.
pub async fn on_rotation(&self) {
self.rotation_notify.notified().await
if let Some(rotation_notify) = self.rotation_notify.as_ref() {
rotation_notify.notified().await
}
}
}

Expand Down

0 comments on commit dbbd473

Please sign in to comment.