Skip to content

Commit

Permalink
refactor: options and sample configurations (#514)
Browse files Browse the repository at this point in the history
* refactor: options and sample configurations

* chore: newline at end of file

* chore: format code

* chore: remove comment and set sample configurations to default values

* chore: use single quoted string in sample configuration files
  • Loading branch information
killme2008 authored Nov 15, 2022
1 parent ba27e0d commit 6c6eeda
Show file tree
Hide file tree
Showing 15 changed files with 363 additions and 32 deletions.
221 changes: 219 additions & 2 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
node_id = 42
mode = 'distributed'
rpc_addr = '0.0.0.0:3001'
wal_dir = '/tmp/greptimedb/wal'
rpc_runtime_size = 8
mode = "standalone"
mysql_addr = '0.0.0.0:3306'
mysql_runtime_size = 4

Expand All @@ -11,7 +11,7 @@ type = 'File'
data_dir = '/tmp/greptimedb/data/'

[meta_client_opts]
metasrv_addr = "1.1.1.1:3002"
metasrv_addr = '1.1.1.1:3002'
timeout_millis = 3000
connect_timeout_millis = 5000
tcp_nodelay = true
tcp_nodelay = false
11 changes: 8 additions & 3 deletions config/frontend.example.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
mode = 'distributed'
datanode_rpc_addr = '127.0.0.1:3001'
http_addr = '0.0.0.0:4000'
grpc_addr = '0.0.0.0:4001'
mysql_addr = '0.0.0.0:4003'
mysql_runtime_size = 4

[meta_client_opts]
metasrv_addr = '1.1.1.1:3002'
timeout_millis = 3000
connect_timeout_millis = 5000
tcp_nodelay = false
28 changes: 24 additions & 4 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,34 @@
node_id = 0
mode = 'standalone'
datanode_rpc_addr = '127.0.0.1:3001'
http_addr = '0.0.0.0:4000'
datanode_mysql_addr = '0.0.0.0:3306'
datanode_mysql_runtime_size = 4
wal_dir = '/tmp/greptimedb/wal/'

[storage]
type = 'File'
data_dir = '/tmp/greptimedb/data/'

[grpc_options]
addr = '0.0.0.0:4001'
runtime_size = 4
runtime_size = 8

[mysql_options]
addr = '0.0.0.0:4003'
runtime_size = 4
addr = '0.0.0.0:4002'
runtime_size = 2

[influxdb_options]
enable = true

[opentsdb_options]
addr = '0.0.0.0:4242'
enable = true
runtime_size = 2

[prometheus_options]
enable = true

[postgres_options]
addr = '0.0.0.0:4003'
runtime_size = 2
check_pwd = false
1 change: 1 addition & 0 deletions src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ datanode = { path = "../datanode" }
frontend = { path = "../frontend" }
futures = "0.3"
meta-srv = { path = "../meta-srv" }
serde = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.18", features = ["full"] }
toml = "0.5"
Expand Down
7 changes: 4 additions & 3 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ impl TryFrom<StartCommand> for DatanodeOptions {
// commandline options
opts.meta_client_opts.metasrv_addr = meta_addr.clone();
opts.node_id = node_id;
opts.mode = Mode::Distributed(vec![meta_addr]);
opts.metasrv_addr = Some(vec![meta_addr]);
opts.mode = Mode::Distributed;
}
(None, None) => {
opts.mode = Mode::Standalone;
Expand Down Expand Up @@ -140,7 +141,7 @@ mod tests {
);
assert_eq!(5000, options.meta_client_opts.connect_timeout_millis);
assert_eq!(3000, options.meta_client_opts.timeout_millis);
assert!(options.meta_client_opts.tcp_nodelay);
assert!(!options.meta_client_opts.tcp_nodelay);

match options.storage {
ObjectStoreConfig::File { data_dir } => {
Expand Down Expand Up @@ -173,7 +174,7 @@ mod tests {
})
.unwrap()
.mode;
assert_matches!(mode, Mode::Distributed(_));
assert_matches!(mode, Mode::Distributed);

assert!(DatanodeOptions::try_from(StartCommand {
node_id: None,
Expand Down
3 changes: 2 additions & 1 deletion src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,14 @@ impl TryFrom<StartCommand> for FrontendOptions {
opts.influxdb_options = Some(InfluxdbOptions { enable });
}
if let Some(metasrv_addr) = cmd.metasrv_addr {
opts.mode = Mode::Distributed(
opts.metasrv_addr = Some(
metasrv_addr
.split(',')
.into_iter()
.map(|x| x.trim().to_string())
.collect::<Vec<String>>(),
);
opts.mode = Mode::Distributed;
}
Ok(opts)
}
Expand Down
91 changes: 85 additions & 6 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use clap::Parser;
use common_telemetry::info;
use datanode::datanode::{Datanode, DatanodeOptions};
use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig};
use datanode::instance::InstanceRef;
use frontend::frontend::{Frontend, FrontendOptions, Mode};
use frontend::grpc::GrpcOptions;
Expand All @@ -9,6 +9,8 @@ use frontend::instance::Instance as FeInstance;
use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use frontend::prometheus::PrometheusOptions;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use tokio::try_join;

Expand Down Expand Up @@ -42,6 +44,68 @@ impl SubCommand {
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StandaloneOptions {
pub http_addr: Option<String>,
pub grpc_options: Option<GrpcOptions>,
pub mysql_options: Option<MysqlOptions>,
pub postgres_options: Option<PostgresOptions>,
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub mode: Mode,
pub wal_dir: String,
pub storage: ObjectStoreConfig,
pub datanode_mysql_addr: String,
pub datanode_mysql_runtime_size: usize,
}

impl Default for StandaloneOptions {
fn default() -> Self {
Self {
http_addr: Some("0.0.0.0:4000".to_string()),
grpc_options: Some(GrpcOptions::default()),
mysql_options: Some(MysqlOptions::default()),
postgres_options: Some(PostgresOptions::default()),
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
mode: Mode::Standalone,
wal_dir: "/tmp/greptimedb/wal".to_string(),
storage: ObjectStoreConfig::default(),
datanode_mysql_addr: "0.0.0.0:3306".to_string(),
datanode_mysql_runtime_size: 4,
}
}
}

impl StandaloneOptions {
fn frontend_options(self) -> FrontendOptions {
FrontendOptions {
http_addr: self.http_addr,
grpc_options: self.grpc_options,
mysql_options: self.mysql_options,
postgres_options: self.postgres_options,
opentsdb_options: self.opentsdb_options,
influxdb_options: self.influxdb_options,
prometheus_options: self.prometheus_options,
mode: self.mode,
datanode_rpc_addr: "127.0.0.1:3001".to_string(),
metasrv_addr: None,
}
}

fn datanode_options(self) -> DatanodeOptions {
DatanodeOptions {
wal_dir: self.wal_dir,
storage: self.storage,
mysql_addr: self.datanode_mysql_addr,
mysql_runtime_size: self.datanode_mysql_runtime_size,
..Default::default()
}
}
}

#[derive(Debug, Parser)]
struct StartCommand {
#[clap(long)]
Expand All @@ -62,8 +126,21 @@ struct StartCommand {

impl StartCommand {
async fn run(self) -> Result<()> {
let config_file = self.config_file.clone();
let fe_opts = FrontendOptions::try_from(self)?;
let dn_opts = DatanodeOptions::default();
let dn_opts: DatanodeOptions = {
let opts: StandaloneOptions = if let Some(path) = config_file {
toml_loader::from_file!(&path)?
} else {
StandaloneOptions::default()
};
opts.datanode_options()
};

info!(
"Standalone frontend options: {:#?}, datanode options: {:#?}",
fe_opts, dn_opts
);

let mut datanode = Datanode::new(dn_opts.clone())
.await
Expand Down Expand Up @@ -102,12 +179,14 @@ impl TryFrom<StartCommand> for FrontendOptions {
type Error = Error;

fn try_from(cmd: StartCommand) -> std::result::Result<Self, Self::Error> {
let mut opts: FrontendOptions = if let Some(path) = cmd.config_file {
let opts: StandaloneOptions = if let Some(path) = cmd.config_file {
toml_loader::from_file!(&path)?
} else {
FrontendOptions::default()
StandaloneOptions::default()
};

let mut opts = opts.frontend_options();

opts.mode = Mode::Standalone;

if let Some(addr) = cmd.http_addr {
Expand Down Expand Up @@ -186,8 +265,8 @@ mod tests {
"0.0.0.0:4001".to_string(),
fe_opts.grpc_options.unwrap().addr
);
assert_eq!("0.0.0.0:4003", fe_opts.mysql_options.as_ref().unwrap().addr);
assert_eq!(4, fe_opts.mysql_options.as_ref().unwrap().runtime_size);
assert_eq!("0.0.0.0:4002", fe_opts.mysql_options.as_ref().unwrap().addr);
assert_eq!(2, fe_opts.mysql_options.as_ref().unwrap().runtime_size);
assert!(fe_opts.influxdb_options.as_ref().unwrap().enable);
}
}
2 changes: 2 additions & 0 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct DatanodeOptions {
pub wal_dir: String,
pub storage: ObjectStoreConfig,
pub mode: Mode,
pub metasrv_addr: Option<Vec<String>>,
}

impl Default for DatanodeOptions {
Expand All @@ -48,6 +49,7 @@ impl Default for DatanodeOptions {
wal_dir: "/tmp/greptimedb/wal".to_string(),
storage: ObjectStoreConfig::default(),
mode: Mode::Standalone,
metasrv_addr: None,
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/datanode/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl Instance {

let meta_client = match opts.mode {
Mode::Standalone => None,
Mode::Distributed(_) => {
Mode::Distributed => {
let meta_client = new_metasrv_client(opts.node_id, &opts.meta_client_opts).await?;
Some(Arc::new(meta_client))
}
Expand Down Expand Up @@ -89,7 +89,7 @@ impl Instance {
)
}

Mode::Distributed(_) => {
Mode::Distributed => {
let catalog = Arc::new(catalog::remote::RemoteCatalogManager::new(
table_engine.clone(),
opts.node_id,
Expand All @@ -108,7 +108,7 @@ impl Instance {

let heartbeat_task = match opts.mode {
Mode::Standalone => None,
Mode::Distributed(_) => Some(HeartbeatTask::new(
Mode::Distributed => Some(HeartbeatTask::new(
opts.node_id, /*node id not set*/
opts.rpc_addr.clone(),
meta_client.as_ref().unwrap().clone(),
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct FrontendOptions {
pub prometheus_options: Option<PrometheusOptions>,
pub mode: Mode,
pub datanode_rpc_addr: String,
pub metasrv_addr: Option<Vec<String>>,
}

impl Default for FrontendOptions {
Expand All @@ -38,6 +39,7 @@ impl Default for FrontendOptions {
prometheus_options: Some(PrometheusOptions::default()),
mode: Mode::Standalone,
datanode_rpc_addr: "127.0.0.1:3001".to_string(),
metasrv_addr: None,
}
}
}
Expand Down Expand Up @@ -85,6 +87,5 @@ where
#[serde(rename_all = "lowercase")]
pub enum Mode {
Standalone,
// with meta server's addr
Distributed(Vec<String>),
Distributed,
}
8 changes: 6 additions & 2 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ impl Instance {

instance.dist_instance = match &opts.mode {
Mode::Standalone => None,
Mode::Distributed(metasrv_addr) => {
Mode::Distributed => {
let metasrv_addr = opts
.metasrv_addr
.clone()
.expect("Forgot to set metasrv_addr");
info!(
"Creating Frontend instance in distributed mode with Meta server addr {:?}",
metasrv_addr
Expand Down Expand Up @@ -557,7 +561,7 @@ impl SqlQueryHandler for Instance {
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
}
Mode::Distributed(_) => {
Mode::Distributed => {
let affected = self
.sql_dist_insert(insert)
.await
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl InfluxdbLineProtocolHandler for Instance {
query: &request.lines,
})?;
}
Mode::Distributed(_) => {
Mode::Distributed => {
self.dist_insert(request.try_into()?)
.await
.map_err(BoxedError::new)
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance/opentsdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl OpentsdbProtocolHandler for Instance {
data_point: format!("{:?}", data_point),
})?;
}
Mode::Distributed(_) => {
Mode::Distributed => {
self.dist_insert(vec![data_point.as_grpc_insert()])
.await
.map_err(BoxedError::new)
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl PrometheusProtocolHandler for Instance {
msg: "failed to write prometheus remote request",
})?;
}
Mode::Distributed(_) => {
Mode::Distributed => {
let inserts = prometheus::write_request_to_insert_exprs(database, request)?;

self.dist_insert(inserts)
Expand Down

0 comments on commit 6c6eeda

Please sign in to comment.