Skip to content

Commit

Permalink
feat: enable SO_REUSEPORT on the listener for tests
Browse files Browse the repository at this point in the history
This could potentially help with port binding issues on the CI server.
Creates a socket2 with SO_REUSEPORT enabled.
  • Loading branch information
praveen-influx committed Oct 9, 2024
1 parent 1f1125c commit 14bc02d
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 5 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ serde_urlencoded = "0.7.0"
serde_with = "3.8.1"
sha2 = "0.10.8"
snap = "1.0.0"
socket2 = "0.5.7"
sqlparser = "0.48.0"
sysinfo = "0.30.8"
test-log = { version = "0.2.16", features = ["trace"] }
Expand Down
1 change: 1 addition & 0 deletions influxdb3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ uuid.workspace = true

# Optional Dependencies
console-subscriber = { version = "0.1.10", optional = true, features = ["parking_lot"] }
socket2 = "0.5.7"

[features]
default = ["jemalloc_replacing_malloc", "azure", "gcp", "aws"]
Expand Down
30 changes: 25 additions & 5 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ use object_store::ObjectStore;
use observability_deps::tracing::*;
use panic_logging::SendPanicsToTracing;
use parquet_file::storage::{ParquetStorage, StorageId};
use socket2::{Domain, Type};
use std::{collections::HashMap, path::Path, str::FromStr};
use std::{num::NonZeroUsize, sync::Arc};
use thiserror::Error;
use tokio::net::TcpListener;
use tokio::net::TcpListener as TokioTcpListener;
use tokio_util::sync::CancellationToken;
use trace_exporters::TracingConfig;
use trace_http::ctx::TraceHeaderParser;
Expand Down Expand Up @@ -460,6 +461,10 @@ pub async fn command(config: Config) -> Result<()> {
Arc::clone(&telemetry_store),
)?;

let sock_addr: std::net::SocketAddr = *config.http_bind_address;

let listener = setup_tokio_tcp_listener(sock_addr)?;

let query_executor = Arc::new(QueryExecutorImpl::new(
write_buffer.catalog(),
Arc::clone(&write_buffer),
Expand All @@ -471,10 +476,6 @@ pub async fn command(config: Config) -> Result<()> {
Arc::clone(&telemetry_store),
));

let listener = TcpListener::bind(*config.http_bind_address)
.await
.map_err(Error::BindAddress)?;

let builder = ServerBuilder::new(common_state)
.max_request_size(config.max_http_request_size)
.write_buffer(write_buffer)
Expand Down Expand Up @@ -552,3 +553,22 @@ fn parse_datafusion_config(

Ok(out)
}

#[cfg(windows)]
fn setup_tokio_tcp_listener(sock_addr: std::net::SocketAddr) -> Result<TokioTcpListener> {
TokioTcpListener::bind(sock_addr)
.await
.map_err(Error::BindAddress)
}

#[cfg(not(windows))]
fn setup_tokio_tcp_listener(sock_addr: std::net::SocketAddr) -> Result<TokioTcpListener> {
let socket = socket2::Socket::new(Domain::IPV4, Type::STREAM, None).expect("create socket");
socket.bind(&sock_addr.into()).expect("bind socket");
socket.set_reuse_port(true).expect("setup reuse port");
socket.listen(1).expect("listening on socket");

let listener: std::net::TcpListener = socket.into();
let listener = TokioTcpListener::from_std(listener);
listener.map_err(Error::BindAddress)
}
1 change: 1 addition & 0 deletions influxdb3_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ serde.workspace = true
serde_json.workspace = true
serde_urlencoded.workspace = true
sha2.workspace = true
socket2.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true
Expand Down

0 comments on commit 14bc02d

Please sign in to comment.