diff --git a/Cargo.lock b/Cargo.lock index 658cd4529c34..b1a8f2b8c33b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -119,6 +119,7 @@ name = "api" version = "0.1.0" dependencies = [ "common-base", + "common-error", "common-time", "datatypes", "prost 0.11.0", @@ -276,6 +277,36 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-io" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8121296a9f05be7f34aa4196b1747243b3b62e048bb7906f644f3fbfc490cf7" +dependencies = [ + "async-lock", + "autocfg", + "concurrent-queue", + "futures-lite", + "libc", + "log", + "parking", + "polling", + "slab", + "socket2", + "waker-fn", + "winapi", +] + +[[package]] +name = "async-lock" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8101efe8695a6c17e02911402145357e718ac92d3ff88ae8419e84b1707b685" +dependencies = [ + "event-listener", + "futures-lite", +] + [[package]] name = "async-stream" version = "0.3.3" @@ -657,6 +688,12 @@ version = "3.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d" +[[package]] +name = "bytecount" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" + [[package]] name = "bytemuck" version = "1.12.1" @@ -698,6 +735,37 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c" +[[package]] +name = "camino" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ad0e1e3e88dd237a156ab9f571021b8a158caa0ae44b1968a241efb5144c1e" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo-platform" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbdb825da8a5df079a43676dbe042702f1707b1109f713a01420fbb4cc71fa27" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] + [[package]] name = "caseless" version = "0.2.1" @@ -958,6 +1026,7 @@ dependencies = [ "common-base", "common-error", "common-grpc", + "common-insert", "common-query", "common-recordbatch", "common-time", @@ -1660,6 +1729,7 @@ dependencies = [ "datafusion", "datafusion-common", "datatypes", + "frontend", "futures", "hyper", "log-store", @@ -1878,6 +1948,15 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + [[package]] name = "error-code" version = "2.3.1" @@ -2015,11 +2094,13 @@ dependencies = [ "async-stream", "async-trait", "catalog", + "chrono", "client", "common-base", "common-catalog", "common-error", "common-grpc", + "common-insert", "common-query", "common-recordbatch", "common-runtime", @@ -2031,8 +2112,11 @@ dependencies = [ "datanode", "datatypes", "futures", + "futures-util", "itertools", "meta-client", + "meta-srv", + "moka", "openmetrics-parser", "prost 0.11.0", "query", @@ -2174,6 +2258,21 @@ version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68" +[[package]] +name = "futures-lite" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.24" @@ -2945,6 +3044,7 @@ dependencies = [ "futures", "meta-srv", "rand 0.8.5", + "serde", "snafu", "tokio", "tokio-stream", @@ -3084,6 +3184,32 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "moka" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b49a05f67020456541f4f29cbaa812016a266a86ec76f96d3873d459c68fe5e" +dependencies = [ + "async-io", + "async-lock", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "futures-util", + "num_cpus", + "once_cell", + "parking_lot", + "quanta", + "rustc_version", + "scheduled-thread-pool", + "skeptic", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + [[package]] name = "multimap" version = "0.8.3" @@ -3558,6 +3684,12 @@ version = "6.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff" +[[package]] +name = "parking" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" + [[package]] name = "parking_lot" version = "0.12.1" @@ -3738,9 +3870,9 @@ dependencies = [ [[package]] name = "pgwire" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e30e99a0b8acf60a6815aa8178e9ffb08178ef3ca1366673bb0d6c7ababe4c2" +checksum = "5dacbf864d6cb6a0e676c9a1162ab7b315b5c8e6c87fa9b6e0ba9ba0a569adb1" dependencies = [ "async-trait", "bytes", @@ -3753,6 +3885,7 @@ dependencies = [ "thiserror", "time 0.3.14", "tokio", + "tokio-rustls", "tokio-util", ] @@ -3919,6 +4052,20 @@ dependencies = [ "syn", ] +[[package]] +name = "polling" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab4609a838d88b73d8238967b60dd115cc08d38e2bbaf51ee1e4b695f89122e2" +dependencies = [ + "autocfg", + "cfg-if", + "libc", + "log", + "wepoll-ffi", + "winapi", +] + [[package]] name = "portable-atomic" version = "0.3.15" @@ -4126,6 +4273,17 @@ dependencies = [ "prost 0.11.0", ] +[[package]] +name = "pulldown-cmark" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d9cc634bc78768157b5cbfe988ffcd1dcba95cd2b2f03a88316c08c6d00ed63" +dependencies = [ + "bitflags", + "memchr", + "unicase", +] + [[package]] name = "quanta" version = "0.10.1" @@ -4729,6 +4887,10 @@ dependencies = [ name = "rustpython-pylib" version = "0.1.0" source = "git+https://github.com/RustPython/RustPython?rev=02a1d1d#02a1d1d7db57afbb78049599c2585cc7cd59e6d3" +dependencies = [ + "rustpython-bytecode", + "rustpython-derive", +] [[package]] name = "rustpython-vm" @@ -4865,6 +5027,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf" +dependencies = [ + "parking_lot", +] + [[package]] name = "schemars" version = "0.8.11" @@ -4976,6 +5147,9 @@ name = "semver" version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4" +dependencies = [ + "serde", +] [[package]] name = "serde" @@ -5189,6 +5363,21 @@ version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", +] + [[package]] name = "sketches-ddsketch" version = "0.2.0" @@ -5270,6 +5459,7 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" name = "sql" version = "0.1.0" dependencies = [ + "api", "catalog", "common-catalog", "common-error", @@ -5615,6 +5805,12 @@ dependencies = [ "tokio", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tap" version = "1.0.1" @@ -6188,6 +6384,12 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "triomphe" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db" + [[package]] name = "try-lock" version = "0.2.3" @@ -6444,6 +6646,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8e76fae08f03f96e166d2dfda232190638c10e0383841252416f9cfe2ae60e6" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "walkdir" version = "2.3.2" @@ -6572,6 +6780,15 @@ dependencies = [ "webpki", ] +[[package]] +name = "wepoll-ffi" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d743fdedc5c64377b5fc2bc036b01c7fd642205a0d96356034ae3404d49eb7fb" +dependencies = [ + "cc", +] + [[package]] name = "which" version = "4.3.0" diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 16063edee066..e0503f3d8d2c 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -1,8 +1,8 @@ node_id = 42 +mode = 'distributed' rpc_addr = '0.0.0.0:3001' wal_dir = '/tmp/greptimedb/wal' rpc_runtime_size = 8 -mode = "standalone" mysql_addr = '0.0.0.0:3306' mysql_runtime_size = 4 @@ -11,7 +11,7 @@ type = 'File' data_dir = '/tmp/greptimedb/data/' [meta_client_opts] -metasrv_addr = "1.1.1.1:3002" +metasrv_addr = '1.1.1.1:3002' timeout_millis = 3000 connect_timeout_millis = 5000 -tcp_nodelay = true +tcp_nodelay = false diff --git a/config/frontend.example.toml b/config/frontend.example.toml index d8a9fb2b333c..a9cb1e969f0d 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -1,4 +1,9 @@ +mode = 'distributed' +datanode_rpc_addr = '127.0.0.1:3001' http_addr = '0.0.0.0:4000' -grpc_addr = '0.0.0.0:4001' -mysql_addr = '0.0.0.0:4003' -mysql_runtime_size = 4 + +[meta_client_opts] +metasrv_addr = '1.1.1.1:3002' +timeout_millis = 3000 +connect_timeout_millis = 5000 +tcp_nodelay = false diff --git a/config/standalone.example.toml b/config/standalone.example.toml index eec26b1f59f7..1c3d3f3fad5b 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -1,14 +1,34 @@ +node_id = 0 mode = 'standalone' -datanode_rpc_addr = '127.0.0.1:3001' http_addr = '0.0.0.0:4000' +datanode_mysql_addr = '0.0.0.0:3306' +datanode_mysql_runtime_size = 4 +wal_dir = '/tmp/greptimedb/wal/' + +[storage] +type = 'File' +data_dir = '/tmp/greptimedb/data/' [grpc_options] addr = '0.0.0.0:4001' -runtime_size = 4 +runtime_size = 8 [mysql_options] -addr = '0.0.0.0:4003' -runtime_size = 4 +addr = '0.0.0.0:4002' +runtime_size = 2 [influxdb_options] enable = true + +[opentsdb_options] +addr = '0.0.0.0:4242' +enable = true +runtime_size = 2 + +[prometheus_options] +enable = true + +[postgres_options] +addr = '0.0.0.0:4003' +runtime_size = 2 +check_pwd = false diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 03a3bfcac27c..6da004f16956 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -18,6 +18,7 @@ datanode = { path = "../datanode" } frontend = { path = "../frontend" } futures = "0.3" meta-srv = { path = "../meta-srv" } +serde = "1.0" snafu = { version = "0.7", features = ["backtraces"] } tokio = { version = "1.18", features = ["full"] } toml = "0.5" diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 9ab84d1ff762..639899a8aa1c 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -86,7 +86,8 @@ impl TryFrom for DatanodeOptions { // commandline options opts.meta_client_opts.metasrv_addr = meta_addr.clone(); opts.node_id = node_id; - opts.mode = Mode::Distributed(vec![meta_addr]); + opts.metasrv_addr = Some(vec![meta_addr]); + opts.mode = Mode::Distributed; } (None, None) => { opts.mode = Mode::Standalone; @@ -140,7 +141,7 @@ mod tests { ); assert_eq!(5000, options.meta_client_opts.connect_timeout_millis); assert_eq!(3000, options.meta_client_opts.timeout_millis); - assert!(options.meta_client_opts.tcp_nodelay); + assert!(!options.meta_client_opts.tcp_nodelay); match options.storage { ObjectStoreConfig::File { data_dir } => { @@ -173,7 +174,7 @@ mod tests { }) .unwrap() .mode; - assert_matches!(mode, Mode::Distributed(_)); + assert_matches!(mode, Mode::Distributed); assert!(DatanodeOptions::try_from(StartCommand { node_id: None, diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 52d4dbfcd266..b20e3c0e80e3 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -110,13 +110,14 @@ impl TryFrom for FrontendOptions { opts.influxdb_options = Some(InfluxdbOptions { enable }); } if let Some(metasrv_addr) = cmd.metasrv_addr { - opts.mode = Mode::Distributed( + opts.metasrv_addr = Some( metasrv_addr .split(',') .into_iter() .map(|x| x.trim().to_string()) .collect::>(), ); + opts.mode = Mode::Distributed; } Ok(opts) } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 1e8e122ef114..432af68ce285 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -1,6 +1,6 @@ use clap::Parser; use common_telemetry::info; -use datanode::datanode::{Datanode, DatanodeOptions}; +use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig}; use datanode::instance::InstanceRef; use frontend::frontend::{Frontend, FrontendOptions, Mode}; use frontend::grpc::GrpcOptions; @@ -9,6 +9,8 @@ use frontend::instance::Instance as FeInstance; use frontend::mysql::MysqlOptions; use frontend::opentsdb::OpentsdbOptions; use frontend::postgres::PostgresOptions; +use frontend::prometheus::PrometheusOptions; +use serde::{Deserialize, Serialize}; use snafu::ResultExt; use tokio::try_join; @@ -42,6 +44,68 @@ impl SubCommand { } } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct StandaloneOptions { + pub http_addr: Option, + pub grpc_options: Option, + pub mysql_options: Option, + pub postgres_options: Option, + pub opentsdb_options: Option, + pub influxdb_options: Option, + pub prometheus_options: Option, + pub mode: Mode, + pub wal_dir: String, + pub storage: ObjectStoreConfig, + pub datanode_mysql_addr: String, + pub datanode_mysql_runtime_size: usize, +} + +impl Default for StandaloneOptions { + fn default() -> Self { + Self { + http_addr: Some("0.0.0.0:4000".to_string()), + grpc_options: Some(GrpcOptions::default()), + mysql_options: Some(MysqlOptions::default()), + postgres_options: Some(PostgresOptions::default()), + opentsdb_options: Some(OpentsdbOptions::default()), + influxdb_options: Some(InfluxdbOptions::default()), + prometheus_options: Some(PrometheusOptions::default()), + mode: Mode::Standalone, + wal_dir: "/tmp/greptimedb/wal".to_string(), + storage: ObjectStoreConfig::default(), + datanode_mysql_addr: "0.0.0.0:3306".to_string(), + datanode_mysql_runtime_size: 4, + } + } +} + +impl StandaloneOptions { + fn frontend_options(self) -> FrontendOptions { + FrontendOptions { + http_addr: self.http_addr, + grpc_options: self.grpc_options, + mysql_options: self.mysql_options, + postgres_options: self.postgres_options, + opentsdb_options: self.opentsdb_options, + influxdb_options: self.influxdb_options, + prometheus_options: self.prometheus_options, + mode: self.mode, + datanode_rpc_addr: "127.0.0.1:3001".to_string(), + metasrv_addr: None, + } + } + + fn datanode_options(self) -> DatanodeOptions { + DatanodeOptions { + wal_dir: self.wal_dir, + storage: self.storage, + mysql_addr: self.datanode_mysql_addr, + mysql_runtime_size: self.datanode_mysql_runtime_size, + ..Default::default() + } + } +} + #[derive(Debug, Parser)] struct StartCommand { #[clap(long)] @@ -62,8 +126,21 @@ struct StartCommand { impl StartCommand { async fn run(self) -> Result<()> { + let config_file = self.config_file.clone(); let fe_opts = FrontendOptions::try_from(self)?; - let dn_opts = DatanodeOptions::default(); + let dn_opts: DatanodeOptions = { + let opts: StandaloneOptions = if let Some(path) = config_file { + toml_loader::from_file!(&path)? + } else { + StandaloneOptions::default() + }; + opts.datanode_options() + }; + + info!( + "Standalone frontend options: {:#?}, datanode options: {:#?}", + fe_opts, dn_opts + ); let mut datanode = Datanode::new(dn_opts.clone()) .await @@ -102,12 +179,14 @@ impl TryFrom for FrontendOptions { type Error = Error; fn try_from(cmd: StartCommand) -> std::result::Result { - let mut opts: FrontendOptions = if let Some(path) = cmd.config_file { + let opts: StandaloneOptions = if let Some(path) = cmd.config_file { toml_loader::from_file!(&path)? } else { - FrontendOptions::default() + StandaloneOptions::default() }; + let mut opts = opts.frontend_options(); + opts.mode = Mode::Standalone; if let Some(addr) = cmd.http_addr { @@ -186,8 +265,8 @@ mod tests { "0.0.0.0:4001".to_string(), fe_opts.grpc_options.unwrap().addr ); - assert_eq!("0.0.0.0:4003", fe_opts.mysql_options.as_ref().unwrap().addr); - assert_eq!(4, fe_opts.mysql_options.as_ref().unwrap().runtime_size); + assert_eq!("0.0.0.0:4002", fe_opts.mysql_options.as_ref().unwrap().addr); + assert_eq!(2, fe_opts.mysql_options.as_ref().unwrap().runtime_size); assert!(fe_opts.influxdb_options.as_ref().unwrap().enable); } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 59ceb64234ef..669ac2bf949a 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -34,6 +34,7 @@ pub struct DatanodeOptions { pub wal_dir: String, pub storage: ObjectStoreConfig, pub mode: Mode, + pub metasrv_addr: Option>, } impl Default for DatanodeOptions { @@ -48,6 +49,7 @@ impl Default for DatanodeOptions { wal_dir: "/tmp/greptimedb/wal".to_string(), storage: ObjectStoreConfig::default(), mode: Mode::Standalone, + metasrv_addr: None, } } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 98bfe85e2014..a0543fd93402 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -57,7 +57,7 @@ impl Instance { let meta_client = match opts.mode { Mode::Standalone => None, - Mode::Distributed(_) => { + Mode::Distributed => { let meta_client = new_metasrv_client(opts.node_id, &opts.meta_client_opts).await?; Some(Arc::new(meta_client)) } @@ -89,7 +89,7 @@ impl Instance { ) } - Mode::Distributed(_) => { + Mode::Distributed => { let catalog = Arc::new(catalog::remote::RemoteCatalogManager::new( table_engine.clone(), opts.node_id, @@ -108,7 +108,7 @@ impl Instance { let heartbeat_task = match opts.mode { Mode::Standalone => None, - Mode::Distributed(_) => Some(HeartbeatTask::new( + Mode::Distributed => Some(HeartbeatTask::new( opts.node_id, /*node id not set*/ opts.rpc_addr.clone(), meta_client.as_ref().unwrap().clone(), diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 31519c0313d2..2b3a36e3a170 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -24,6 +24,7 @@ pub struct FrontendOptions { pub prometheus_options: Option, pub mode: Mode, pub datanode_rpc_addr: String, + pub metasrv_addr: Option>, } impl Default for FrontendOptions { @@ -38,6 +39,7 @@ impl Default for FrontendOptions { prometheus_options: Some(PrometheusOptions::default()), mode: Mode::Standalone, datanode_rpc_addr: "127.0.0.1:3001".to_string(), + metasrv_addr: None, } } } @@ -85,6 +87,5 @@ where #[serde(rename_all = "lowercase")] pub enum Mode { Standalone, - // with meta server's addr - Distributed(Vec), + Distributed, } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 375c5e0db574..687c21fc8bf9 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -113,7 +113,11 @@ impl Instance { instance.dist_instance = match &opts.mode { Mode::Standalone => None, - Mode::Distributed(metasrv_addr) => { + Mode::Distributed => { + let metasrv_addr = opts + .metasrv_addr + .clone() + .expect("Forgot to set metasrv_addr"); info!( "Creating Frontend instance in distributed mode with Meta server addr {:?}", metasrv_addr @@ -557,7 +561,7 @@ impl SqlQueryHandler for Instance { .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { query }) } - Mode::Distributed(_) => { + Mode::Distributed => { let affected = self .sql_dist_insert(insert) .await diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index 51d31cc8a989..c938a4591a47 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -31,7 +31,7 @@ impl InfluxdbLineProtocolHandler for Instance { query: &request.lines, })?; } - Mode::Distributed(_) => { + Mode::Distributed => { self.dist_insert(request.try_into()?) .await .map_err(BoxedError::new) diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index 273a7957e7d7..75fe56f32184 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -23,7 +23,7 @@ impl OpentsdbProtocolHandler for Instance { data_point: format!("{:?}", data_point), })?; } - Mode::Distributed(_) => { + Mode::Distributed => { self.dist_insert(vec![data_point.as_grpc_insert()]) .await .map_err(BoxedError::new) diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index 88b2a19b527c..97b77d5266ec 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -105,7 +105,7 @@ impl PrometheusProtocolHandler for Instance { msg: "failed to write prometheus remote request", })?; } - Mode::Distributed(_) => { + Mode::Distributed => { let inserts = prometheus::write_request_to_insert_exprs(database, request)?; self.dist_insert(inserts)