Skip to content

Commit

Permalink
Wait for ClickHouse to report ready before parsing ports
Browse files Browse the repository at this point in the history
This changes the logic for parsing the server ports that ClickHouse
listens on, so that it waits until we are certain that they've been
completely written. This fixes a few flaky tests, currently including at
least #4779, #4972, and #5180. There have been others in the past, which
we've addressed with less-complete solutions that only narrow the race
window. This should eliminate it.
  • Loading branch information
bnaecker committed Sep 24, 2024
1 parent 3bacf4a commit d6cf2e0
Showing 1 changed file with 65 additions and 174 deletions.
239 changes: 65 additions & 174 deletions test-utils/src/dev/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ impl ClickHouseProcess {
// that we don't return from this until the server is actually ready to
// accept connections.
let ports = {
let new_ports = wait_for_ports(data_dir.log_path()).await?;
let new_ports = wait_for_ports(&data_dir.log_path()).await?;
// If either port was specified, add an additional check that we
// recovered exactly that port.
ports.assert_consistent(&new_ports);
Expand Down Expand Up @@ -1148,22 +1148,12 @@ pub enum NodeKind {
// from this function successfully, but the server itself is not yet
// ready to accept connections.
pub async fn wait_for_ports(
log_path: Utf8PathBuf,
log_path: &Utf8Path,
) -> Result<ClickHousePorts, anyhow::Error> {
wait_for_ready(&log_path, CLICKHOUSE_TIMEOUT, CLICKHOUSE_READY).await?;
find_clickhouse_ports_in_log(&log_path).await.context("finding ClickHouse ports in log")
}

// Parse the ClickHouse log file at the given path, looking for a line
// reporting the port number of the HTTP and native TCP servers.
async fn discover_local_listening_ports(
path: &Utf8Path,
timeout: Duration,
) -> Result<ClickHousePorts, ClickHouseError> {
let timeout = Instant::now() + timeout;
tokio::time::timeout_at(timeout, find_clickhouse_ports_in_log(path))
wait_for_ready(log_path, CLICKHOUSE_TIMEOUT, CLICKHOUSE_READY).await?;
find_clickhouse_ports_in_log(log_path)
.await
.map_err(|_| ClickHouseError::Timeout)?
.context("finding ClickHouse ports in log")
}

// Parse the ClickHouse log for the HTTP and native TCP port numbers.
Expand All @@ -1180,10 +1170,9 @@ async fn find_clickhouse_ports_in_log(
let mut lines = reader.lines();
let mut ports = ClickHousePorts::zero();
loop {
let line = lines
.next_line()
.await?
.expect("Reached EOF in ClickHouse log file without discovering ports");
let line = lines.next_line().await?.expect(
"Reached EOF in ClickHouse log file without discovering ports",
);
if let Some(http_port) =
find_port_after_needle(&line, CLICKHOUSE_HTTP_PORT_NEEDLE)?
{
Expand Down Expand Up @@ -1230,7 +1219,7 @@ fn find_port_after_needle(

// Wait for the ClickHouse log file to report it is ready to receive connections
pub async fn wait_for_ready(
log_path: &Utf8PathBuf,
log_path: &Utf8Path,
timeout: Duration,
needle: &str,
) -> Result<(), anyhow::Error> {
Expand Down Expand Up @@ -1309,12 +1298,10 @@ async fn clickhouse_ready_from_log(

#[cfg(test)]
mod tests {
use crate::dev::clickhouse::CLICKHOUSE_TCP_PORT_NEEDLE;

use super::{
discover_local_listening_ports, discover_ready, ClickHouseError,
ClickHousePorts, CLICKHOUSE_HTTP_PORT_NEEDLE, CLICKHOUSE_READY,
CLICKHOUSE_TIMEOUT,
discover_ready, wait_for_ports, ClickHouseError, ClickHousePorts,
CLICKHOUSE_HTTP_PORT_NEEDLE, CLICKHOUSE_READY,
CLICKHOUSE_TCP_PORT_NEEDLE, CLICKHOUSE_TIMEOUT,
};
use camino_tempfile::NamedUtf8TempFile;
use std::process::Stdio;
Expand All @@ -1336,8 +1323,9 @@ mod tests {
}

#[tokio::test]
async fn test_discover_local_listening_ports() {
// Write some data to a fake log file
async fn wait_for_ports_finds_actual_ports() {
// Test the nominal case, where ClickHouse writes out the lines with
// the ports, and then the sentinel indicating readiness.
let mut file = NamedUtf8TempFile::new().unwrap();
writeln!(file, "A garbage line").unwrap();
writeln!(
Expand All @@ -1347,15 +1335,59 @@ mod tests {
)
.unwrap();
writeln!(file, "Another garbage line").unwrap();
writeln!(file, "{}:{}", CLICKHOUSE_TCP_PORT_NEEDLE, EXPECTED_TCP_PORT,)
writeln!(file, "{}:{}", CLICKHOUSE_TCP_PORT_NEEDLE, EXPECTED_TCP_PORT)
.unwrap();
writeln!(file, "Yet another garbage line").unwrap();
writeln!(file, "{}", CLICKHOUSE_READY).unwrap();
file.flush().unwrap();
let ports = wait_for_ports(&file.path()).await.unwrap();
assert_eq!(ports.http, EXPECTED_HTTP_PORT);
assert_eq!(ports.native, EXPECTED_TCP_PORT);
}

let ports =
discover_local_listening_ports(file.path(), CLICKHOUSE_TIMEOUT)
.await
.unwrap();
#[should_panic]
#[tokio::test]
async fn wait_for_ports_panics_with_sentinel_but_no_ports() {
let mut file = NamedUtf8TempFile::new().unwrap();
writeln!(file, "A garbage line").unwrap();
writeln!(
file,
"{}:{}",
CLICKHOUSE_HTTP_PORT_NEEDLE, EXPECTED_HTTP_PORT,
)
.unwrap();
writeln!(file, "Another garbage line").unwrap();
writeln!(file, "{}", CLICKHOUSE_READY).unwrap();
file.flush().unwrap();
wait_for_ports(&file.path()).await.unwrap();
}

#[tokio::test]
async fn wait_for_ports_waits_for_sentinel_line() {
let file = Arc::new(Mutex::new(NamedUtf8TempFile::new().unwrap()));
// Start a task that slowly writes lines into the file. This ensures
// that we wait for a while until the sentinel line is written.
let file_ = file.clone();
spawn(async move {
for line in [
String::from("A garbage line"),
format!(
"{}:{}",
CLICKHOUSE_HTTP_PORT_NEEDLE, EXPECTED_HTTP_PORT
),
String::from("Another garbage line"),
format!("{}:{}", CLICKHOUSE_TCP_PORT_NEEDLE, EXPECTED_TCP_PORT),
String::from(CLICKHOUSE_READY),
] {
{
let mut f = file_.lock().await;
writeln!(f, "{}", line).unwrap();
f.flush().unwrap();
}
sleep(Duration::from_millis(100)).await;
}
});
let path = file.lock().await.path().to_owned();
let ports = wait_for_ports(&path).await.unwrap();
assert_eq!(ports.http, EXPECTED_HTTP_PORT);
assert_eq!(ports.native, EXPECTED_TCP_PORT);
}
Expand Down Expand Up @@ -1404,147 +1436,6 @@ mod tests {
));
}

// A regression test for #131.
//
// The function `discover_local_listening_ports` initially read from the log
// file until EOF, but there's no guarantee that ClickHouse has written the
// port we're searching for before the reader consumes the whole file. This
// test confirms that the file is read until the line is found, ignoring
// EOF, at least until the timeout is hit.
#[tokio::test]
async fn test_discover_local_listening_ports_slow_write() {
// In this case the writer is slightly "slower" than the reader.
let writer_interval = Duration::from_millis(20);
let ports =
read_log_file(CLICKHOUSE_TIMEOUT, writer_interval).await.unwrap();
assert_eq!(ports.http, EXPECTED_HTTP_PORT);
assert_eq!(ports.native, EXPECTED_TCP_PORT);
}

// An extremely slow write test, to verify the timeout handling.
#[tokio::test]
async fn test_discover_local_listening_ports_timeout() {
// In this case, the writer is _much_ slower than the reader, so that the reader times out
// entirely before finding the desired line.
let reader_timeout = Duration::from_millis(1);
let writer_interval = Duration::from_millis(100);
assert!(read_log_file(reader_timeout, writer_interval).await.is_err());
}

// Implementation of the above tests, simulating simultaneous
// reading/writing of the log file
//
// This uses Tokio's test utilities to manage time, rather than relying on
// timeouts.
async fn read_log_file(
reader_timeout: Duration,
writer_interval: Duration,
) -> Result<ClickHousePorts, ClickHouseError> {
async fn write_and_wait(
file: &mut NamedUtf8TempFile,
line: String,
interval: Duration,
) {
println!(
"Writing to log file: {:?}, contents: '{}'",
file.path(),
line
);
write!(file, "{}", line).unwrap();
file.flush().unwrap();
sleep(interval).await;
}

// Start a task that slowly writes lines to the log file.
//
// NOTE: This looks overly complicated, and it is. We have to wrap this
// in a mutex because both this function, and the writer task we're
// spawning, need access to the file. They may complete in any order,
// and so it's not possible to give one of them ownership over the
// `NamedTempFile`. If the owning task completes, that may delete the
// file before the other task accesses it. So we need interior
// mutability (because one of the references is mutable for writing),
// and _this_ scope must own it.
let file = Arc::new(Mutex::new(NamedUtf8TempFile::new()?));
let path = file.lock().await.path().to_path_buf();
let writer_file = file.clone();
let writer_task = spawn(async move {
let mut file = writer_file.lock().await;
write_and_wait(
&mut file,
"A garbage line\n".to_string(),
writer_interval,
)
.await;

// Ensure we can still parse the line even if our buf reader hits
// EOF in the middle of the line
// (https://github.com/oxidecomputer/omicron/issues/3580).
write_and_wait(
&mut file,
(&CLICKHOUSE_HTTP_PORT_NEEDLE[..30]).to_string(),
writer_interval,
)
.await;
write_and_wait(
&mut file,
format!(
"{}:{}\n",
&CLICKHOUSE_HTTP_PORT_NEEDLE[30..],
EXPECTED_HTTP_PORT
),
writer_interval,
)
.await;
write_and_wait(
&mut file,
"Another garbage line\n".to_string(),
writer_interval,
)
.await;
write_and_wait(
&mut file,
format!(
"{}:{}\n",
&CLICKHOUSE_TCP_PORT_NEEDLE, EXPECTED_TCP_PORT
),
writer_interval,
)
.await;
write_and_wait(
&mut file,
"Yet another line of junk\n".to_string(),
writer_interval,
)
.await;
});
println!("Starting reader task");
let reader_task = discover_local_listening_ports(&path, reader_timeout);

// "Run" the test.
//
// Note that the futures for the reader/writer tasks must be pinned to
// the stack, so that they may be polled on multiple passes through the
// select loop without consuming them.
tokio::pin!(writer_task);
tokio::pin!(reader_task);
let mut poll_writer = true;
let reader_result = loop {
tokio::select! {
reader_result = &mut reader_task => {
println!("Reader finished");
break reader_result;
},
writer_result = &mut writer_task, if poll_writer => {
println!("Writer finished");
let _ = writer_result.unwrap();
poll_writer = false;
},
}
};
reader_result
}

#[test]
fn test_clickhouse_ports_assert_consistent() {
let second = ClickHousePorts { http: 1, native: 1 };
Expand Down

0 comments on commit d6cf2e0

Please sign in to comment.