Implement ClickHouse replicated cluster option for simulated omicron (#4422)

New `--replicated` flag for the `omicron-dev ch-run` command. This will
allow us to spin up a ClickHouse replicated cluster containing 2
replicas and 3 keepers.

```console
$ cargo run --bin omicron-dev -- ch-run --replicated
    Finished dev [unoptimized + debuginfo] target(s) in 0.31s
     Running `target/debug/omicron-dev ch-run --replicated`
omicron-dev: running ClickHouse cluster with configuration files:
 replicas: /home/{user}/src/omicron/oximeter/db/src/configs/replica_config.xml
 keepers: /home/{user}/src/omicron/oximeter/db/src/configs/keeper_config.xml
omicron-dev: ClickHouse cluster is running with PIDs: 1113482, 1113681, 1113387, 1113451, 1113419
omicron-dev: ClickHouse HTTP servers listening on ports: 8123, 8124
omicron-dev: using /tmp/.tmpFH6v8h and /tmp/.tmpkUjDji for ClickHouse data storage
```
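The replicas can be spot-checked over ClickHouse's HTTP interface once the cluster is up. A quick sanity check (assuming the default ports shown above):

```console
$ curl 'http://127.0.0.1:8123/?query=SELECT%201'
1
$ curl 'http://127.0.0.1:8124/?query=SELECT%201'
1
```

Because the flag is declared with `conflicts_with = "port"`, clap rejects combining the two options. The exact wording depends on the clap version, but the failure looks roughly like this:

```console
$ cargo run --bin omicron-dev -- ch-run --replicated --port 8124
error: the argument '--port <PORT>' cannot be used with '--replicated'
```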

Related: #4148
karencfv authored Nov 7, 2023
1 parent 5b6f7df commit bd05e19
Showing 4 changed files with 177 additions and 25 deletions.
126 changes: 125 additions & 1 deletion dev-tools/omicron-dev/src/bin/omicron-dev.rs
@@ -265,16 +265,28 @@ struct ChRunArgs {
/// The HTTP port on which the server will listen
#[clap(short, long, default_value = "8123", action)]
port: u16,
/// Starts a ClickHouse replicated cluster of 2 replicas and 3 keeper nodes
#[clap(long, conflicts_with = "port", action)]
replicated: bool,
}

async fn cmd_clickhouse_run(args: &ChRunArgs) -> Result<(), anyhow::Error> {
if args.replicated {
start_replicated_cluster().await?;
} else {
start_single_node(args.port).await?;
}
Ok(())
}

async fn start_single_node(port: u16) -> Result<(), anyhow::Error> {
// Start a stream listening for SIGINT
let signals = Signals::new(&[SIGINT]).expect("failed to wait for SIGINT");
let mut signal_stream = signals.fuse();

// Start the database server process, possibly on a specific port
let mut db_instance =
-dev::clickhouse::ClickHouseInstance::new_single_node(args.port).await?;
+dev::clickhouse::ClickHouseInstance::new_single_node(port).await?;
println!(
"omicron-dev: running ClickHouse with full command:\n\"clickhouse {}\"",
db_instance.cmdline().join(" ")
@@ -320,6 +332,118 @@ async fn cmd_clickhouse_run(args: &ChRunArgs) -> Result<(), anyhow::Error> {
Ok(())
}

async fn start_replicated_cluster() -> Result<(), anyhow::Error> {
// Start a stream listening for SIGINT
let signals = Signals::new(&[SIGINT]).expect("failed to wait for SIGINT");
let mut signal_stream = signals.fuse();

// Start the database server and keeper processes
let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let replica_config = manifest_dir
.as_path()
.join("../../oximeter/db/src/configs/replica_config.xml");
let keeper_config = manifest_dir
.as_path()
.join("../../oximeter/db/src/configs/keeper_config.xml");

let mut cluster =
dev::clickhouse::ClickHouseCluster::new(replica_config, keeper_config)
.await?;
println!(
"omicron-dev: running ClickHouse cluster with configuration files:\n \
replicas: {}\n keepers: {}",
cluster.replica_config_path().display(),
cluster.keeper_config_path().display()
);
let pid_error_msg = "Failed to get process PID, it may not have started";
println!(
"omicron-dev: ClickHouse cluster is running with: server PIDs = [{}, {}] \
and keeper PIDs = [{}, {}, {}]",
cluster.replica_1
.pid()
.expect(pid_error_msg),
cluster.replica_2
.pid()
.expect(pid_error_msg),
cluster.keeper_1
.pid()
.expect(pid_error_msg),
cluster.keeper_2
.pid()
.expect(pid_error_msg),
cluster.keeper_3
.pid()
.expect(pid_error_msg),
);
println!(
"omicron-dev: ClickHouse HTTP servers listening on ports: {}, {}",
cluster.replica_1.port(),
cluster.replica_2.port()
);
println!(
"omicron-dev: using {} and {} for ClickHouse data storage",
cluster.replica_1.data_path().display(),
cluster.replica_2.data_path().display()
);

// Wait for the replicas and keepers to exit themselves (an error), or for SIGINT
tokio::select! {
_ = cluster.replica_1.wait_for_shutdown() => {
cluster.replica_1.cleanup().await.context(
format!("clean up {} after shutdown", cluster.replica_1.data_path().display())
)?;
bail!("omicron-dev: ClickHouse replica 1 shutdown unexpectedly");
}
_ = cluster.replica_2.wait_for_shutdown() => {
cluster.replica_2.cleanup().await.context(
format!("clean up {} after shutdown", cluster.replica_2.data_path().display())
)?;
bail!("omicron-dev: ClickHouse replica 2 shutdown unexpectedly");
}
_ = cluster.keeper_1.wait_for_shutdown() => {
cluster.keeper_1.cleanup().await.context(
format!("clean up {} after shutdown", cluster.keeper_1.data_path().display())
)?;
bail!("omicron-dev: ClickHouse keeper 1 shutdown unexpectedly");
}
_ = cluster.keeper_2.wait_for_shutdown() => {
cluster.keeper_2.cleanup().await.context(
format!("clean up {} after shutdown", cluster.keeper_2.data_path().display())
)?;
bail!("omicron-dev: ClickHouse keeper 2 shutdown unexpectedly");
}
_ = cluster.keeper_3.wait_for_shutdown() => {
cluster.keeper_3.cleanup().await.context(
format!("clean up {} after shutdown", cluster.keeper_3.data_path().display())
)?;
bail!("omicron-dev: ClickHouse keeper 3 shutdown unexpectedly");
}
caught_signal = signal_stream.next() => {
assert_eq!(caught_signal.unwrap(), SIGINT);
eprintln!(
"omicron-dev: caught signal, shutting down and removing \
temporary directories"
);

// Remove the data directories.
let mut instances = vec![
cluster.replica_1,
cluster.replica_2,
cluster.keeper_1,
cluster.keeper_2,
cluster.keeper_3,
];
for instance in instances.iter_mut() {
instance
.wait_for_shutdown()
.await
.context(format!("clean up {} after SIGINT shutdown", instance.data_path().display()))?;
};
}
}
Ok(())
}

#[derive(Clone, Debug, Args)]
struct RunAllArgs {
/// Nexus external API listen port. Use `0` to request any available port.
14 changes: 14 additions & 0 deletions docs/how-to-run-simulated.adoc
@@ -159,6 +159,20 @@ $ cargo run --bin omicron-dev -- ch-run
omicron-dev: running ClickHouse (PID: 2463), full command is "clickhouse server --log-file /var/folders/67/2tlym22x1r3d2kwbh84j298w0000gn/T/.tmpJ5nhot/clickhouse-server.log --errorlog-file /var/folders/67/2tlym22x1r3d2kwbh84j298w0000gn/T/.tmpJ5nhot/clickhouse-server.errlog -- --http_port 8123 --path /var/folders/67/2tlym22x1r3d2kwbh84j298w0000gn/T/.tmpJ5nhot"
omicron-dev: using /var/folders/67/2tlym22x1r3d2kwbh84j298w0000gn/T/.tmpJ5nhot for ClickHouse data storage
----
+
If you wish to start a ClickHouse replicated cluster instead of a single node, run the following instead:
[source,text]
----
$ cargo run --bin omicron-dev -- ch-run --replicated
Finished dev [unoptimized + debuginfo] target(s) in 0.31s
Running `target/debug/omicron-dev ch-run --replicated`
omicron-dev: running ClickHouse cluster with configuration files:
replicas: /home/{user}/src/omicron/oximeter/db/src/configs/replica_config.xml
keepers: /home/{user}/src/omicron/oximeter/db/src/configs/keeper_config.xml
omicron-dev: ClickHouse cluster is running with PIDs: 1113482, 1113681, 1113387, 1113451, 1113419
omicron-dev: ClickHouse HTTP servers listening on ports: 8123, 8124
omicron-dev: using /tmp/.tmpFH6v8h and /tmp/.tmpkUjDji for ClickHouse data storage
----

. `nexus` requires a configuration file to run. You can use `nexus/examples/config.toml` to start with. Build and run it like this:
+
9 changes: 8 additions & 1 deletion oximeter/db/src/client.rs
@@ -1064,7 +1064,14 @@ mod tests {

#[tokio::test]
async fn test_replicated() {
-let mut cluster = ClickHouseCluster::new()
+let cur_dir = std::env::current_dir().unwrap();
+let replica_config =
+cur_dir.as_path().join("src/configs/replica_config.xml");
+let cur_dir = std::env::current_dir().unwrap();
+let keeper_config =
+cur_dir.as_path().join("src/configs/keeper_config.xml");
+
+let mut cluster = ClickHouseCluster::new(replica_config, keeper_config)
.await
.expect("Failed to initialise ClickHouse Cluster");

53 changes: 30 additions & 23 deletions test-utils/src/dev/clickhouse.rs
@@ -39,16 +39,6 @@ pub struct ClickHouseInstance {
child: Option<tokio::process::Child>,
}

-/// A `ClickHouseCluster` is used to start and manage a 2 replica 3 keeper ClickHouse cluster.
-#[derive(Debug)]
-pub struct ClickHouseCluster {
-pub replica_1: ClickHouseInstance,
-pub replica_2: ClickHouseInstance,
-pub keeper_1: ClickHouseInstance,
-pub keeper_2: ClickHouseInstance,
-pub keeper_3: ClickHouseInstance,
-}

#[derive(Debug, Error)]
pub enum ClickHouseError {
#[error("Failed to open ClickHouse log file")]
@@ -330,25 +320,32 @@ impl Drop for ClickHouseInstance {
}
}

/// A `ClickHouseCluster` is used to start and manage a 2 replica 3 keeper ClickHouse cluster.
#[derive(Debug)]
pub struct ClickHouseCluster {
pub replica_1: ClickHouseInstance,
pub replica_2: ClickHouseInstance,
pub keeper_1: ClickHouseInstance,
pub keeper_2: ClickHouseInstance,
pub keeper_3: ClickHouseInstance,
pub replica_config_path: PathBuf,
pub keeper_config_path: PathBuf,
}

impl ClickHouseCluster {
-pub async fn new() -> Result<Self, anyhow::Error> {
+pub async fn new(
+replica_config: PathBuf,
+keeper_config: PathBuf,
+) -> Result<Self, anyhow::Error> {
// Start all Keeper coordinator nodes
-let cur_dir = std::env::current_dir().unwrap();
-let keeper_config =
-cur_dir.as_path().join("src/configs/keeper_config.xml");

let keeper_amount = 3;
let mut keepers =
-Self::new_keeper_set(keeper_amount, keeper_config).await?;
+Self::new_keeper_set(keeper_amount, &keeper_config).await?;

// Start all replica nodes
-let cur_dir = std::env::current_dir().unwrap();
-let replica_config =
-cur_dir.as_path().join("src/configs/replica_config.xml");

let replica_amount = 2;
let mut replicas =
-Self::new_replica_set(replica_amount, replica_config).await?;
+Self::new_replica_set(replica_amount, &replica_config).await?;

let r1 = replicas.swap_remove(0);
let r2 = replicas.swap_remove(0);
@@ -362,12 +359,14 @@ impl ClickHouseCluster {
keeper_1: k1,
keeper_2: k2,
keeper_3: k3,
replica_config_path: replica_config,
keeper_config_path: keeper_config,
})
}

pub async fn new_keeper_set(
keeper_amount: u16,
-config_path: PathBuf,
+config_path: &PathBuf,
) -> Result<Vec<ClickHouseInstance>, anyhow::Error> {
let mut keepers = vec![];

@@ -392,7 +391,7 @@

pub async fn new_replica_set(
replica_amount: u16,
-config_path: PathBuf,
+config_path: &PathBuf,
) -> Result<Vec<ClickHouseInstance>, anyhow::Error> {
let mut replicas = vec![];

@@ -419,6 +418,14 @@

Ok(replicas)
}

pub fn replica_config_path(&self) -> &Path {
&self.replica_config_path
}

pub fn keeper_config_path(&self) -> &Path {
&self.keeper_config_path
}
}

// Wait for the ClickHouse log file to become available, including the

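For reference, a minimal sketch (not part of this commit) of how a caller might stand up the cluster with explicit config paths, mirroring the oximeter test above. The crate path `omicron_test_utils::dev`, the config file locations, and the exact cleanup semantics are assumptions for illustration:

```rust
use std::path::PathBuf;

use anyhow::Result;
// Assumed crate path for the test-utils module shown in this commit.
use omicron_test_utils::dev;

async fn demo_cluster() -> Result<()> {
    // Hypothetical locations: point these at the replica/keeper XML files
    // under oximeter/db/src/configs.
    let base = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    let replica_config = base.join("src/configs/replica_config.xml");
    let keeper_config = base.join("src/configs/keeper_config.xml");

    // `new` starts the three keeper nodes first, then the two replicas.
    let mut cluster =
        dev::clickhouse::ClickHouseCluster::new(replica_config, keeper_config)
            .await?;

    println!(
        "replicas listening on HTTP ports {} and {}",
        cluster.replica_1.port(),
        cluster.replica_2.port()
    );

    // Tear everything down; `cleanup` is assumed to stop each child process
    // and remove its temporary data directory, as in the error paths above.
    for instance in [
        &mut cluster.replica_1,
        &mut cluster.replica_2,
        &mut cluster.keeper_1,
        &mut cluster.keeper_2,
        &mut cluster.keeper_3,
    ] {
        instance.cleanup().await?;
    }
    Ok(())
}
```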