Skip to content

Commit

Permalink
refactor: make instance started separately, to support further integr…
Browse files Browse the repository at this point in the history
…ated into other binaries
  • Loading branch information
MichaelScofield committed Dec 12, 2023
1 parent 47e5154 commit a023d1a
Show file tree
Hide file tree
Showing 12 changed files with 386 additions and 303 deletions.
203 changes: 40 additions & 163 deletions src/cmd/src/bin/greptime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,80 +15,12 @@
#![doc = include_str!("../../../../README.md")]

use std::fmt;
use std::sync::Arc;

use clap::Parser;
use clap::{FromArgMatches, Parser, Subcommand};
use cmd::error::Result;
use cmd::options::{Options, TopLevelOptions};
use cmd::{cli, datanode, frontend, metasrv, standalone};
use common_telemetry::logging::{error, info, TracingOptions};

lazy_static::lazy_static! {
static ref APP_VERSION: prometheus::IntGaugeVec =
prometheus::register_int_gauge_vec!("app_version", "app version", &["short_version", "version"]).unwrap();
}

#[derive(Parser)]
#[clap(name = "greptimedb", version = print_version())]
struct Command {
#[clap(long)]
log_dir: Option<String>,
#[clap(long)]
log_level: Option<String>,
#[clap(subcommand)]
subcmd: SubCommand,

#[cfg(feature = "tokio-console")]
#[clap(long)]
tokio_console_addr: Option<String>,
}

pub enum Application {
Datanode(datanode::Instance),
Frontend(frontend::Instance),
Metasrv(metasrv::Instance),
Standalone(standalone::Instance),
Cli(cli::Instance),
}

impl Application {
async fn start(&mut self) -> Result<()> {
match self {
Application::Datanode(instance) => instance.start().await,
Application::Frontend(instance) => instance.start().await,
Application::Metasrv(instance) => instance.start().await,
Application::Standalone(instance) => instance.start().await,
Application::Cli(instance) => instance.start().await,
}
}

async fn stop(&self) -> Result<()> {
match self {
Application::Datanode(instance) => instance.stop().await,
Application::Frontend(instance) => instance.stop().await,
Application::Metasrv(instance) => instance.stop().await,
Application::Standalone(instance) => instance.stop().await,
Application::Cli(instance) => instance.stop().await,
}
}
}

impl Command {
async fn build(self, opts: Options) -> Result<Application> {
self.subcmd.build(opts).await
}

fn load_options(&self) -> Result<Options> {
let top_level_opts = self.top_level_options();
self.subcmd.load_options(top_level_opts)
}

fn top_level_options(&self) -> TopLevelOptions {
TopLevelOptions {
log_dir: self.log_dir.clone(),
log_level: self.log_level.clone(),
}
}
}
use cmd::options::{CliOptions, Options};
use cmd::{cli, datanode, frontend, greptimedb_cli, metasrv, standalone, start_app, App};

#[derive(Parser)]
enum SubCommand {
Expand All @@ -105,40 +37,41 @@ enum SubCommand {
}

impl SubCommand {
async fn build(self, opts: Options) -> Result<Application> {
match (self, opts) {
async fn build(self, opts: Options) -> Result<Arc<dyn App>> {
let app: Arc<dyn App> = match (self, opts) {
(SubCommand::Datanode(cmd), Options::Datanode(dn_opts)) => {
let app = cmd.build(*dn_opts).await?;
Ok(Application::Datanode(app))
Arc::new(app) as _
}
(SubCommand::Frontend(cmd), Options::Frontend(fe_opts)) => {
let app = cmd.build(*fe_opts).await?;
Ok(Application::Frontend(app))
Arc::new(app) as _
}
(SubCommand::Metasrv(cmd), Options::Metasrv(meta_opts)) => {
let app = cmd.build(*meta_opts).await?;
Ok(Application::Metasrv(app))
Arc::new(app) as _
}
(SubCommand::Standalone(cmd), Options::Standalone(opts)) => {
let app = cmd.build(*opts).await?;
Ok(Application::Standalone(app))
Arc::new(app) as _
}
(SubCommand::Cli(cmd), Options::Cli(_)) => {
let app = cmd.build().await?;
Ok(Application::Cli(app))
Arc::new(app) as _
}

_ => unreachable!(),
}
};
Ok(app)
}

fn load_options(&self, top_level_opts: TopLevelOptions) -> Result<Options> {
fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
match self {
SubCommand::Datanode(cmd) => cmd.load_options(top_level_opts),
SubCommand::Frontend(cmd) => cmd.load_options(top_level_opts),
SubCommand::Metasrv(cmd) => cmd.load_options(top_level_opts),
SubCommand::Standalone(cmd) => cmd.load_options(top_level_opts),
SubCommand::Cli(cmd) => cmd.load_options(top_level_opts),
SubCommand::Datanode(cmd) => cmd.load_options(cli_options),
SubCommand::Frontend(cmd) => cmd.load_options(cli_options),
SubCommand::Metasrv(cmd) => cmd.load_options(cli_options),
SubCommand::Standalone(cmd) => cmd.load_options(cli_options),
SubCommand::Cli(cmd) => cmd.load_options(cli_options),
}
}
}
Expand All @@ -155,90 +88,34 @@ impl fmt::Display for SubCommand {
}
}

fn print_version() -> &'static str {
concat!(
"\nbranch: ",
env!("GIT_BRANCH"),
"\ncommit: ",
env!("GIT_COMMIT"),
"\ndirty: ",
env!("GIT_DIRTY"),
"\nversion: ",
env!("CARGO_PKG_VERSION")
)
}

