Skip to content

Commit

Permalink
Add connection timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
paulgb committed Nov 4, 2024
1 parent 27f1fd9 commit 8f16e3e
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 48 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions dynamic-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ serde = { version = "1.0.210", features = ["derive"] }
tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread"] }
tokio-rustls = "0.26.0"
tracing = "0.1.40"
tower-service = "0.3.3"
futures-util = "0.3.30"

[dev-dependencies]
axum = { version = "0.7.6", features = ["http2", "ws"] }
Expand Down
54 changes: 54 additions & 0 deletions dynamic-proxy/src/connector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use core::task;
use http::Uri;
use hyper_util::{client::legacy::connect::HttpConnector, rt::TokioIo};
use std::{future::Future, pin::Pin, task::Poll, time::Duration};
use tokio::net::TcpStream;

#[derive(Clone)]
pub struct TimeoutHttpConnector {
pub timeout: Duration,
pub connector: HttpConnector,
}

impl Default for TimeoutHttpConnector {
fn default() -> Self {
TimeoutHttpConnector {
timeout: Duration::from_secs(10),
connector: HttpConnector::new(),
}
}
}

impl tower_service::Service<Uri> for TimeoutHttpConnector {
type Response = TokioIo<TcpStream>;
type Error = TimeoutHttpConnectorError;
type Future = Pin<Box<dyn Future<Output = Result<TokioIo<TcpStream>, Self::Error>> + Send>>;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.connector
.poll_ready(cx)
.map_err(|e| TimeoutHttpConnectorError::Boxed(Box::new(e)))
}

fn call(&mut self, dst: Uri) -> Self::Future {
let fut = self.connector.call(dst);
let timeout = self.timeout;
Box::pin(async move {
let result = tokio::time::timeout(timeout, fut).await;
match result {
Ok(Ok(io)) => Ok(io),
Ok(Err(e)) => Err(TimeoutHttpConnectorError::Boxed(Box::new(e))),
Err(_) => Err(TimeoutHttpConnectorError::Timeout),
}
})
}
}

#[derive(thiserror::Error, Debug)]
pub enum TimeoutHttpConnectorError {
#[error("Timeout")]
Timeout,

#[error("Non-timeout error: {0}")]
Boxed(#[from] Box<dyn std::error::Error + Send + Sync>),
}
1 change: 1 addition & 0 deletions dynamic-proxy/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod body;
pub mod connector;
mod graceful_shutdown;
pub mod https_redirect;
pub mod proxy;
Expand Down
10 changes: 4 additions & 6 deletions dynamic-proxy/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
use crate::{
body::{simple_empty_body, to_simple_body, SimpleBody},
connector::TimeoutHttpConnector,
request::should_upgrade,
upgrade::{split_request, split_response, UpgradeHandler},
};
use http::StatusCode;
use hyper::{Request, Response};
use hyper_util::{
client::legacy::{connect::HttpConnector, Client},
rt::TokioExecutor,
};
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use std::{convert::Infallible, time::Duration};

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);

/// A client for proxying HTTP requests to an upstream server.
#[derive(Clone)]
pub struct ProxyClient {
client: Client<HttpConnector, SimpleBody>,
client: Client<TimeoutHttpConnector, SimpleBody>,
#[allow(unused)] // TODO: implement this.
timeout: Duration,
}
Expand All @@ -29,7 +27,7 @@ impl Default for ProxyClient {

impl ProxyClient {
pub fn new() -> Self {
let client = Client::builder(TokioExecutor::new()).build(HttpConnector::new());
let client = Client::builder(TokioExecutor::new()).build(TimeoutHttpConnector::default());
Self {
client,
timeout: DEFAULT_TIMEOUT,
Expand Down
42 changes: 0 additions & 42 deletions plane/plane-tests/tests/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,48 +108,6 @@ async fn proxy_backend_unreachable(env: TestEnvironment) {
assert_eq!(response.status(), StatusCode::BAD_GATEWAY);
}

// TODO: Re-enable when timeout is re-implemented. (Paul 2024-10-11)
// #[plane_test]
// async fn proxy_backend_timeout(env: TestEnvironment) {
// // We will start a listener, but never respond on it, to simulate a timeout.
// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
// .await
// .unwrap();
// let addr = listener.local_addr().unwrap();

// let mut proxy = MockProxy::new().await;
// let port = proxy.port();
// let cluster = ClusterName::from_str(&format!("plane.test:{}", port)).unwrap();
// let url = format!("http://plane.test:{port}/abc123/");
// let client = localhost_client();
// let handle = tokio::spawn(client.get(url).send());

// let route_info_request = proxy.recv_route_info_request().await;
// assert_eq!(
// route_info_request.token,
// BearerToken::from("abc123".to_string())
// );

// proxy
// .send_route_info_response(RouteInfoResponse {
// token: BearerToken::from("abc123".to_string()),
// route_info: Some(RouteInfo {
// backend_id: BackendName::new_random(),
// address: BackendAddr(addr),
// secret_token: SecretToken::from("secret".to_string()),
// cluster,
// user: None,
// user_data: None,
// subdomain: None,
// }),
// })
// .await;

// let response = handle.await.unwrap().unwrap();

// assert_eq!(response.status(), StatusCode::GATEWAY_TIMEOUT);
// }

#[plane_test]
async fn proxy_backend_accepts(env: TestEnvironment) {
let server = SimpleAxumServer::new().await;
Expand Down

0 comments on commit 8f16e3e

Please sign in to comment.