Skip to content

Commit

Permalink
fix mem leak - drop the JoinSets in the dynamic proxy connection loops (
Browse files Browse the repository at this point in the history
#858)

Ran locally and watched memory usage with `htop` while making a stream
of requests to the proxy. Before this change, memory continued to
increase with each request. After this change, memory rose slightly but
then plateaued indefinitely.
  • Loading branch information
rolyatmax authored Jan 6, 2025
1 parent 31f2cf5 commit a9d5ccf
Showing 1 changed file with 6 additions and 25 deletions.
31 changes: 6 additions & 25 deletions dynamic-proxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
sync::Arc,
time::Duration,
};
use tokio::{net::TcpListener, select, task::JoinSet};
use tokio::{net::TcpListener, select};
use tokio_rustls::TlsAcceptor;

/// Header which passes the client's IP address to the backend.
Expand All @@ -27,23 +27,17 @@ const X_FORWARDED_PROTO: &str = "x-forwarded-proto";
/// The server can be configured to listen for either HTTP and HTTPS,
/// and supports graceful shutdown and x-forwarded-* headers.
pub struct SimpleHttpServer {
handle: tokio::task::JoinHandle<JoinSet<()>>,
handle: tokio::task::JoinHandle<()>,
graceful_shutdown: Option<GracefulShutdown>,
}

#[must_use] // Otherwise, the tasks we started would be stopped as soon as the graceful shutdown is initiated.
async fn listen_loop<S>(
listener: TcpListener,
service: S,
graceful_shutdown: GracefulShutdown,
) -> JoinSet<()>
async fn listen_loop<S>(listener: TcpListener, service: S, graceful_shutdown: GracefulShutdown)
where
S: Service<Request<Incoming>, Response = Response<SimpleBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let mut recv = graceful_shutdown.subscribe();
let mut join_set = JoinSet::new();

loop {
let stream = select! {
Expand All @@ -66,27 +60,20 @@ where
let conn = server.serve_connection_with_upgrades(io, service);

let conn = graceful_shutdown.watch(conn.into_owned());
join_set.spawn(async {
tokio::spawn(async {
if let Err(e) = conn.await {
tracing::warn!(?e, "Failed to serve connection.");
}
});
}

// Even though join_set is never used, we return it to keep it from being dropped
// until the graceful shutdown (or timeout) is complete. Otherwise, the tasks we started
// would be stopped as soon as the graceful shutdown is initiated.
join_set
}

#[must_use] // Otherwise, the tasks we started would be stopped as soon as the graceful shutdown is initiated.
async fn listen_loop_tls<S>(
listener: TcpListener,
service: S,
resolver: Arc<dyn ResolvesServerCert>,
graceful_shutdown: GracefulShutdown,
) -> JoinSet<()>
where
) where
S: Service<Request<Incoming>, Response = Response<SimpleBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
Expand All @@ -96,7 +83,6 @@ where
.with_cert_resolver(resolver);
let tls_acceptor = TlsAcceptor::from(Arc::new(server_config));
let mut recv = graceful_shutdown.subscribe();
let mut join_set = JoinSet::new();

loop {
let stream = select! {
Expand All @@ -116,7 +102,7 @@ where
let tls_acceptor = tls_acceptor.clone();

let graceful_shutdown = graceful_shutdown.clone();
join_set.spawn(async move {
tokio::spawn(async move {
let server = ServerBuilder::new(TokioExecutor::new());

let stream = match tls_acceptor.accept(stream).await {
Expand All @@ -136,11 +122,6 @@ where
}
});
}

// Even though join_set is never used, we return it to keep it from being dropped
// until the graceful shutdown (or timeout) is complete. Otherwise, the tasks we started
// would be stopped as soon as the graceful shutdown is initiated.
join_set
}

pub enum HttpsConfig {
Expand Down

0 comments on commit a9d5ccf

Please sign in to comment.