fn short_version() -> &'static str {
env!("CARGO_PKG_VERSION")
}

// {app_name}-{branch_name}-{commit_short}
// The branch name (tag) of a release build should already contain the short
// version so the full version doesn't concat the short version explicitly.
fn full_version() -> &'static str {
concat!(
"greptimedb-",
env!("GIT_BRANCH"),
"-",
env!("GIT_COMMIT_SHORT")
)
}

fn log_env_flags() {
info!("command line arguments");
for argument in std::env::args() {
info!("argument: {}", argument);
}
}

#[cfg(not(windows))]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[tokio::main]
async fn main() -> Result<()> {
let cmd = Command::parse();
let app_name = &cmd.subcmd.to_string();

let opts = cmd.load_options()?;
let logging_opts = opts.logging_options();
let tracing_opts = TracingOptions {
#[cfg(feature = "tokio-console")]
tokio_console_addr: cmd.tokio_console_addr.clone(),
let cli = greptimedb_cli();

let cli = SubCommand::augment_subcommands(cli);

let args = cli.get_matches();

let subcmd = match SubCommand::from_arg_matches(&args) {
Ok(subcmd) => subcmd,
Err(e) => e.exit(),
};

common_telemetry::set_panic_hook();
let _guard =
common_telemetry::init_global_logging(app_name, logging_opts, tracing_opts, opts.node_id());

// Report app version as gauge.
APP_VERSION
.with_label_values(&[short_version(), full_version()])
.inc();

// Log version and argument flags.
info!(
"short_version: {}, full_version: {}",
short_version(),
full_version()
);
log_env_flags();

let mut app = cmd.build(opts).await?;

tokio::select! {
result = app.start() => {
if let Err(err) = result {
error!(err; "Fatal error occurs!");
}
}
_ = tokio::signal::ctrl_c() => {
if let Err(err) = app.stop().await {
error!(err; "Fatal error occurs!");
}
info!("Goodbye!");
}
}
let app_name = subcmd.to_string();

let cli_options = CliOptions::new(&args);

let opts = subcmd.load_options(&cli_options)?;

let logging_options = opts.logging_options().clone();

let node_id = opts.node_id();

let app = subcmd.build(opts).await?;

Ok(())
start_app(app_name, app, cli_options, &logging_options, node_id).await
}
50 changes: 31 additions & 19 deletions src/cmd/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,22 @@
// limitations under the License.

mod bench;

// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
#[allow(unused)]
mod cmd;
mod export;
mod helper;

// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
#[allow(unused)]
mod repl;
// TODO(weny): Removes it
#[allow(deprecated)]
mod upgrade;

use std::sync::Arc;

use async_trait::async_trait;
use bench::BenchTableMetadataCommand;
use clap::Parser;
Expand All @@ -30,27 +38,31 @@ use upgrade::UpgradeCommand;

use self::export::ExportCommand;
use crate::error::Result;
use crate::options::{Options, TopLevelOptions};
use crate::options::{CliOptions, Options};
use crate::App;

#[async_trait]
pub trait Tool {
pub trait Tool: Send + Sync {
async fn do_work(&self) -> Result<()>;
}

pub enum Instance {
Repl(Repl),
Tool(Box<dyn Tool>),
pub struct Instance {
tool: Arc<dyn Tool>,
}

impl Instance {
pub async fn start(&mut self) -> Result<()> {
match self {
Instance::Repl(repl) => repl.run().await,
Instance::Tool(tool) => tool.do_work().await,
}
fn new(tool: Arc<dyn Tool>) -> Self {
Self { tool }
}
}

pub async fn stop(&self) -> Result<()> {
#[async_trait]
impl App for Instance {
async fn start(&self) -> Result<()> {
self.tool.do_work().await
}

async fn stop(&self) -> Result<()> {
Ok(())
}
}
Expand All @@ -66,14 +78,15 @@ impl Command {
self.cmd.build().await
}

pub fn load_options(&self, top_level_opts: TopLevelOptions) -> Result<Options> {
pub fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
let mut logging_opts = LoggingOptions::default();
if let Some(dir) = top_level_opts.log_dir {
logging_opts.dir = dir;
}
if top_level_opts.log_level.is_some() {
logging_opts.level = top_level_opts.log_level;

if let Some(dir) = &cli_options.log_dir {
logging_opts.dir = dir.clone();
}

logging_opts.level = cli_options.log_level.clone();

Ok(Options::Cli(Box::new(logging_opts)))
}
}
Expand Down Expand Up @@ -110,7 +123,6 @@ pub(crate) struct AttachCommand {
impl AttachCommand {
#[allow(dead_code)]
async fn build(self) -> Result<Instance> {
let repl = Repl::try_new(&self).await?;
Ok(Instance::Repl(repl))
unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373")
}
}
2 changes: 1 addition & 1 deletion src/cmd/src/cli/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl BenchTableMetadataCommand {
table_metadata_manager,
count: self.count,
};
Ok(Instance::Tool(Box::new(tool)))
Ok(Instance::new(Arc::new(tool)))
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/cmd/src/cli/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl ExportCommand {
}));
}

Ok(Instance::Tool(Box::new(Export {
Ok(Instance::new(Arc::new(Export {
client: database_client,
catalog,
schema,
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/src/cli/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl UpgradeCommand {
skip_schema_keys: self.skip_schema_keys,
skip_table_route_keys: self.skip_table_route_keys,
};
Ok(Instance::Tool(Box::new(tool)))
Ok(Instance::new(Arc::new(tool)))
}
}

Expand Down
Loading

0 comments on commit a023d1a

Please sign in to comment.