Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
bnaecker committed Sep 24, 2024
1 parent 45813be commit 3bacf4a
Showing 1 changed file with 30 additions and 65 deletions.
95 changes: 30 additions & 65 deletions test-utils/src/dev/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ impl ClickHouseProcess {
}
let data_path = data_dir.root_path().to_path_buf();
wait_for_ready(
data_dir.log_path(),
&data_dir.log_path(),
CLICKHOUSE_TIMEOUT,
CLICKHOUSE_READY,
)
Expand Down Expand Up @@ -632,7 +632,7 @@ impl ClickHouseProcess {
let data_path = data_dir.root_path().to_path_buf();
let address = ipv6_localhost_on(port);
wait_for_ready(
data_dir.keeper_log_path(),
&data_dir.keeper_log_path(),
CLICKHOUSE_KEEPER_TIMEOUT,
KEEPER_READY,
)
Expand Down Expand Up @@ -1150,36 +1150,8 @@ pub enum NodeKind {
pub async fn wait_for_ports(
log_path: Utf8PathBuf,
) -> Result<ClickHousePorts, anyhow::Error> {
let p = poll::wait_for_condition(
|| async {
let result =
discover_local_listening_ports(&log_path, CLICKHOUSE_TIMEOUT)
.await;
match result {
// Successfully extracted the ports, return them.
Ok(ports) => Ok(ports),
Err(e) => {
match e {
ClickHouseError::Io(ref inner) => {
if matches!(
inner.kind(),
std::io::ErrorKind::NotFound
) {
return Err(poll::CondCheckError::NotYet);
}
}
_ => {}
}
Err(poll::CondCheckError::from(e))
}
}
},
&Duration::from_millis(500),
&CLICKHOUSE_TIMEOUT,
)
.await
.context("waiting to discover ClickHouse ports")?;
Ok(p)
wait_for_ready(&log_path, CLICKHOUSE_TIMEOUT, CLICKHOUSE_READY).await?;
find_clickhouse_ports_in_log(&log_path).await.context("finding ClickHouse ports in log")
}

// Parse the ClickHouse log file at the given path, looking for a line
Expand All @@ -1196,42 +1168,35 @@ async fn discover_local_listening_ports(

// Parse the ClickHouse log for the HTTP and native TCP port numbers.
//
// NOTE: This function loops forever until the expected lines are found. It
// should be run under a timeout, or some other mechanism for cancelling it.
// # Panics
//
// This function panics if it reaches EOF without discovering the ports. This
// means the function can only be called after `wait_for_ready()` or a similar
// method.
async fn find_clickhouse_ports_in_log(
path: &Utf8Path,
) -> Result<ClickHousePorts, ClickHouseError> {
let mut reader = BufReader::new(File::open(path).await?);
let reader = BufReader::new(File::open(path).await?);
let mut lines = reader.lines();
let mut ports = ClickHousePorts::zero();
'line_search: loop {
let line = lines.next_line().await?;
match line {
Some(line) => {
if let Some(http_port) =
find_port_after_needle(&line, CLICKHOUSE_HTTP_PORT_NEEDLE)?
{
ports.http = http_port;
} else if let Some(native_port) =
find_port_after_needle(&line, CLICKHOUSE_TCP_PORT_NEEDLE)?
{
ports.native = native_port;
} else {
continue 'line_search;
}
if !ports.any_zero() {
return Ok(ports);
}
}
None => {
// Reached EOF, just sleep for an interval and check again.
sleep(Duration::from_millis(10)).await;

// We might have gotten a partial line; close the file, reopen
// it, and start reading again from the beginning.
reader = BufReader::new(File::open(path).await?);
lines = reader.lines();
}
loop {
let line = lines
.next_line()
.await?
.expect("Reached EOF in ClickHouse log file without discovering ports");
if let Some(http_port) =
find_port_after_needle(&line, CLICKHOUSE_HTTP_PORT_NEEDLE)?
{
ports.http = http_port;
} else if let Some(native_port) =
find_port_after_needle(&line, CLICKHOUSE_TCP_PORT_NEEDLE)?
{
ports.native = native_port;
} else {
continue;
}
if !ports.any_zero() {
return Ok(ports);
}
}
}
Expand Down Expand Up @@ -1265,13 +1230,13 @@ fn find_port_after_needle(

// Wait for the ClickHouse log file to report it is ready to receive connections
pub async fn wait_for_ready(
log_path: Utf8PathBuf,
log_path: &Utf8PathBuf,
timeout: Duration,
needle: &str,
) -> Result<(), anyhow::Error> {
let p = poll::wait_for_condition(
|| async {
let result = discover_ready(&log_path, timeout, needle).await;
let result = discover_ready(log_path, timeout, needle).await;
match result {
Ok(ready) => Ok(ready),
Err(e) => {
Expand Down

0 comments on commit 3bacf4a

Please sign in to comment.