diff --git a/test-utils/src/dev/clickhouse.rs b/test-utils/src/dev/clickhouse.rs index c687ae3c0d..59e9f18097 100644 --- a/test-utils/src/dev/clickhouse.rs +++ b/test-utils/src/dev/clickhouse.rs @@ -29,6 +29,19 @@ const CLICKHOUSE_TIMEOUT: Duration = Duration::from_secs(30); // Timeout used when starting a ClickHouse keeper subprocess. const CLICKHOUSE_KEEPER_TIMEOUT: Duration = Duration::from_secs(30); +// The string to look for in a keeper log file that indiccates that the server +// is ready. +const KEEPER_READY: &'static str = "Server initialized, waiting for quorum"; + +// The string to look for in a clickhouse log file that indiccates that the +// server is ready. +const CLICKHOUSE_READY: &'static str = + " Application: Ready for connections"; + +// The string to look for in a clickhouse log file when trying to determine the +// port number on which it is listening. +const CLICKHOUSE_PORT: &'static str = "Application: Listening for http://[::1]"; + /// A `ClickHouseInstance` is used to start and manage a ClickHouse single node server process. #[derive(Debug)] pub struct ClickHouseInstance { @@ -173,7 +186,12 @@ impl ClickHouseInstance { let data_path = data_dir.root_path().to_path_buf(); let address = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port); - let result = wait_for_ready(data_dir.log_path()).await; + let result = wait_for_ready( + data_dir.log_path(), + CLICKHOUSE_TIMEOUT, + CLICKHOUSE_READY, + ) + .await; match result { Ok(()) => Ok(Self { data_dir: Some(data_dir), @@ -243,7 +261,12 @@ impl ClickHouseInstance { let data_path = data_dir.root_path().to_path_buf(); let address = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port); - let result = wait_for_ready(data_dir.keeper_log_path()).await; + let result = wait_for_ready( + data_dir.keeper_log_path(), + CLICKHOUSE_KEEPER_TIMEOUT, + KEEPER_READY, + ) + .await; match result { Ok(()) => Ok(Self { data_dir: Some(data_dir), @@ -671,17 +694,15 @@ async fn find_clickhouse_port_in_log( path: &Utf8Path, ) -> Result { let mut reader = BufReader::new(File::open(path).await?); - const NEEDLE: &str = - " Application: Listening for http://[::1]"; let mut lines = reader.lines(); loop { let line = lines.next_line().await?; match line { Some(line) => { - if let Some(needle_start) = line.find(NEEDLE) { + if let Some(needle_start) = line.find(CLICKHOUSE_PORT) { // Our needle ends with `http://[::1]`; we'll split on the // colon we expect to follow it to find the port. - let address_start = needle_start + NEEDLE.len(); + let address_start = needle_start + CLICKHOUSE_PORT.len(); return line[address_start..] .trim() .split(':') @@ -707,11 +728,12 @@ async fn find_clickhouse_port_in_log( // Wait for the ClickHouse log file to report it is ready to receive connections pub async fn wait_for_ready( log_path: Utf8PathBuf, + timeout: Duration, + needle: &'static str, ) -> Result<(), anyhow::Error> { let p = poll::wait_for_condition( || async { - let result = - discover_ready(&log_path, CLICKHOUSE_KEEPER_TIMEOUT).await; + let result = discover_ready(&log_path, timeout, needle).await; match result { Ok(ready) => Ok(ready), Err(e) => { @@ -731,7 +753,7 @@ pub async fn wait_for_ready( } }, &Duration::from_millis(500), - &CLICKHOUSE_KEEPER_TIMEOUT, + &timeout, ) .await .context("waiting to discover if ClickHouse is ready for connections")?; @@ -743,9 +765,10 @@ pub async fn wait_for_ready( async fn discover_ready( path: &Utf8Path, timeout: Duration, + needle: &'static str, ) -> Result<(), ClickHouseError> { let timeout = Instant::now() + timeout; - tokio::time::timeout_at(timeout, clickhouse_ready_from_log(path)) + tokio::time::timeout_at(timeout, clickhouse_ready_from_log(path, needle)) .await .map_err(|_| ClickHouseError::Timeout)? } @@ -756,15 +779,15 @@ async fn discover_ready( // should be run under a timeout, or some other mechanism for cancelling it. async fn clickhouse_ready_from_log( path: &Utf8Path, + needle: &'static str, ) -> Result<(), ClickHouseError> { let mut reader = BufReader::new(File::open(path).await?); - const READY: &str = " Application: Ready for connections"; let mut lines = reader.lines(); loop { let line = lines.next_line().await?; match line { Some(line) => { - if let Some(_) = line.find(READY) { + if let Some(_) = line.find(needle) { return Ok(()); } } @@ -785,7 +808,7 @@ async fn clickhouse_ready_from_log( mod tests { use super::{ discover_local_listening_port, discover_ready, ClickHouseError, - CLICKHOUSE_TIMEOUT, + CLICKHOUSE_PORT, CLICKHOUSE_READY, CLICKHOUSE_TIMEOUT, }; use camino_tempfile::NamedUtf8TempFile; use std::process::Stdio; @@ -834,14 +857,16 @@ mod tests { writeln!(file, "A garbage line").unwrap(); writeln!( file, - "2023.07.31 20:12:38.936192 [ 82373 ] Application: Ready for connections.", + "2023.07.31 20:12:38.936192 [ 82373 ] {}", + CLICKHOUSE_READY, ) .unwrap(); writeln!(file, "Another garbage line").unwrap(); file.flush().unwrap(); assert!(matches!( - discover_ready(file.path(), CLICKHOUSE_TIMEOUT).await, + discover_ready(file.path(), CLICKHOUSE_TIMEOUT, CLICKHOUSE_READY) + .await, Ok(()) )); } @@ -859,7 +884,12 @@ mod tests { writeln!(file, "Another garbage line").unwrap(); file.flush().unwrap(); assert!(matches!( - discover_ready(file.path(), Duration::from_secs(1)).await, + discover_ready( + file.path(), + Duration::from_secs(1), + CLICKHOUSE_READY + ) + .await, Err(ClickHouseError::Timeout {}) )); } @@ -942,13 +972,13 @@ mod tests { // (https://github.com/oxidecomputer/omicron/issues/3580). write_and_wait( &mut file, - " Application: List".to_string(), + format!("{}", &CLICKHOUSE_PORT[..30]), writer_interval, ) .await; write_and_wait( &mut file, - format!("ening for http://[::1]:{}\n", EXPECTED_PORT), + format!("{}:{}\n", &CLICKHOUSE_PORT[30..], EXPECTED_PORT), writer_interval, ) .await;