diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 498e08ac7795..b5218979821a 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -41,8 +41,10 @@ const METASRV_ADDR: &str = "127.0.0.1:3002"; const SERVER_ADDR: &str = "127.0.0.1:4001"; const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info"; +#[derive(Clone)] pub struct Env { data_home: PathBuf, + server_addr: Option, } #[allow(clippy::print_stdout)] @@ -66,56 +68,86 @@ impl EnvController for Env { #[allow(clippy::print_stdout)] impl Env { - pub fn new(data_home: PathBuf) -> Self { - Self { data_home } + pub fn new(data_home: PathBuf, server_addr: Option) -> Self { + Self { + data_home, + server_addr, + } } async fn start_standalone(&self) -> GreptimeDB { - Self::build_db().await; + if let Some(server_addr) = self.server_addr.clone() { + self.connect_db(&server_addr) + } else { + Self::build_db().await; - let db_ctx = GreptimeDBContext::new(); + let db_ctx = GreptimeDBContext::new(); - let server_process = self.start_server("standalone", &db_ctx, true).await; + let server_process = self.start_server("standalone", &db_ctx, true).await; - let client = Client::with_urls(vec![SERVER_ADDR]); - let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + let client = Client::with_urls(vec![SERVER_ADDR]); + let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - GreptimeDB { - server_processes: Arc::new(Mutex::new(vec![server_process])), - metasrv_process: None, - frontend_process: None, - client: TokioMutex::new(db), - ctx: db_ctx, - is_standalone: true, - env: Env::new(self.data_home.clone()), + GreptimeDB { + server_processes: Some(Arc::new(Mutex::new(vec![server_process]))), + metasrv_process: None, + frontend_process: None, + client: TokioMutex::new(db), + ctx: db_ctx, + is_standalone: true, + env: self.clone(), + } } } async fn start_distributed(&self) -> GreptimeDB { - Self::build_db().await; + if let Some(server_addr) = self.server_addr.clone() { + self.connect_db(&server_addr) + } else { + Self::build_db().await; - let db_ctx = GreptimeDBContext::new(); + let db_ctx = GreptimeDBContext::new(); - // start a distributed GreptimeDB - let meta_server = self.start_server("metasrv", &db_ctx, true).await; + // start a distributed GreptimeDB + let meta_server = self.start_server("metasrv", &db_ctx, true).await; - let datanode_1 = self.start_server("datanode", &db_ctx, true).await; - let datanode_2 = self.start_server("datanode", &db_ctx, true).await; - let datanode_3 = self.start_server("datanode", &db_ctx, true).await; + let datanode_1 = self.start_server("datanode", &db_ctx, true).await; + let datanode_2 = self.start_server("datanode", &db_ctx, true).await; + let datanode_3 = self.start_server("datanode", &db_ctx, true).await; - let frontend = self.start_server("frontend", &db_ctx, true).await; + let frontend = self.start_server("frontend", &db_ctx, true).await; - let client = Client::with_urls(vec![SERVER_ADDR]); - let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + let client = Client::with_urls(vec![SERVER_ADDR]); + let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + GreptimeDB { + server_processes: Some(Arc::new(Mutex::new(vec![ + datanode_1, datanode_2, datanode_3, + ]))), + metasrv_process: Some(meta_server), + frontend_process: Some(frontend), + client: TokioMutex::new(db), + ctx: db_ctx, + is_standalone: false, + env: self.clone(), + } + } + } + + fn connect_db(&self, server_addr: &str) -> GreptimeDB { + let client = Client::with_urls(vec![server_addr.to_owned()]); + let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); GreptimeDB { - server_processes: Arc::new(Mutex::new(vec![datanode_1, datanode_2, datanode_3])), - metasrv_process: Some(meta_server), - frontend_process: Some(frontend), client: TokioMutex::new(db), - ctx: db_ctx, + server_processes: None, + metasrv_process: None, + frontend_process: None, + ctx: GreptimeDBContext { + time: 0, + datanode_id: Default::default(), + }, is_standalone: false, - env: Env::new(self.data_home.clone()), + env: self.clone(), } } @@ -244,9 +276,11 @@ impl Env { /// stop and restart the server process async fn restart_server(&self, db: &GreptimeDB) { { - let mut server_processes = db.server_processes.lock().unwrap(); - for server_process in server_processes.iter_mut() { - Env::stop_server(server_process); + if let Some(server_process) = db.server_processes.clone() { + let mut server_processes = server_process.lock().unwrap(); + for server_process in server_processes.iter_mut() { + Env::stop_server(server_process); + } } } @@ -265,8 +299,10 @@ impl Env { processes }; - let mut server_processes = db.server_processes.lock().unwrap(); - *server_processes = new_server_processes; + if let Some(server_process) = db.server_processes.clone() { + let mut server_processes = server_process.lock().unwrap(); + *server_processes = new_server_processes; + } } /// Generate config file to `/tmp/{subcommand}-{current_time}.toml` @@ -332,7 +368,7 @@ impl Env { } pub struct GreptimeDB { - server_processes: Arc>>, + server_processes: Option>>>, metasrv_process: Option, frontend_process: Option, client: TokioMutex, @@ -344,7 +380,7 @@ pub struct GreptimeDB { #[async_trait] impl Database for GreptimeDB { async fn query(&self, ctx: QueryContext, query: String) -> Box { - if ctx.context.contains_key("restart") { + if ctx.context.contains_key("restart") && self.env.server_addr.is_none() { self.env.restart_server(self).await; } @@ -383,9 +419,11 @@ impl Database for GreptimeDB { impl GreptimeDB { #![allow(clippy::print_stdout)] fn stop(&mut self) { - let mut servers = self.server_processes.lock().unwrap(); - for server in servers.iter_mut() { - Env::stop_server(server); + if let Some(server_processes) = self.server_processes.clone() { + let mut server_processes = server_processes.lock().unwrap(); + for server_process in server_processes.iter_mut() { + Env::stop_server(server_process); + } } if let Some(mut metasrv) = self.metasrv_process.take() { Env::stop_server(&mut metasrv); @@ -399,7 +437,9 @@ impl GreptimeDB { impl Drop for GreptimeDB { fn drop(&mut self) { - self.stop(); + if self.env.server_addr.is_none() { + self.stop(); + } } } diff --git a/tests/runner/src/main.rs b/tests/runner/src/main.rs index 8cf02d53668b..9c93e628a484 100644 --- a/tests/runner/src/main.rs +++ b/tests/runner/src/main.rs @@ -40,6 +40,10 @@ struct Args { /// Name of test cases to run. Accept as a regexp. #[clap(short, long, default_value = ".*")] test_filter: String, + + /// Address of the server + #[clap(short, long)] + server_addr: Option, } #[tokio::main] @@ -59,6 +63,6 @@ async fn main() { .env_config_file(args.env_config_file) .build() .unwrap(); - let runner = Runner::new(config, Env::new(data_home)); + let runner = Runner::new(config, Env::new(data_home, args.server_addr)); runner.run().await.unwrap(); }