From 3bacf4ac648779415c6a1db25cafbff9c89e2cfc Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Tue, 24 Sep 2024 08:42:50 -0700 Subject: [PATCH] WIP --- test-utils/src/dev/clickhouse.rs | 95 ++++++++++---------------------- 1 file changed, 30 insertions(+), 65 deletions(-) diff --git a/test-utils/src/dev/clickhouse.rs b/test-utils/src/dev/clickhouse.rs index 12fa64869a5..ddc628fdf8a 100644 --- a/test-utils/src/dev/clickhouse.rs +++ b/test-utils/src/dev/clickhouse.rs @@ -541,7 +541,7 @@ impl ClickHouseProcess { } let data_path = data_dir.root_path().to_path_buf(); wait_for_ready( - data_dir.log_path(), + &data_dir.log_path(), CLICKHOUSE_TIMEOUT, CLICKHOUSE_READY, ) @@ -632,7 +632,7 @@ impl ClickHouseProcess { let data_path = data_dir.root_path().to_path_buf(); let address = ipv6_localhost_on(port); wait_for_ready( - data_dir.keeper_log_path(), + &data_dir.keeper_log_path(), CLICKHOUSE_KEEPER_TIMEOUT, KEEPER_READY, ) @@ -1150,36 +1150,8 @@ pub enum NodeKind { pub async fn wait_for_ports( log_path: Utf8PathBuf, ) -> Result { - let p = poll::wait_for_condition( - || async { - let result = - discover_local_listening_ports(&log_path, CLICKHOUSE_TIMEOUT) - .await; - match result { - // Successfully extracted the ports, return them. - Ok(ports) => Ok(ports), - Err(e) => { - match e { - ClickHouseError::Io(ref inner) => { - if matches!( - inner.kind(), - std::io::ErrorKind::NotFound - ) { - return Err(poll::CondCheckError::NotYet); - } - } - _ => {} - } - Err(poll::CondCheckError::from(e)) - } - } - }, - &Duration::from_millis(500), - &CLICKHOUSE_TIMEOUT, - ) - .await - .context("waiting to discover ClickHouse ports")?; - Ok(p) + wait_for_ready(&log_path, CLICKHOUSE_TIMEOUT, CLICKHOUSE_READY).await?; + find_clickhouse_ports_in_log(&log_path).await.context("finding ClickHouse ports in log") } // Parse the ClickHouse log file at the given path, looking for a line @@ -1196,42 +1168,35 @@ async fn discover_local_listening_ports( // Parse the ClickHouse log for the HTTP and native TCP port numbers. // -// NOTE: This function loops forever until the expected lines are found. It -// should be run under a timeout, or some other mechanism for cancelling it. +// # Panics +// +// This function panics if it reaches EOF without discovering the ports. This +// means the function can only be called after `wait_for_ready()` or a similar +// method. async fn find_clickhouse_ports_in_log( path: &Utf8Path, ) -> Result { - let mut reader = BufReader::new(File::open(path).await?); + let reader = BufReader::new(File::open(path).await?); let mut lines = reader.lines(); let mut ports = ClickHousePorts::zero(); - 'line_search: loop { - let line = lines.next_line().await?; - match line { - Some(line) => { - if let Some(http_port) = - find_port_after_needle(&line, CLICKHOUSE_HTTP_PORT_NEEDLE)? - { - ports.http = http_port; - } else if let Some(native_port) = - find_port_after_needle(&line, CLICKHOUSE_TCP_PORT_NEEDLE)? - { - ports.native = native_port; - } else { - continue 'line_search; - } - if !ports.any_zero() { - return Ok(ports); - } - } - None => { - // Reached EOF, just sleep for an interval and check again. - sleep(Duration::from_millis(10)).await; - - // We might have gotten a partial line; close the file, reopen - // it, and start reading again from the beginning. - reader = BufReader::new(File::open(path).await?); - lines = reader.lines(); - } + loop { + let line = lines + .next_line() + .await? + .expect("Reached EOF in ClickHouse log file without discovering ports"); + if let Some(http_port) = + find_port_after_needle(&line, CLICKHOUSE_HTTP_PORT_NEEDLE)? + { + ports.http = http_port; + } else if let Some(native_port) = + find_port_after_needle(&line, CLICKHOUSE_TCP_PORT_NEEDLE)? + { + ports.native = native_port; + } else { + continue; + } + if !ports.any_zero() { + return Ok(ports); } } } @@ -1265,13 +1230,13 @@ fn find_port_after_needle( // Wait for the ClickHouse log file to report it is ready to receive connections pub async fn wait_for_ready( - log_path: Utf8PathBuf, + log_path: &Utf8PathBuf, timeout: Duration, needle: &str, ) -> Result<(), anyhow::Error> { let p = poll::wait_for_condition( || async { - let result = discover_ready(&log_path, timeout, needle).await; + let result = discover_ready(log_path, timeout, needle).await; match result { Ok(ready) => Ok(ready), Err(e) => {