From 9456047be34bb3aa0cc6b5827e17f81e9291dad9 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Fri, 3 Nov 2023 17:05:30 +0800 Subject: [PATCH 1/3] feat: add '--server-addr' and '--mode' in sqlness runner --- tests/runner/src/env.rs | 103 +++++++++++++++++++++++++++++++-------- tests/runner/src/main.rs | 10 +++- 2 files changed, 93 insertions(+), 20 deletions(-) diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 498e08ac7795..2b2efca991b1 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -41,8 +41,11 @@ const METASRV_ADDR: &str = "127.0.0.1:3002"; const SERVER_ADDR: &str = "127.0.0.1:4001"; const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info"; +#[derive(Clone)] pub struct Env { data_home: PathBuf, + server_addr: Option, + specified_mode: String, } #[allow(clippy::print_stdout)] @@ -52,8 +55,56 @@ impl EnvController for Env { async fn start(&self, mode: &str, _config: Option<&Path>) -> Self::DB { match mode { - "standalone" => self.start_standalone().await, - "distributed" => self.start_distributed().await, + "standalone" => { + if self.specified_mode == "all" || self.specified_mode == "standalone" { + if let Some(server_addr) = self.server_addr.clone() { + let client = Client::with_urls(vec![server_addr.clone()]); + let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + GreptimeDB { + client: TokioMutex::new(db), + server_processes: None, + metasrv_process: None, + frontend_process: None, + ctx: GreptimeDBContext { + time: 0, + datanode_id: Default::default(), + }, + is_standalone: false, + env: self.clone(), + } + } else { + self.start_standalone().await + } + } else { + // Exit and do nothing. + std::process::exit(0); + } + } + "distributed" => { + if self.specified_mode == "all" || self.specified_mode == "distributed" { + if let Some(server_addr) = self.server_addr.clone() { + let client = Client::with_urls(vec![server_addr.clone()]); + let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + GreptimeDB { + client: TokioMutex::new(db), + server_processes: None, + metasrv_process: None, + frontend_process: None, + ctx: GreptimeDBContext { + time: 0, + datanode_id: Default::default(), + }, + is_standalone: false, + env: self.clone(), + } + } else { + self.start_distributed().await + } + } else { + // Exit and do nothing. + std::process::exit(0); + } + } _ => panic!("Unexpected mode: {mode}"), } } @@ -66,8 +117,12 @@ impl EnvController for Env { #[allow(clippy::print_stdout)] impl Env { - pub fn new(data_home: PathBuf) -> Self { - Self { data_home } + pub fn new(data_home: PathBuf, server_addr: Option, specified_mode: String) -> Self { + Self { + data_home, + server_addr, + specified_mode, + } } async fn start_standalone(&self) -> GreptimeDB { @@ -81,13 +136,13 @@ impl Env { let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); GreptimeDB { - server_processes: Arc::new(Mutex::new(vec![server_process])), + server_processes: Some(Arc::new(Mutex::new(vec![server_process]))), metasrv_process: None, frontend_process: None, client: TokioMutex::new(db), ctx: db_ctx, is_standalone: true, - env: Env::new(self.data_home.clone()), + env: self.clone(), } } @@ -109,13 +164,15 @@ impl Env { let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); GreptimeDB { - server_processes: Arc::new(Mutex::new(vec![datanode_1, datanode_2, datanode_3])), + server_processes: Some(Arc::new(Mutex::new(vec![ + datanode_1, datanode_2, datanode_3, + ]))), metasrv_process: Some(meta_server), frontend_process: Some(frontend), client: TokioMutex::new(db), ctx: db_ctx, is_standalone: false, - env: Env::new(self.data_home.clone()), + env: self.clone(), } } @@ -244,9 +301,11 @@ impl Env { /// stop and restart the server process async fn restart_server(&self, db: &GreptimeDB) { { - let mut server_processes = db.server_processes.lock().unwrap(); - for server_process in server_processes.iter_mut() { - Env::stop_server(server_process); + if let Some(server_process) = db.server_processes.clone() { + let mut server_processes = server_process.lock().unwrap(); + for server_process in server_processes.iter_mut() { + Env::stop_server(server_process); + } } } @@ -265,8 +324,10 @@ impl Env { processes }; - let mut server_processes = db.server_processes.lock().unwrap(); - *server_processes = new_server_processes; + if let Some(server_process) = db.server_processes.clone() { + let mut server_processes = server_process.lock().unwrap(); + *server_processes = new_server_processes; + } } /// Generate config file to `/tmp/{subcommand}-{current_time}.toml` @@ -332,7 +393,7 @@ impl Env { } pub struct GreptimeDB { - server_processes: Arc>>, + server_processes: Option>>>, metasrv_process: Option, frontend_process: Option, client: TokioMutex, @@ -344,7 +405,7 @@ pub struct GreptimeDB { #[async_trait] impl Database for GreptimeDB { async fn query(&self, ctx: QueryContext, query: String) -> Box { - if ctx.context.contains_key("restart") { + if ctx.context.contains_key("restart") && self.env.server_addr.is_none() { self.env.restart_server(self).await; } @@ -383,9 +444,11 @@ impl Database for GreptimeDB { impl GreptimeDB { #![allow(clippy::print_stdout)] fn stop(&mut self) { - let mut servers = self.server_processes.lock().unwrap(); - for server in servers.iter_mut() { - Env::stop_server(server); + if let Some(server_processes) = self.server_processes.clone() { + let mut server_processes = server_processes.lock().unwrap(); + for server_process in server_processes.iter_mut() { + Env::stop_server(server_process); + } } if let Some(mut metasrv) = self.metasrv_process.take() { Env::stop_server(&mut metasrv); @@ -399,7 +462,9 @@ impl GreptimeDB { impl Drop for GreptimeDB { fn drop(&mut self) { - self.stop(); + if self.env.server_addr.is_none() { + self.stop(); + } } } diff --git a/tests/runner/src/main.rs b/tests/runner/src/main.rs index 8cf02d53668b..b8e6a97541f2 100644 --- a/tests/runner/src/main.rs +++ b/tests/runner/src/main.rs @@ -40,6 +40,14 @@ struct Args { /// Name of test cases to run. Accept as a regexp. #[clap(short, long, default_value = ".*")] test_filter: String, + + /// Address of the server + #[clap(short, long)] + server_addr: Option, + + /// Test mode, can be 'all', 'standalone' and 'distributed'. + #[clap(short, long, default_value = "all")] + mode: String, } #[tokio::main] @@ -59,6 +67,6 @@ async fn main() { .env_config_file(args.env_config_file) .build() .unwrap(); - let runner = Runner::new(config, Env::new(data_home)); + let runner = Runner::new(config, Env::new(data_home, args.server_addr, args.mode)); runner.run().await.unwrap(); } From 53b0c0bf08767e5c441544fead93e91c9c1b38ea Mon Sep 17 00:00:00 2001 From: zyy17 Date: Sun, 5 Nov 2023 20:02:54 +0800 Subject: [PATCH 2/3] chore: remove '--mode' --- tests/runner/src/env.rs | 74 +++++++++++++++++----------------------- tests/runner/src/main.rs | 6 +--- 2 files changed, 32 insertions(+), 48 deletions(-) diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 2b2efca991b1..524730f8118c 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -45,7 +45,6 @@ const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafus pub struct Env { data_home: PathBuf, server_addr: Option, - specified_mode: String, } #[allow(clippy::print_stdout)] @@ -56,53 +55,43 @@ impl EnvController for Env { async fn start(&self, mode: &str, _config: Option<&Path>) -> Self::DB { match mode { "standalone" => { - if self.specified_mode == "all" || self.specified_mode == "standalone" { - if let Some(server_addr) = self.server_addr.clone() { - let client = Client::with_urls(vec![server_addr.clone()]); - let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - GreptimeDB { - client: TokioMutex::new(db), - server_processes: None, - metasrv_process: None, - frontend_process: None, - ctx: GreptimeDBContext { - time: 0, - datanode_id: Default::default(), - }, - is_standalone: false, - env: self.clone(), - } - } else { - self.start_standalone().await + if let Some(server_addr) = self.server_addr.clone() { + let client = Client::with_urls(vec![server_addr.clone()]); + let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + GreptimeDB { + client: TokioMutex::new(db), + server_processes: None, + metasrv_process: None, + frontend_process: None, + ctx: GreptimeDBContext { + time: 0, + datanode_id: Default::default(), + }, + is_standalone: false, + env: self.clone(), } } else { - // Exit and do nothing. - std::process::exit(0); + self.start_standalone().await } } "distributed" => { - if self.specified_mode == "all" || self.specified_mode == "distributed" { - if let Some(server_addr) = self.server_addr.clone() { - let client = Client::with_urls(vec![server_addr.clone()]); - let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - GreptimeDB { - client: TokioMutex::new(db), - server_processes: None, - metasrv_process: None, - frontend_process: None, - ctx: GreptimeDBContext { - time: 0, - datanode_id: Default::default(), - }, - is_standalone: false, - env: self.clone(), - } - } else { - self.start_distributed().await + if let Some(server_addr) = self.server_addr.clone() { + let client = Client::with_urls(vec![server_addr.clone()]); + let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + GreptimeDB { + client: TokioMutex::new(db), + server_processes: None, + metasrv_process: None, + frontend_process: None, + ctx: GreptimeDBContext { + time: 0, + datanode_id: Default::default(), + }, + is_standalone: false, + env: self.clone(), } } else { - // Exit and do nothing. - std::process::exit(0); + self.start_distributed().await } } _ => panic!("Unexpected mode: {mode}"), @@ -117,11 +106,10 @@ impl EnvController for Env { #[allow(clippy::print_stdout)] impl Env { - pub fn new(data_home: PathBuf, server_addr: Option, specified_mode: String) -> Self { + pub fn new(data_home: PathBuf, server_addr: Option) -> Self { Self { data_home, server_addr, - specified_mode, } } diff --git a/tests/runner/src/main.rs b/tests/runner/src/main.rs index b8e6a97541f2..9c93e628a484 100644 --- a/tests/runner/src/main.rs +++ b/tests/runner/src/main.rs @@ -44,10 +44,6 @@ struct Args { /// Address of the server #[clap(short, long)] server_addr: Option, - - /// Test mode, can be 'all', 'standalone' and 'distributed'. - #[clap(short, long, default_value = "all")] - mode: String, } #[tokio::main] @@ -67,6 +63,6 @@ async fn main() { .env_config_file(args.env_config_file) .build() .unwrap(); - let runner = Runner::new(config, Env::new(data_home, args.server_addr, args.mode)); + let runner = Runner::new(config, Env::new(data_home, args.server_addr)); runner.run().await.unwrap(); } From b1766a5f1566b286e9c6e7523e31497b31331d98 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Mon, 6 Nov 2023 17:29:01 +0800 Subject: [PATCH 3/3] refactor: add 'connect_db()' --- tests/runner/src/env.rs | 125 ++++++++++++++++++---------------------- 1 file changed, 56 insertions(+), 69 deletions(-) diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 524730f8118c..b5218979821a 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -54,46 +54,8 @@ impl EnvController for Env { async fn start(&self, mode: &str, _config: Option<&Path>) -> Self::DB { match mode { - "standalone" => { - if let Some(server_addr) = self.server_addr.clone() { - let client = Client::with_urls(vec![server_addr.clone()]); - let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - GreptimeDB { - client: TokioMutex::new(db), - server_processes: None, - metasrv_process: None, - frontend_process: None, - ctx: GreptimeDBContext { - time: 0, - datanode_id: Default::default(), - }, - is_standalone: false, - env: self.clone(), - } - } else { - self.start_standalone().await - } - } - "distributed" => { - if let Some(server_addr) = self.server_addr.clone() { - let client = Client::with_urls(vec![server_addr.clone()]); - let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - GreptimeDB { - client: TokioMutex::new(db), - server_processes: None, - metasrv_process: None, - frontend_process: None, - ctx: GreptimeDBContext { - time: 0, - datanode_id: Default::default(), - }, - is_standalone: false, - env: self.clone(), - } - } else { - self.start_distributed().await - } - } + "standalone" => self.start_standalone().await, + "distributed" => self.start_distributed().await, _ => panic!("Unexpected mode: {mode}"), } } @@ -114,51 +76,76 @@ impl Env { } async fn start_standalone(&self) -> GreptimeDB { - Self::build_db().await; + if let Some(server_addr) = self.server_addr.clone() { + self.connect_db(&server_addr) + } else { + Self::build_db().await; - let db_ctx = GreptimeDBContext::new(); + let db_ctx = GreptimeDBContext::new(); - let server_process = self.start_server("standalone", &db_ctx, true).await; + let server_process = self.start_server("standalone", &db_ctx, true).await; - let client = Client::with_urls(vec![SERVER_ADDR]); - let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + let client = Client::with_urls(vec![SERVER_ADDR]); + let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - GreptimeDB { - server_processes: Some(Arc::new(Mutex::new(vec![server_process]))), - metasrv_process: None, - frontend_process: None, - client: TokioMutex::new(db), - ctx: db_ctx, - is_standalone: true, - env: self.clone(), + GreptimeDB { + server_processes: Some(Arc::new(Mutex::new(vec![server_process]))), + metasrv_process: None, + frontend_process: None, + client: TokioMutex::new(db), + ctx: db_ctx, + is_standalone: true, + env: self.clone(), + } } } async fn start_distributed(&self) -> GreptimeDB { - Self::build_db().await; + if let Some(server_addr) = self.server_addr.clone() { + self.connect_db(&server_addr) + } else { + Self::build_db().await; - let db_ctx = GreptimeDBContext::new(); + let db_ctx = GreptimeDBContext::new(); - // start a distributed GreptimeDB - let meta_server = self.start_server("metasrv", &db_ctx, true).await; + // start a distributed GreptimeDB + let meta_server = self.start_server("metasrv", &db_ctx, true).await; - let datanode_1 = self.start_server("datanode", &db_ctx, true).await; - let datanode_2 = self.start_server("datanode", &db_ctx, true).await; - let datanode_3 = self.start_server("datanode", &db_ctx, true).await; + let datanode_1 = self.start_server("datanode", &db_ctx, true).await; + let datanode_2 = self.start_server("datanode", &db_ctx, true).await; + let datanode_3 = self.start_server("datanode", &db_ctx, true).await; - let frontend = self.start_server("frontend", &db_ctx, true).await; + let frontend = self.start_server("frontend", &db_ctx, true).await; - let client = Client::with_urls(vec![SERVER_ADDR]); - let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + let client = Client::with_urls(vec![SERVER_ADDR]); + let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + + GreptimeDB { + server_processes: Some(Arc::new(Mutex::new(vec![ + datanode_1, datanode_2, datanode_3, + ]))), + metasrv_process: Some(meta_server), + frontend_process: Some(frontend), + client: TokioMutex::new(db), + ctx: db_ctx, + is_standalone: false, + env: self.clone(), + } + } + } + fn connect_db(&self, server_addr: &str) -> GreptimeDB { + let client = Client::with_urls(vec![server_addr.to_owned()]); + let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); GreptimeDB { - server_processes: Some(Arc::new(Mutex::new(vec![ - datanode_1, datanode_2, datanode_3, - ]))), - metasrv_process: Some(meta_server), - frontend_process: Some(frontend), client: TokioMutex::new(db), - ctx: db_ctx, + server_processes: None, + metasrv_process: None, + frontend_process: None, + ctx: GreptimeDBContext { + time: 0, + datanode_id: Default::default(), + }, is_standalone: false, env: self.clone(), }