Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add '--server-addr' in sqlness runner #2692

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 81 additions & 41 deletions tests/runner/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ const METASRV_ADDR: &str = "127.0.0.1:3002";
const SERVER_ADDR: &str = "127.0.0.1:4001";
const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";

#[derive(Clone)]
pub struct Env {
data_home: PathBuf,
server_addr: Option<String>,
}

#[allow(clippy::print_stdout)]
Expand All @@ -66,56 +68,86 @@ impl EnvController for Env {

#[allow(clippy::print_stdout)]
impl Env {
pub fn new(data_home: PathBuf) -> Self {
Self { data_home }
pub fn new(data_home: PathBuf, server_addr: Option<String>) -> Self {
Self {
data_home,
server_addr,
}
}

async fn start_standalone(&self) -> GreptimeDB {
Self::build_db().await;
if let Some(server_addr) = self.server_addr.clone() {
self.connect_db(&server_addr)
} else {
Self::build_db().await;

let db_ctx = GreptimeDBContext::new();
let db_ctx = GreptimeDBContext::new();

let server_process = self.start_server("standalone", &db_ctx, true).await;
let server_process = self.start_server("standalone", &db_ctx, true).await;

let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);

GreptimeDB {
server_processes: Arc::new(Mutex::new(vec![server_process])),
metasrv_process: None,
frontend_process: None,
client: TokioMutex::new(db),
ctx: db_ctx,
is_standalone: true,
env: Env::new(self.data_home.clone()),
GreptimeDB {
server_processes: Some(Arc::new(Mutex::new(vec![server_process]))),
metasrv_process: None,
frontend_process: None,
client: TokioMutex::new(db),
ctx: db_ctx,
is_standalone: true,
env: self.clone(),
}
}
}

async fn start_distributed(&self) -> GreptimeDB {
Self::build_db().await;
if let Some(server_addr) = self.server_addr.clone() {
self.connect_db(&server_addr)
} else {
Self::build_db().await;

let db_ctx = GreptimeDBContext::new();
let db_ctx = GreptimeDBContext::new();

// start a distributed GreptimeDB
let meta_server = self.start_server("metasrv", &db_ctx, true).await;
// start a distributed GreptimeDB
let meta_server = self.start_server("metasrv", &db_ctx, true).await;

let datanode_1 = self.start_server("datanode", &db_ctx, true).await;
let datanode_2 = self.start_server("datanode", &db_ctx, true).await;
let datanode_3 = self.start_server("datanode", &db_ctx, true).await;
let datanode_1 = self.start_server("datanode", &db_ctx, true).await;
let datanode_2 = self.start_server("datanode", &db_ctx, true).await;
let datanode_3 = self.start_server("datanode", &db_ctx, true).await;

let frontend = self.start_server("frontend", &db_ctx, true).await;
let frontend = self.start_server("frontend", &db_ctx, true).await;

let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);

GreptimeDB {
server_processes: Some(Arc::new(Mutex::new(vec![
datanode_1, datanode_2, datanode_3,
]))),
metasrv_process: Some(meta_server),
frontend_process: Some(frontend),
client: TokioMutex::new(db),
ctx: db_ctx,
is_standalone: false,
env: self.clone(),
}
}
}

fn connect_db(&self, server_addr: &str) -> GreptimeDB {
let client = Client::with_urls(vec![server_addr.to_owned()]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
GreptimeDB {
server_processes: Arc::new(Mutex::new(vec![datanode_1, datanode_2, datanode_3])),
metasrv_process: Some(meta_server),
frontend_process: Some(frontend),
client: TokioMutex::new(db),
ctx: db_ctx,
server_processes: None,
metasrv_process: None,
frontend_process: None,
ctx: GreptimeDBContext {
time: 0,
datanode_id: Default::default(),
},
is_standalone: false,
env: Env::new(self.data_home.clone()),
env: self.clone(),
}
}

Expand Down Expand Up @@ -244,9 +276,11 @@ impl Env {
/// stop and restart the server process
async fn restart_server(&self, db: &GreptimeDB) {
{
let mut server_processes = db.server_processes.lock().unwrap();
for server_process in server_processes.iter_mut() {
Env::stop_server(server_process);
if let Some(server_process) = db.server_processes.clone() {
let mut server_processes = server_process.lock().unwrap();
for server_process in server_processes.iter_mut() {
Env::stop_server(server_process);
}
}
}

Expand All @@ -265,8 +299,10 @@ impl Env {
processes
};

let mut server_processes = db.server_processes.lock().unwrap();
*server_processes = new_server_processes;
if let Some(server_process) = db.server_processes.clone() {
let mut server_processes = server_process.lock().unwrap();
*server_processes = new_server_processes;
}
}

/// Generate config file to `/tmp/{subcommand}-{current_time}.toml`
Expand Down Expand Up @@ -332,7 +368,7 @@ impl Env {
}

pub struct GreptimeDB {
server_processes: Arc<Mutex<Vec<Child>>>,
server_processes: Option<Arc<Mutex<Vec<Child>>>>,
metasrv_process: Option<Child>,
frontend_process: Option<Child>,
client: TokioMutex<DB>,
Expand All @@ -344,7 +380,7 @@ pub struct GreptimeDB {
#[async_trait]
impl Database for GreptimeDB {
async fn query(&self, ctx: QueryContext, query: String) -> Box<dyn Display> {
if ctx.context.contains_key("restart") {
if ctx.context.contains_key("restart") && self.env.server_addr.is_none() {
self.env.restart_server(self).await;
}

Expand Down Expand Up @@ -383,9 +419,11 @@ impl Database for GreptimeDB {
impl GreptimeDB {
#![allow(clippy::print_stdout)]
fn stop(&mut self) {
let mut servers = self.server_processes.lock().unwrap();
for server in servers.iter_mut() {
Env::stop_server(server);
if let Some(server_processes) = self.server_processes.clone() {
let mut server_processes = server_processes.lock().unwrap();
for server_process in server_processes.iter_mut() {
Env::stop_server(server_process);
}
}
if let Some(mut metasrv) = self.metasrv_process.take() {
Env::stop_server(&mut metasrv);
Expand All @@ -399,7 +437,9 @@ impl GreptimeDB {

impl Drop for GreptimeDB {
fn drop(&mut self) {
self.stop();
if self.env.server_addr.is_none() {
self.stop();
}
}
}

Expand Down
6 changes: 5 additions & 1 deletion tests/runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ struct Args {
/// Name of test cases to run. Accept as a regexp.
#[clap(short, long, default_value = ".*")]
test_filter: String,

/// Address of the server
#[clap(short, long)]
server_addr: Option<String>,
}

#[tokio::main]
Expand All @@ -59,6 +63,6 @@ async fn main() {
.env_config_file(args.env_config_file)
.build()
.unwrap();
let runner = Runner::new(config, Env::new(data_home));
let runner = Runner::new(config, Env::new(data_home, args.server_addr));
runner.run().await.unwrap();
}
Loading