From 2aba4114808b6a77edad9df6650d76e9d5838636 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Mon, 6 Nov 2023 17:29:01 +0800 Subject: [PATCH] refactor: add 'connect_db()' --- tests/runner/src/env.rs | 125 ++++++++++++++++++---------------------- 1 file changed, 56 insertions(+), 69 deletions(-) diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 524730f8118c..732a7c13c87c 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -54,46 +54,8 @@ impl EnvController for Env { async fn start(&self, mode: &str, _config: Option<&Path>) -> Self::DB { match mode { - "standalone" => { - if let Some(server_addr) = self.server_addr.clone() { - let client = Client::with_urls(vec![server_addr.clone()]); - let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - GreptimeDB { - client: TokioMutex::new(db), - server_processes: None, - metasrv_process: None, - frontend_process: None, - ctx: GreptimeDBContext { - time: 0, - datanode_id: Default::default(), - }, - is_standalone: false, - env: self.clone(), - } - } else { - self.start_standalone().await - } - } - "distributed" => { - if let Some(server_addr) = self.server_addr.clone() { - let client = Client::with_urls(vec![server_addr.clone()]); - let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - GreptimeDB { - client: TokioMutex::new(db), - server_processes: None, - metasrv_process: None, - frontend_process: None, - ctx: GreptimeDBContext { - time: 0, - datanode_id: Default::default(), - }, - is_standalone: false, - env: self.clone(), - } - } else { - self.start_distributed().await - } - } + "standalone" => self.start_standalone().await, + "distributed" => self.start_distributed().await, _ => panic!("Unexpected mode: {mode}"), } } @@ -114,51 +76,76 @@ impl Env { } async fn start_standalone(&self) -> GreptimeDB { - Self::build_db().await; + if let Some(server_addr) = self.server_addr.clone() { + return self.connect_db(&server_addr); + } else { + Self::build_db().await; - let db_ctx = GreptimeDBContext::new(); + let db_ctx = GreptimeDBContext::new(); - let server_process = self.start_server("standalone", &db_ctx, true).await; + let server_process = self.start_server("standalone", &db_ctx, true).await; - let client = Client::with_urls(vec![SERVER_ADDR]); - let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + let client = Client::with_urls(vec![SERVER_ADDR]); + let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - GreptimeDB { - server_processes: Some(Arc::new(Mutex::new(vec![server_process]))), - metasrv_process: None, - frontend_process: None, - client: TokioMutex::new(db), - ctx: db_ctx, - is_standalone: true, - env: self.clone(), + GreptimeDB { + server_processes: Some(Arc::new(Mutex::new(vec![server_process]))), + metasrv_process: None, + frontend_process: None, + client: TokioMutex::new(db), + ctx: db_ctx, + is_standalone: true, + env: self.clone(), + } } } async fn start_distributed(&self) -> GreptimeDB { - Self::build_db().await; + if let Some(server_addr) = self.server_addr.clone() { + return self.connect_db(&server_addr); + } else { + Self::build_db().await; - let db_ctx = GreptimeDBContext::new(); + let db_ctx = GreptimeDBContext::new(); - // start a distributed GreptimeDB - let meta_server = self.start_server("metasrv", &db_ctx, true).await; + // start a distributed GreptimeDB + let meta_server = self.start_server("metasrv", &db_ctx, true).await; - let datanode_1 = self.start_server("datanode", &db_ctx, true).await; - let datanode_2 = self.start_server("datanode", &db_ctx, true).await; - let datanode_3 = self.start_server("datanode", &db_ctx, true).await; + let datanode_1 = self.start_server("datanode", &db_ctx, true).await; + let datanode_2 = self.start_server("datanode", &db_ctx, true).await; + let datanode_3 = self.start_server("datanode", &db_ctx, true).await; - let frontend = self.start_server("frontend", &db_ctx, true).await; + let frontend = self.start_server("frontend", &db_ctx, true).await; - let client = Client::with_urls(vec![SERVER_ADDR]); - let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + let client = Client::with_urls(vec![SERVER_ADDR]); + let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + + GreptimeDB { + server_processes: Some(Arc::new(Mutex::new(vec![ + datanode_1, datanode_2, datanode_3, + ]))), + metasrv_process: Some(meta_server), + frontend_process: Some(frontend), + client: TokioMutex::new(db), + ctx: db_ctx, + is_standalone: false, + env: self.clone(), + } + } + } + fn connect_db(&self, server_addr: &String) -> GreptimeDB { + let client = Client::with_urls(vec![server_addr.clone()]); + let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); GreptimeDB { - server_processes: Some(Arc::new(Mutex::new(vec![ - datanode_1, datanode_2, datanode_3, - ]))), - metasrv_process: Some(meta_server), - frontend_process: Some(frontend), client: TokioMutex::new(db), - ctx: db_ctx, + server_processes: None, + metasrv_process: None, + frontend_process: None, + ctx: GreptimeDBContext { + time: 0, + datanode_id: Default::default(), + }, is_standalone: false, env: self.clone(), }