From 40a7d1c59749021a853d775e13cc33035b2b2ced Mon Sep 17 00:00:00 2001 From: luofucong Date: Fri, 5 Jan 2024 21:11:12 +0800 Subject: [PATCH] test: data files compatibility test --- Cargo.lock | 16 +- src/common/test-util/Cargo.toml | 9 + .../conf/datanode-test.toml.template | 0 .../conf/metasrv-test.toml.template | 0 .../conf/standalone-test.toml.template | 4 +- src/common/test-util/src/config.rs | 74 ++++++++ src/common/test-util/src/display.rs | 58 ++++++ src/common/test-util/src/lib.rs | 2 + src/common/test-util/src/ports.rs | 25 +++ tests-compatibility/Cargo.toml | 7 +- tests-compatibility/README.md | 8 + tests-compatibility/src/lib.rs | 13 -- tests-compatibility/src/main.rs | 172 ++++++++++++++++++ tests-compatibility/tests/compatibility.rs | 13 -- tests/runner/Cargo.toml | 2 +- tests/runner/src/env.rs | 91 ++------- tests/runner/src/util.rs | 27 --- 17 files changed, 388 insertions(+), 133 deletions(-) rename {tests => src/common/test-util}/conf/datanode-test.toml.template (100%) rename {tests => src/common/test-util}/conf/metasrv-test.toml.template (100%) rename {tests => src/common/test-util}/conf/standalone-test.toml.template (92%) create mode 100644 src/common/test-util/src/config.rs create mode 100644 src/common/test-util/src/display.rs create mode 100644 tests-compatibility/README.md delete mode 100644 tests-compatibility/src/lib.rs create mode 100644 tests-compatibility/src/main.rs delete mode 100644 tests-compatibility/tests/compatibility.rs diff --git a/Cargo.lock b/Cargo.lock index b08dffbd2573..6c6923b499dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1977,9 +1977,18 @@ dependencies = [ name = "common-test-util" version = "0.5.1" dependencies = [ + "client", + "common-error", + "common-query", + "common-recordbatch", + "common-time", "once_cell", "rand", + "serde", + "strum 0.25.0", "tempfile", + "tinytemplate", + "tokio", ] [[package]] @@ -8879,11 +8888,11 @@ dependencies = [ "common-grpc", "common-query", "common-recordbatch", + "common-test-util", "common-time", "serde", "serde_json", "sqlness", - "tinytemplate", "tokio", ] @@ -9478,7 +9487,10 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" name = "tests-compatibility" version = "0.5.0" dependencies = [ - "tinytemplate", + "client", + "common-telemetry", + "common-test-util", + "tokio", ] [[package]] diff --git a/src/common/test-util/Cargo.toml b/src/common/test-util/Cargo.toml index 60e854740643..b27327cb691f 100644 --- a/src/common/test-util/Cargo.toml +++ b/src/common/test-util/Cargo.toml @@ -5,6 +5,15 @@ edition.workspace = true license.workspace = true [dependencies] +client.workspace = true +common-error.workspace = true +common-query.workspace = true +common-recordbatch.workspace = true +common-time.workspace = true once_cell.workspace = true rand.workspace = true +serde.workspace = true +strum.workspace = true tempfile.workspace = true +tinytemplate = "1.2" +tokio.workspace = true diff --git a/tests/conf/datanode-test.toml.template b/src/common/test-util/conf/datanode-test.toml.template similarity index 100% rename from tests/conf/datanode-test.toml.template rename to src/common/test-util/conf/datanode-test.toml.template diff --git a/tests/conf/metasrv-test.toml.template b/src/common/test-util/conf/metasrv-test.toml.template similarity index 100% rename from tests/conf/metasrv-test.toml.template rename to src/common/test-util/conf/metasrv-test.toml.template diff --git a/tests/conf/standalone-test.toml.template b/src/common/test-util/conf/standalone-test.toml.template similarity index 92% rename from tests/conf/standalone-test.toml.template rename to src/common/test-util/conf/standalone-test.toml.template index 2e30ac35c266..8cc51668d84c 100644 --- a/tests/conf/standalone-test.toml.template +++ b/src/common/test-util/conf/standalone-test.toml.template @@ -20,8 +20,8 @@ linger = "5ms" type = 'File' data_home = '{data_home}' -[grpc_options] -addr = '127.0.0.1:4001' +[grpc] +addr = "{grpc_addr}" runtime_size = 8 [procedure] diff --git a/src/common/test-util/src/config.rs b/src/common/test-util/src/config.rs new file mode 100644 index 000000000000..a338155bc51f --- /dev/null +++ b/src/common/test-util/src/config.rs @@ -0,0 +1,74 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fs; +use std::path::Path; + +use serde::Serialize; +use strum::{Display, EnumString}; +use tinytemplate::TinyTemplate; + +use crate::find_workspace_path; + +#[derive(EnumString, Display)] +#[strum(serialize_all = "lowercase")] +pub enum ConfigTemplate { + Datanode, + Metasrv, + Standalone, +} + +impl ConfigTemplate { + fn render(&self, config_values: ConfigValues) -> String { + let template_name = self.to_string(); + + let template = fs::read_to_string(find_workspace_path(&format!( + "/src/common/test-util/conf/{template_name}-test.toml.template" + ))) + .unwrap(); + + let mut tt = TinyTemplate::new(); + tt.add_template(&template_name, &template).unwrap(); + tt.render(&template_name, &config_values).unwrap() + } +} + +#[derive(Serialize, Default)] +pub struct ConfigValues { + pub wal_dir: String, + pub data_home: String, + pub procedure_dir: String, + pub is_raft_engine: bool, + pub kafka_wal_broker_endpoints: String, + pub grpc_addr: String, +} + +/// Generate a config file from template (determined by parameter `config_template`), with provided +/// config values (in parameter `config_values`), and stores the file under the directory specified +/// by parameter `target_dir`, returns the target file name. +pub fn generate_config_file( + config_template: ConfigTemplate, + config_values: ConfigValues, + target_dir: &Path, +) -> String { + let rendered = config_template.render(config_values); + + let target_file = format!( + "{config_template}-{}.toml", + common_time::util::current_time_millis() + ); + fs::write(target_dir.join(&target_file), rendered).unwrap(); + + target_file +} diff --git a/src/common/test-util/src/display.rs b/src/common/test-util/src/display.rs new file mode 100644 index 000000000000..d9fd9c797c01 --- /dev/null +++ b/src/common/test-util/src/display.rs @@ -0,0 +1,58 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt; +use std::fmt::Formatter; + +use client::error::Result; +use common_error::ext::ErrorExt; +use common_query::Output; +use common_recordbatch::util::collect_batches; + +pub struct ResultDisplayer(pub Result); + +impl fmt::Display for ResultDisplayer { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match &self.0 { + Ok(result) => match result { + Output::AffectedRows(rows) => { + write!(f, "Affected Rows: {rows}") + } + Output::RecordBatches(recordbatches) => { + let pretty = recordbatches.pretty_print().unwrap(); + write!(f, "{pretty}") + } + Output::Stream(_) => unimplemented!(), + }, + Err(e) => { + let status_code = e.status_code(); + let root_cause = e.output_msg(); + write!( + f, + "Error: {}({status_code}), {root_cause}", + status_code as u32 + ) + } + } + } +} + +impl ResultDisplayer { + pub async fn display(self) -> String { + match self.0 { + Ok(Output::Stream(s)) => collect_batches(s).await.unwrap().pretty_print().unwrap(), + _ => self.to_string(), + } + } +} diff --git a/src/common/test-util/src/lib.rs b/src/common/test-util/src/lib.rs index ef6ff4696861..9acefd3e65c3 100644 --- a/src/common/test-util/src/lib.rs +++ b/src/common/test-util/src/lib.rs @@ -18,6 +18,8 @@ use std::path::{Path, PathBuf}; use std::process::Command; use std::sync::LazyLock; +pub mod config; +pub mod display; pub mod ports; pub mod temp_dir; diff --git a/src/common/test-util/src/ports.rs b/src/common/test-util/src/ports.rs index 659e08b04703..4ece3fd83a7c 100644 --- a/src/common/test-util/src/ports.rs +++ b/src/common/test-util/src/ports.rs @@ -13,9 +13,14 @@ // limitations under the License. use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::Duration; use once_cell::sync::OnceCell; use rand::Rng; +use tokio::io::AsyncWriteExt; +use tokio::net::TcpSocket; + +const PORT_CHECK_INTERVAL: Duration = Duration::from_millis(100); static PORTS: OnceCell = OnceCell::new(); @@ -25,3 +30,23 @@ pub fn get_port() -> usize { .get_or_init(|| AtomicUsize::new(rand::thread_rng().gen_range(3000..3800))) .fetch_add(1, Ordering::Relaxed) } + +/// Spin awaits a socket address until it is connectable, or timeout. +pub async fn check_connectable(ip_addr: &str, timeout: Duration) -> bool { + let ip_addr = ip_addr.parse().unwrap(); + + let check_task = async { + loop { + let socket = TcpSocket::new_v4().unwrap(); + match socket.connect(ip_addr).await { + Ok(mut stream) => { + let _ = stream.shutdown().await; + break; + } + Err(_) => tokio::time::sleep(PORT_CHECK_INTERVAL).await, + } + } + }; + + tokio::time::timeout(timeout, check_task).await.is_ok() +} diff --git a/tests-compatibility/Cargo.toml b/tests-compatibility/Cargo.toml index 3dc9a9291595..b98ad6537f2f 100644 --- a/tests-compatibility/Cargo.toml +++ b/tests-compatibility/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] - -[dev-dependencies] -tinytemplate = "1.2" +client.workspace = true +common-telemetry.workspace = true +common-test-util.workspace = true +tokio.workspace = true diff --git a/tests-compatibility/README.md b/tests-compatibility/README.md new file mode 100644 index 000000000000..8e4a6ff7d1bc --- /dev/null +++ b/tests-compatibility/README.md @@ -0,0 +1,8 @@ +# When compatibility tests failed: + +- Start an old version (or built from old source codes) GreptimeDB with its data home and WAL set to directory `tests-compatibility/data_home` and `tests-compatibility/data_home/wal/` respectively. +- Using our CLI tool to [export](https://docs.greptime.com/user-guide/upgrade) the data. +- Clear everything under `tests-compatibility/data_home/`. +- Build a new GreptimeDB with your changes with the same data home and WAL dir config. +- Using our CLI tool again to [import](https://docs.greptime.com/user-guide/upgrade) the data. +- Rerun the compatibility test. If it passes, commit the data files. diff --git a/tests-compatibility/src/lib.rs b/tests-compatibility/src/lib.rs deleted file mode 100644 index 59f3388c4861..000000000000 --- a/tests-compatibility/src/lib.rs +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. diff --git a/tests-compatibility/src/main.rs b/tests-compatibility/src/main.rs new file mode 100644 index 000000000000..46c0f8d2e2f9 --- /dev/null +++ b/tests-compatibility/src/main.rs @@ -0,0 +1,172 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io; +use std::io::Write; +use std::path::PathBuf; +use std::process::{Child, Command}; +use std::time::Duration; + +use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_telemetry::{error, info}; +use common_test_util::config::{generate_config_file, ConfigTemplate, ConfigValues}; +use common_test_util::display::ResultDisplayer; +use common_test_util::find_workspace_path; +use common_test_util::ports::check_connectable; + +const GRPC_ADDR: &str = "127.0.0.1:24001"; + +#[tokio::main] +async fn main() { + common_telemetry::init_default_ut_logging(); + + info!("Start to build GreptimeDB ..."); + let target_dir = build_greptimedb(); + info!( + "Build GreptimeDB success, target dir: {}", + target_dir.display() + ); + + let mut process = start_standalone(target_dir); + info!( + "GreptimeDB standalone is spawned with process id {}", + process.id() + ); + + verify_data().await; + + process.kill().unwrap(); +} + +/// Verify the data that reside in the files are correct. +async fn verify_data() { + if !check_connectable(GRPC_ADDR, Duration::from_secs(10)).await { + panic!("GreptimeDB standalone is not ready within 10 seconds, quit!"); + } + + let client = Client::with_urls(vec![GRPC_ADDR]); + let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + + let result = db.sql("show tables").await; + let expect = "\ ++---------+ +| Tables | ++---------+ +| monitor | +| numbers | ++---------+\ +"; + assert_eq!(ResultDisplayer(result).display().await, expect); + + let result = db.sql("select count(*) from monitor").await; + let expect = "\ ++----------+ +| COUNT(*) | ++----------+ +| 20002 | ++----------+\ +"; + assert_eq!(ResultDisplayer(result).display().await, expect); + + let result = db + .sql("select ts, host, cpu, memory from monitor order by ts, host limit 10") + .await; + let expect = "\ ++-------------------------+-----------+-----+--------+ +| ts | host | cpu | memory | ++-------------------------+-----------+-----+--------+ +| 2023-12-26T07:46:31.287 | 127.0.0.1 | 0.0 | | +| 2023-12-26T07:46:31.287 | 127.0.0.2 | 0.3 | 0.5 | +| 2023-12-26T07:46:31.288 | 127.0.0.1 | 1.0 | | +| 2023-12-26T07:46:31.289 | 127.0.0.1 | 2.0 | | +| 2023-12-26T07:46:31.290 | 127.0.0.1 | 3.0 | | +| 2023-12-26T07:46:31.291 | 127.0.0.1 | 4.0 | | +| 2023-12-26T07:46:31.292 | 127.0.0.1 | 5.0 | | +| 2023-12-26T07:46:31.293 | 127.0.0.1 | 6.0 | | +| 2023-12-26T07:46:31.294 | 127.0.0.1 | 7.0 | | +| 2023-12-26T07:46:31.295 | 127.0.0.1 | 8.0 | | ++-------------------------+-----------+-----+--------+\ +"; + assert_eq!(ResultDisplayer(result).display().await, expect); +} + +/// Start the just built GreptimeDB standalone, set it's data home and WAL path to an existing directory. +/// The directory contains a whole dataset that is generated by an old version of GreptimeDB. +/// If the current GreptimeDB starts successfully, it means we have data files compatibility. +fn start_standalone(target_dir: PathBuf) -> Child { + #[cfg(not(windows))] + let program = "./greptime"; + #[cfg(windows)] + let program = "greptime.exe"; + + let config_values = ConfigValues { + wal_dir: find_workspace_path("/tests-compatibility/data_home/wal/") + .display() + .to_string(), + data_home: find_workspace_path("/tests-compatibility/data_home/") + .display() + .to_string(), + is_raft_engine: true, + grpc_addr: GRPC_ADDR.to_string(), + ..Default::default() + }; + let config_file = target_dir + .join(generate_config_file( + ConfigTemplate::Standalone, + config_values, + &target_dir, + )) + .display() + .to_string(); + + let args = vec![ + "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info", + "standalone", + "start", + "-c", + config_file.as_str(), + ]; + + Command::new(program) + .current_dir(target_dir) + .env("TZ", "UTC") + .args(args) + .spawn() + .unwrap() +} + +/// Build the GreptimeDB with **current** source codes, returns the directory path to the binary. +fn build_greptimedb() -> PathBuf { + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + + // cd to workspace root + path.pop(); + + let output = Command::new("cargo") + .current_dir(&path) + .args(["build", "--bin", "greptime"]) + .output() + .unwrap(); + + if !output.status.success() { + error!("Failed to build GreptimeDB, {}", output.status); + io::stdout().write_all(&output.stdout).unwrap(); + io::stderr().write_all(&output.stderr).unwrap(); + panic!(); + } + + path.push("target"); + path.push("debug"); + path +} diff --git a/tests-compatibility/tests/compatibility.rs b/tests-compatibility/tests/compatibility.rs deleted file mode 100644 index 59f3388c4861..000000000000 --- a/tests-compatibility/tests/compatibility.rs +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml index b2757f479dd6..ad8e17143087 100644 --- a/tests/runner/Cargo.toml +++ b/tests/runner/Cargo.toml @@ -13,9 +13,9 @@ common-error.workspace = true common-grpc.workspace = true common-query.workspace = true common-recordbatch.workspace = true +common-test-util.workspace = true common-time.workspace = true serde.workspace = true serde_json.workspace = true sqlness = { version = "0.5" } -tinytemplate = "1.2" tokio.workspace = true diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 76946ad6ba60..c48292ae59f3 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -18,21 +18,21 @@ use std::io; use std::io::Write; use std::path::{Path, PathBuf}; use std::process::{Child, Command}; +use std::str::FromStr; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; use async_trait::async_trait; use client::error::ServerSnafu; -use client::{ - Client, Database as DB, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, -}; +use client::{Client, Database as DB, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::ErrorExt; use common_query::Output; use common_recordbatch::RecordBatches; -use serde::Serialize; +use common_test_util::config::{ConfigTemplate, ConfigValues}; +use common_test_util::display::ResultDisplayer; +use common_test_util::ports::check_connectable; use sqlness::{Database, EnvController, QueryContext}; -use tinytemplate::TinyTemplate; use tokio::sync::Mutex as TokioMutex; use crate::util; @@ -232,7 +232,7 @@ impl Env { _ => panic!("Unexpected subcommand: {subcommand}"), }; - if util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(1)).await { + if check_connectable(&check_ip_addr, Duration::from_secs(1)).await { panic!( "Port {check_ip_addr} is already in use, please check and retry.", check_ip_addr = check_ip_addr @@ -254,7 +254,7 @@ impl Env { panic!("Failed to start the DB with subcommand {subcommand},Error: {error}") }); - if !util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(10)).await { + if !check_connectable(&check_ip_addr, Duration::from_secs(10)).await { Env::stop_server(&mut process); panic!("{subcommand} doesn't up in 10 seconds, quit.") } @@ -319,23 +319,7 @@ impl Env { /// Generate config file to `/tmp/{subcommand}-{current_time}.toml` fn generate_config_file(&self, subcommand: &str, db_ctx: &GreptimeDBContext) -> String { - let mut tt = TinyTemplate::new(); - - let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - path.pop(); - path.push("conf"); - path.push(format!("{subcommand}-test.toml.template")); - let template = std::fs::read_to_string(path).unwrap(); - tt.add_template(subcommand, &template).unwrap(); - - #[derive(Serialize)] - struct Context { - wal_dir: String, - data_home: String, - procedure_dir: String, - is_raft_engine: bool, - kafka_wal_broker_endpoints: String, - } + let config_template = ConfigTemplate::from_str(subcommand).unwrap(); let data_home = self .data_home @@ -344,23 +328,23 @@ impl Env { let wal_dir = data_home.join("wal").display().to_string(); let procedure_dir = data_home.join("procedure").display().to_string(); - let ctx = Context { + + let config_values = ConfigValues { wal_dir, data_home: data_home.display().to_string(), procedure_dir, is_raft_engine: db_ctx.is_raft_engine(), kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(), + grpc_addr: SERVER_ADDR.to_string(), }; - let rendered = tt.render(subcommand, &ctx).unwrap(); - let conf_file = data_home - .join(format!("{subcommand}-{}.toml", db_ctx.time)) - .display() - .to_string(); - println!("Generating {subcommand} config file in {conf_file}, full content:\n{rendered}"); - std::fs::write(&conf_file, rendered).unwrap(); + let conf_file = common_test_util::config::generate_config_file( + config_template, + config_values, + &data_home, + ); - conf_file + data_home.join(conf_file).display().to_string() } /// Build the DB with `cargo build --bin greptime` @@ -408,9 +392,7 @@ impl Database for GreptimeDB { .expect("Illegal `USE` statement: expecting a database.") .trim_end_matches(';'); client.set_schema(database); - Box::new(ResultDisplayer { - result: Ok(Output::AffectedRows(0)), - }) as _ + Box::new(ResultDisplayer(Ok(Output::AffectedRows(0)))) as _ } else { let mut result = client.sql(&query).await; if let Ok(Output::Stream(stream)) = result { @@ -427,7 +409,7 @@ impl Database for GreptimeDB { } } } - Box::new(ResultDisplayer { result }) as _ + Box::new(ResultDisplayer(result)) as _ } } } @@ -500,38 +482,3 @@ impl GreptimeDBContext { self.datanode_id.store(0, Ordering::Relaxed); } } - -struct ResultDisplayer { - result: Result, -} - -impl Display for ResultDisplayer { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match &self.result { - Ok(result) => match result { - Output::AffectedRows(rows) => { - write!(f, "Affected Rows: {rows}") - } - Output::RecordBatches(recordbatches) => { - let pretty = recordbatches.pretty_print().map_err(|e| e.to_string()); - match pretty { - Ok(s) => write!(f, "{s}"), - Err(e) => { - write!(f, "Failed to pretty format {recordbatches:?}, error: {e}") - } - } - } - Output::Stream(_) => unreachable!(), - }, - Err(e) => { - let status_code = e.status_code(); - let root_cause = e.output_msg(); - write!( - f, - "Error: {}({status_code}), {root_cause}", - status_code as u32 - ) - } - } - } -} diff --git a/tests/runner/src/util.rs b/tests/runner/src/util.rs index c51b81db25bb..bb85ecb7892e 100644 --- a/tests/runner/src/util.rs +++ b/tests/runner/src/util.rs @@ -13,16 +13,8 @@ // limitations under the License. use std::fmt::Display; -use std::net::SocketAddr; use std::path::PathBuf; -use std::time::Duration; -use tokio::io::AsyncWriteExt; -use tokio::net::TcpSocket; -use tokio::time; - -/// Check port every 0.1 second. -const PORT_CHECK_INTERVAL: Duration = Duration::from_millis(100); const NULL_DATA_PLACEHOLDER: &str = "NULL"; /// Helper struct for iterate over column with null_mask @@ -100,22 +92,3 @@ pub fn get_binary_dir(mode: &str) -> String { workspace_root.into_os_string().into_string().unwrap() } - -/// Spin-waiting a socket address is available, or timeout. -/// Returns whether the addr is up. -pub async fn check_port(ip_addr: SocketAddr, timeout: Duration) -> bool { - let check_task = async { - loop { - let socket = TcpSocket::new_v4().expect("Cannot create v4 socket"); - match socket.connect(ip_addr).await { - Ok(mut stream) => { - let _ = stream.shutdown().await; - break; - } - Err(_) => time::sleep(PORT_CHECK_INTERVAL).await, - } - } - }; - - tokio::time::timeout(timeout, check_task).await.is_ok() -}