Skip to content

Commit

Permalink
centralise hard coded strings
Browse files Browse the repository at this point in the history
  • Loading branch information
citrus-it committed Feb 26, 2024
1 parent 3a292d8 commit 3a412e8
Showing 1 changed file with 48 additions and 18 deletions.
66 changes: 48 additions & 18 deletions test-utils/src/dev/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ const CLICKHOUSE_TIMEOUT: Duration = Duration::from_secs(30);
// Timeout used when starting a ClickHouse keeper subprocess.
const CLICKHOUSE_KEEPER_TIMEOUT: Duration = Duration::from_secs(30);

// The string to look for in a keeper log file that indiccates that the server
// is ready.
const KEEPER_READY: &'static str = "Server initialized, waiting for quorum";

// The string to look for in a clickhouse log file that indiccates that the
// server is ready.
const CLICKHOUSE_READY: &'static str =
"<Information> Application: Ready for connections";

// The string to look for in a clickhouse log file when trying to determine the
// port number on which it is listening.
const CLICKHOUSE_PORT: &'static str = "Application: Listening for http://[::1]";

/// A `ClickHouseInstance` is used to start and manage a ClickHouse single node server process.
#[derive(Debug)]
pub struct ClickHouseInstance {
Expand Down Expand Up @@ -173,7 +186,12 @@ impl ClickHouseInstance {
let data_path = data_dir.root_path().to_path_buf();
let address = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port);

let result = wait_for_ready(data_dir.log_path()).await;
let result = wait_for_ready(
data_dir.log_path(),
CLICKHOUSE_TIMEOUT,
CLICKHOUSE_READY,
)
.await;
match result {
Ok(()) => Ok(Self {
data_dir: Some(data_dir),
Expand Down Expand Up @@ -243,7 +261,12 @@ impl ClickHouseInstance {
let data_path = data_dir.root_path().to_path_buf();
let address = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port);

let result = wait_for_ready(data_dir.keeper_log_path()).await;
let result = wait_for_ready(
data_dir.keeper_log_path(),
CLICKHOUSE_KEEPER_TIMEOUT,
KEEPER_READY,
)
.await;
match result {
Ok(()) => Ok(Self {
data_dir: Some(data_dir),
Expand Down Expand Up @@ -671,17 +694,15 @@ async fn find_clickhouse_port_in_log(
path: &Utf8Path,
) -> Result<u16, ClickHouseError> {
let mut reader = BufReader::new(File::open(path).await?);
const NEEDLE: &str =
"<Information> Application: Listening for http://[::1]";
let mut lines = reader.lines();
loop {
let line = lines.next_line().await?;
match line {
Some(line) => {
if let Some(needle_start) = line.find(NEEDLE) {
if let Some(needle_start) = line.find(CLICKHOUSE_PORT) {
// Our needle ends with `http://[::1]`; we'll split on the
// colon we expect to follow it to find the port.
let address_start = needle_start + NEEDLE.len();
let address_start = needle_start + CLICKHOUSE_PORT.len();
return line[address_start..]
.trim()
.split(':')
Expand All @@ -707,11 +728,12 @@ async fn find_clickhouse_port_in_log(
// Wait for the ClickHouse log file to report it is ready to receive connections
pub async fn wait_for_ready(
log_path: Utf8PathBuf,
timeout: Duration,
needle: &'static str,
) -> Result<(), anyhow::Error> {
let p = poll::wait_for_condition(
|| async {
let result =
discover_ready(&log_path, CLICKHOUSE_KEEPER_TIMEOUT).await;
let result = discover_ready(&log_path, timeout, needle).await;
match result {
Ok(ready) => Ok(ready),
Err(e) => {
Expand All @@ -731,7 +753,7 @@ pub async fn wait_for_ready(
}
},
&Duration::from_millis(500),
&CLICKHOUSE_KEEPER_TIMEOUT,
&timeout,
)
.await
.context("waiting to discover if ClickHouse is ready for connections")?;
Expand All @@ -743,9 +765,10 @@ pub async fn wait_for_ready(
async fn discover_ready(
path: &Utf8Path,
timeout: Duration,
needle: &'static str,
) -> Result<(), ClickHouseError> {
let timeout = Instant::now() + timeout;
tokio::time::timeout_at(timeout, clickhouse_ready_from_log(path))
tokio::time::timeout_at(timeout, clickhouse_ready_from_log(path, needle))
.await
.map_err(|_| ClickHouseError::Timeout)?
}
Expand All @@ -756,15 +779,15 @@ async fn discover_ready(
// should be run under a timeout, or some other mechanism for cancelling it.
async fn clickhouse_ready_from_log(
path: &Utf8Path,
needle: &'static str,
) -> Result<(), ClickHouseError> {
let mut reader = BufReader::new(File::open(path).await?);
const READY: &str = "<Information> Application: Ready for connections";
let mut lines = reader.lines();
loop {
let line = lines.next_line().await?;
match line {
Some(line) => {
if let Some(_) = line.find(READY) {
if let Some(_) = line.find(needle) {
return Ok(());
}
}
Expand All @@ -785,7 +808,7 @@ async fn clickhouse_ready_from_log(
mod tests {
use super::{
discover_local_listening_port, discover_ready, ClickHouseError,
CLICKHOUSE_TIMEOUT,
CLICKHOUSE_PORT, CLICKHOUSE_READY, CLICKHOUSE_TIMEOUT,
};
use camino_tempfile::NamedUtf8TempFile;
use std::process::Stdio;
Expand Down Expand Up @@ -834,14 +857,16 @@ mod tests {
writeln!(file, "A garbage line").unwrap();
writeln!(
file,
"2023.07.31 20:12:38.936192 [ 82373 ] <Information> Application: Ready for connections.",
"2023.07.31 20:12:38.936192 [ 82373 ] <Information> {}",
CLICKHOUSE_READY,
)
.unwrap();
writeln!(file, "Another garbage line").unwrap();
file.flush().unwrap();

assert!(matches!(
discover_ready(file.path(), CLICKHOUSE_TIMEOUT).await,
discover_ready(file.path(), CLICKHOUSE_TIMEOUT, CLICKHOUSE_READY)
.await,
Ok(())
));
}
Expand All @@ -859,7 +884,12 @@ mod tests {
writeln!(file, "Another garbage line").unwrap();
file.flush().unwrap();
assert!(matches!(
discover_ready(file.path(), Duration::from_secs(1)).await,
discover_ready(
file.path(),
Duration::from_secs(1),
CLICKHOUSE_READY
)
.await,
Err(ClickHouseError::Timeout {})
));
}
Expand Down Expand Up @@ -942,13 +972,13 @@ mod tests {
// (https://github.com/oxidecomputer/omicron/issues/3580).
write_and_wait(
&mut file,
"<Information> Application: List".to_string(),
format!("{}", &CLICKHOUSE_PORT[..30]),
writer_interval,
)
.await;
write_and_wait(
&mut file,
format!("ening for http://[::1]:{}\n", EXPECTED_PORT),
format!("{}:{}\n", &CLICKHOUSE_PORT[30..], EXPECTED_PORT),
writer_interval,
)
.await;
Expand Down

0 comments on commit 3a412e8

Please sign in to comment.