Skip to content

Commit

Permalink
refactor: add 'connect_db()'
Browse files Browse the repository at this point in the history
  • Loading branch information
zyy17 committed Nov 6, 2023
1 parent 53b0c0b commit b1766a5
Showing 1 changed file with 56 additions and 69 deletions.
125 changes: 56 additions & 69 deletions tests/runner/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,46 +54,8 @@ impl EnvController for Env {

async fn start(&self, mode: &str, _config: Option<&Path>) -> Self::DB {
match mode {
"standalone" => {
if let Some(server_addr) = self.server_addr.clone() {
let client = Client::with_urls(vec![server_addr.clone()]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
GreptimeDB {
client: TokioMutex::new(db),
server_processes: None,
metasrv_process: None,
frontend_process: None,
ctx: GreptimeDBContext {
time: 0,
datanode_id: Default::default(),
},
is_standalone: false,
env: self.clone(),
}
} else {
self.start_standalone().await
}
}
"distributed" => {
if let Some(server_addr) = self.server_addr.clone() {
let client = Client::with_urls(vec![server_addr.clone()]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
GreptimeDB {
client: TokioMutex::new(db),
server_processes: None,
metasrv_process: None,
frontend_process: None,
ctx: GreptimeDBContext {
time: 0,
datanode_id: Default::default(),
},
is_standalone: false,
env: self.clone(),
}
} else {
self.start_distributed().await
}
}
"standalone" => self.start_standalone().await,
"distributed" => self.start_distributed().await,
_ => panic!("Unexpected mode: {mode}"),
}
}
Expand All @@ -114,51 +76,76 @@ impl Env {
}

async fn start_standalone(&self) -> GreptimeDB {
Self::build_db().await;
if let Some(server_addr) = self.server_addr.clone() {
self.connect_db(&server_addr)
} else {
Self::build_db().await;

let db_ctx = GreptimeDBContext::new();
let db_ctx = GreptimeDBContext::new();

let server_process = self.start_server("standalone", &db_ctx, true).await;
let server_process = self.start_server("standalone", &db_ctx, true).await;

let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);

GreptimeDB {
server_processes: Some(Arc::new(Mutex::new(vec![server_process]))),
metasrv_process: None,
frontend_process: None,
client: TokioMutex::new(db),
ctx: db_ctx,
is_standalone: true,
env: self.clone(),
GreptimeDB {
server_processes: Some(Arc::new(Mutex::new(vec![server_process]))),
metasrv_process: None,
frontend_process: None,
client: TokioMutex::new(db),
ctx: db_ctx,
is_standalone: true,
env: self.clone(),
}
}
}

async fn start_distributed(&self) -> GreptimeDB {
Self::build_db().await;
if let Some(server_addr) = self.server_addr.clone() {
self.connect_db(&server_addr)
} else {
Self::build_db().await;

let db_ctx = GreptimeDBContext::new();
let db_ctx = GreptimeDBContext::new();

// start a distributed GreptimeDB
let meta_server = self.start_server("metasrv", &db_ctx, true).await;
// start a distributed GreptimeDB
let meta_server = self.start_server("metasrv", &db_ctx, true).await;

let datanode_1 = self.start_server("datanode", &db_ctx, true).await;
let datanode_2 = self.start_server("datanode", &db_ctx, true).await;
let datanode_3 = self.start_server("datanode", &db_ctx, true).await;
let datanode_1 = self.start_server("datanode", &db_ctx, true).await;
let datanode_2 = self.start_server("datanode", &db_ctx, true).await;
let datanode_3 = self.start_server("datanode", &db_ctx, true).await;

let frontend = self.start_server("frontend", &db_ctx, true).await;
let frontend = self.start_server("frontend", &db_ctx, true).await;

let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);

GreptimeDB {
server_processes: Some(Arc::new(Mutex::new(vec![
datanode_1, datanode_2, datanode_3,
]))),
metasrv_process: Some(meta_server),
frontend_process: Some(frontend),
client: TokioMutex::new(db),
ctx: db_ctx,
is_standalone: false,
env: self.clone(),
}
}
}

fn connect_db(&self, server_addr: &str) -> GreptimeDB {
let client = Client::with_urls(vec![server_addr.to_owned()]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
GreptimeDB {
server_processes: Some(Arc::new(Mutex::new(vec![
datanode_1, datanode_2, datanode_3,
]))),
metasrv_process: Some(meta_server),
frontend_process: Some(frontend),
client: TokioMutex::new(db),
ctx: db_ctx,
server_processes: None,
metasrv_process: None,
frontend_process: None,
ctx: GreptimeDBContext {
time: 0,
datanode_id: Default::default(),
},
is_standalone: false,
env: self.clone(),
}
Expand Down

0 comments on commit b1766a5

Please sign in to comment.