Skip to content

Commit

Permalink
Add native TCP ports to test ClickHouseDeployments (#6603)
Browse files Browse the repository at this point in the history
This lets ClickHouse use a random OS port for its native TCP protocol.
Fixes #6592.
  • Loading branch information
bnaecker authored Sep 19, 2024
1 parent 430e1f6 commit fb5d793
Show file tree
Hide file tree
Showing 8 changed files with 440 additions and 187 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dev-tools/ch-dev/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ clap.workspace = true
dropshot.workspace = true
futures.workspace = true
libc.workspace = true
omicron-common.workspace = true
omicron-test-utils.workspace = true
omicron-workspace-hack.workspace = true
# See omicron-rpaths for more about the "pq-sys" dependency.
Expand Down
52 changes: 33 additions & 19 deletions dev-tools/ch-dev/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use clap::{Args, Parser, Subcommand};
use dropshot::test_util::LogContext;
use futures::StreamExt;
use libc::SIGINT;
use omicron_test_utils::dev;
use omicron_common::address::{CLICKHOUSE_HTTP_PORT, CLICKHOUSE_TCP_PORT};
use omicron_test_utils::dev::{self, clickhouse::ClickHousePorts};
use signal_hook_tokio::Signals;

#[tokio::main]
Expand Down Expand Up @@ -43,10 +44,13 @@ enum ChDevCmd {
#[derive(Clone, Debug, Args)]
struct ChRunArgs {
/// The HTTP port on which the server will listen
#[clap(short, long, default_value = "8123", action)]
port: u16,
#[clap(short = 'H', long, default_value_t = CLICKHOUSE_HTTP_PORT, action)]
http_port: u16,
/// The port on which the native protocol server will listen
#[clap(short, long, default_value_t = CLICKHOUSE_TCP_PORT, action)]
native_port: u16,
/// Starts a ClickHouse replicated cluster of 2 replicas and 3 keeper nodes
#[clap(long, conflicts_with = "port", action)]
#[clap(long, conflicts_with_all = ["http_port", "native_port"], action)]
replicated: bool,
}

Expand All @@ -61,26 +65,31 @@ impl ChRunArgs {
if self.replicated {
start_replicated_cluster(&logctx).await?;
} else {
start_single_node(&logctx, self.port).await?;
start_single_node(&logctx, self.http_port, self.native_port)
.await?;
}
Ok(())
}
}

async fn start_single_node(
logctx: &LogContext,
port: u16,
http_port: u16,
native_port: u16,
) -> Result<(), anyhow::Error> {
// Start a stream listening for SIGINT
let signals = Signals::new(&[SIGINT]).expect("failed to wait for SIGINT");
let mut signal_stream = signals.fuse();

// Start the database server process, possibly on a specific port
let ports = ClickHousePorts::new(http_port, native_port)?;
let mut deployment =
dev::clickhouse::ClickHouseDeployment::new_single_node(logctx, port)
.await?;
dev::clickhouse::ClickHouseDeployment::new_single_node_with_ports(
logctx, ports,
)
.await?;
let db_instance = deployment
.instances()
.replicas()
.next()
.expect("Should have launched a ClickHouse instance");
println!(
Expand All @@ -99,12 +108,13 @@ async fn start_single_node(
);
println!(
"ch-dev: ClickHouse HTTP server listening on port {}",
db_instance.port()
db_instance.http_address.port()
);
println!(
"ch-dev: ClickHouse data stored in: [{}]",
db_instance.data_path()
"ch-dev: ClickHouse Native server listening on port {}",
db_instance.native_address.port()
);
println!("ch-dev: ClickHouse data stored in: {}", db_instance.data_path());

// Wait for the DB to exit itself (an error), or for SIGINT
tokio::select! {
Expand Down Expand Up @@ -164,27 +174,31 @@ async fn start_replicated_cluster(
cluster.replica_config_path().unwrap().display(),
cluster.keeper_config_path().unwrap().display()
);
for instance in cluster.instances() {
for replica in cluster.replicas() {
println!(
"ch-dev: running ClickHouse replica with full command:\
\n\"clickhouse {}\"",
instance.cmdline().join(" ")
replica.cmdline().join(" ")
);
println!("ch-dev: ClickHouse replica environment:");
for (k, v) in instance.environment() {
for (k, v) in replica.environment() {
println!("\t{k}={v}");
}
println!(
"ch-dev: ClickHouse replica PID is {}",
instance.pid().context("Failed to get instance PID")?
replica.pid().context("Failed to get instance PID")?
);
println!(
"ch-dev: ClickHouse replica data path is {}",
instance.data_path(),
replica.data_path(),
);
println!(
"ch-dev: ClickHouse replica HTTP server is listening on port {}",
instance.address.port(),
replica.http_address.port(),
);
println!(
"ch-dev: ClickHouse replica Native server is listening on port {}",
replica.native_address.port(),
);
}
for keeper in cluster.keepers() {
Expand All @@ -206,7 +220,7 @@ async fn start_replicated_cluster(
keeper.data_path(),
);
println!(
"ch-dev: ClickHouse Keeper HTTP server is listening on port {}",
"ch-dev: ClickHouse Keeper server is listening on port {}",
keeper.address.port(),
);
}
Expand Down
2 changes: 1 addition & 1 deletion nexus/benches/setup_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn do_clickhouse_setup() {
let cfg = nexus_test_utils::load_test_config();
let logctx = LogContext::new("clickhouse_setup", &cfg.pkg.log);
let mut clickhouse =
dev::clickhouse::ClickHouseDeployment::new_single_node(&logctx, 0)
dev::clickhouse::ClickHouseDeployment::new_single_node(&logctx)
.await
.unwrap();
clickhouse.cleanup().await.unwrap();
Expand Down
1 change: 0 additions & 1 deletion nexus/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,6 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> {
let clickhouse =
dev::clickhouse::ClickHouseDeployment::new_single_node(
&self.logctx,
0,
)
.await
.unwrap();
Expand Down
18 changes: 9 additions & 9 deletions oximeter/db/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1281,7 +1281,7 @@ mod tests {
#[tokio::test]
async fn test_single_node() {
let logctx = test_setup_log("test_single_node");
let mut db = ClickHouseDeployment::new_single_node(&logctx, 0)
let mut db = ClickHouseDeployment::new_single_node(&logctx)
.await
.expect("Failed to start ClickHouse");
bad_db_connection_test().await.unwrap();
Expand Down Expand Up @@ -3508,7 +3508,7 @@ mod tests {
const TEST_NAME: &str = "test_apply_one_schema_upgrade_single_node";
let logctx = test_setup_log(TEST_NAME);
let log = &logctx.log;
let mut db = ClickHouseDeployment::new_single_node(&logctx, 0)
let mut db = ClickHouseDeployment::new_single_node(&logctx)
.await
.expect("Failed to start ClickHouse");
let address = db.http_address().into();
Expand All @@ -3522,7 +3522,7 @@ mod tests {
let logctx =
test_setup_log("test_ensure_schema_with_version_gaps_fails");
let log = &logctx.log;
let mut db = ClickHouseDeployment::new_single_node(&logctx, 0)
let mut db = ClickHouseDeployment::new_single_node(&logctx)
.await
.expect("Failed to start ClickHouse");
let address = db.http_address().into();
Expand Down Expand Up @@ -3565,7 +3565,7 @@ mod tests {
"test_ensure_schema_with_missing_desired_schema_version_fails",
);
let log = &logctx.log;
let mut db = ClickHouseDeployment::new_single_node(&logctx, 0)
let mut db = ClickHouseDeployment::new_single_node(&logctx)
.await
.expect("Failed to start ClickHouse");
let address = db.http_address().into();
Expand Down Expand Up @@ -3698,7 +3698,7 @@ mod tests {
"test_ensure_schema_walks_through_multiple_steps_single_node";
let logctx = test_setup_log(TEST_NAME);
let log = &logctx.log;
let mut db = ClickHouseDeployment::new_single_node(&logctx, 0)
let mut db = ClickHouseDeployment::new_single_node(&logctx)
.await
.expect("Failed to start ClickHouse");
let address = db.http_address().into();
Expand Down Expand Up @@ -3795,7 +3795,7 @@ mod tests {
let logctx = test_setup_log("test_select_all_field_types");
let log = &logctx.log;

let mut db = ClickHouseDeployment::new_single_node(&logctx, 0)
let mut db = ClickHouseDeployment::new_single_node(&logctx)
.await
.expect("Failed to start ClickHouse");
let address = db.http_address().into();
Expand Down Expand Up @@ -3827,7 +3827,7 @@ mod tests {
async fn test_sql_query_output() {
let logctx = test_setup_log("test_sql_query_output");
let log = &logctx.log;
let mut db = ClickHouseDeployment::new_single_node(&logctx, 0)
let mut db = ClickHouseDeployment::new_single_node(&logctx)
.await
.expect("Failed to start ClickHouse");
let address = db.http_address().into();
Expand Down Expand Up @@ -3976,7 +3976,7 @@ mod tests {
let mut db = if replicated {
create_cluster(&logctx).await
} else {
ClickHouseDeployment::new_single_node(&logctx, 0)
ClickHouseDeployment::new_single_node(&logctx)
.await
.expect("Failed to start ClickHouse")
};
Expand Down Expand Up @@ -4167,7 +4167,7 @@ mod tests {
const TEST_NAME: &str = "test_expunge_timeseries_by_name_single_node";
let logctx = test_setup_log(TEST_NAME);
let log = &logctx.log;
let mut db = ClickHouseDeployment::new_single_node(&logctx, 0)
let mut db = ClickHouseDeployment::new_single_node(&logctx)
.await
.expect("Failed to start ClickHouse");
test_expunge_timeseries_by_name_impl(
Expand Down
2 changes: 1 addition & 1 deletion oximeter/db/src/client/oxql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1236,7 +1236,7 @@ mod tests {

async fn setup_oxql_test(name: &str) -> TestContext {
let logctx = test_setup_log(name);
let db = ClickHouseDeployment::new_single_node(&logctx, 0)
let db = ClickHouseDeployment::new_single_node(&logctx)
.await
.expect("Failed to start ClickHouse");
let client = Client::new(db.http_address().into(), &logctx.log);
Expand Down
Loading

0 comments on commit fb5d793

Please sign in to comment.