diff --git a/Cargo.lock b/Cargo.lock index e14fbddf257d..e481583bffea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1607,6 +1607,7 @@ dependencies = [ "mito2", "nu-ansi-term", "partition", + "plugins", "prost", "query", "rand", @@ -6766,6 +6767,18 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "plugins" +version = "0.4.0-nightly" +dependencies = [ + "auth", + "common-base", + "datanode", + "frontend", + "meta-srv", + "snafu", +] + [[package]] name = "pmutil" version = "0.5.3" diff --git a/Cargo.toml b/Cargo.toml index 85c62135100b..eb36a7e1f761 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ members = [ "src/object-store", "src/operator", "src/partition", + "src/plugins", "src/promql", "src/query", "src/script", @@ -61,7 +62,6 @@ license = "Apache-2.0" [workspace.dependencies] aquamarine = "0.3" arrow = { version = "43.0" } -etcd-client = "0.11" arrow-array = "43.0" arrow-flight = "43.0" arrow-schema = { version = "43.0", features = ["serde"] } @@ -76,18 +76,22 @@ datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b6f3b28b6fe91924cc8dd3d83726b766f2a706ec" } datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b6f3b28b6fe91924cc8dd3d83726b766f2a706ec" } derive_builder = "0.12" +etcd-client = "0.11" futures = "0.3" futures-util = "0.3" greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1f1dd532a111e3834cc3019c5605e2993ffb9dc3" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" +meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "abbd357c1e193cd270ea65ee7652334a150b628f" } +metrics = "0.20" moka = "0.12" once_cell = "1.18" opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] } parquet = "43.0" paste = "1.0" prost = "0.11" +raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" } rand = "0.8" regex = "1.8" reqwest = { version = "0.11", default-features = false, features = [ @@ -109,8 +113,6 @@ tokio-util = { version = "0.7", features = ["io-util", "compat"] } toml = "0.7" tonic = { version = "0.9", features = ["tls"] } uuid = { version = "1", features = ["serde", "v4", "fast-rng"] } -metrics = "0.20" -meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "abbd357c1e193cd270ea65ee7652334a150b628f" } ## workspaces members api = { path = "src/api" } auth = { path = "src/auth" } @@ -123,19 +125,18 @@ common-config = { path = "src/common/config" } common-datasource = { path = "src/common/datasource" } common-error = { path = "src/common/error" } common-function = { path = "src/common/function" } -common-macro = { path = "src/common/macro" } common-greptimedb-telemetry = { path = "src/common/greptimedb-telemetry" } common-grpc = { path = "src/common/grpc" } common-grpc-expr = { path = "src/common/grpc-expr" } +common-macro = { path = "src/common/macro" } common-mem-prof = { path = "src/common/mem-prof" } common-meta = { path = "src/common/meta" } +common-pprof = { path = "src/common/pprof" } common-procedure = { path = "src/common/procedure" } common-procedure-test = { path = "src/common/procedure-test" } -common-pprof = { path = "src/common/pprof" } common-query = { path = "src/common/query" } common-recordbatch = { path = "src/common/recordbatch" } common-runtime = { path = "src/common/runtime" } -substrait = { path = "src/common/substrait" } common-telemetry = { path = "src/common/telemetry" } common-test-util = { path = "src/common/test-util" } common-time = { path = "src/common/time" } @@ -149,20 +150,20 @@ meta-client = { path = "src/meta-client" } meta-srv = { path = "src/meta-srv" } mito = { path = "src/mito" } mito2 = { path = "src/mito2" } -operator = { path = "src/operator" } object-store = { path = "src/object-store" } +operator = { path = "src/operator" } partition = { path = "src/partition" } +plugins = { path = "src/plugins" } promql = { path = "src/promql" } query = { path = "src/query" } -raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" } script = { path = "src/script" } servers = { path = "src/servers" } session = { path = "src/session" } sql = { path = "src/sql" } storage = { path = "src/storage" } store-api = { path = "src/store-api" } +substrait = { path = "src/common/substrait" } table = { path = "src/table" } -table-procedure = { path = "src/table-procedure" } [workspace.dependencies.meter-macros] git = "https://github.com/GreptimeTeam/greptime-meter.git" diff --git a/src/auth/Cargo.toml b/src/auth/Cargo.toml index 3909d8078157..748da3b40f8b 100644 --- a/src/auth/Cargo.toml +++ b/src/auth/Cargo.toml @@ -4,8 +4,6 @@ version.workspace = true edition.workspace = true license.workspace = true -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [features] default = [] testing = [] diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index d04eb8f86265..9c2de2e90806 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -49,6 +49,7 @@ metrics.workspace = true mito2 = { workspace = true } nu-ansi-term = "0.46" partition = { workspace = true } +plugins.workspace = true prost.workspace = true query = { workspace = true } rand.workspace = true diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index 8fd49f7d27b6..5173d0336831 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -257,7 +257,7 @@ async fn create_query_engine(meta_addr: &str) -> Result { cached_meta_backend.clone(), datanode_clients, ); - let plugins: Arc = Default::default(); + let plugins: Plugins = Default::default(); let state = Arc::new(QueryEngineState::new( catalog_list, None, diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index e2ff160eefb5..63c7a073b626 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -31,6 +31,10 @@ pub struct Instance { impl Instance { pub async fn start(&mut self) -> Result<()> { + plugins::start_datanode_plugins(self.datanode.plugins()) + .await + .context(StartDatanodeSnafu)?; + self.datanode.start().await.context(StartDatanodeSnafu) } @@ -159,11 +163,15 @@ impl StartCommand { Ok(Options::Datanode(Box::new(opts))) } - async fn build(self, opts: DatanodeOptions) -> Result { + async fn build(self, mut opts: DatanodeOptions) -> Result { + let plugins = plugins::setup_datanode_plugins(&mut opts) + .await + .context(StartDatanodeSnafu)?; + logging::info!("Datanode start command: {:#?}", self); logging::info!("Datanode options: {:#?}", opts); - let datanode = DatanodeBuilder::new(opts, None, Default::default()) + let datanode = DatanodeBuilder::new(opts, None, plugins) .build() .await .context(StartDatanodeSnafu)?; diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 970196ef1841..df8468f92b85 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -85,12 +85,6 @@ pub enum Error { #[snafu(display("Illegal config: {}", msg))] IllegalConfig { msg: String, location: Location }, - #[snafu(display("Illegal auth config"))] - IllegalAuthConfig { - location: Location, - source: auth::error::Error, - }, - #[snafu(display("Unsupported selector type: {}", selector_type))] UnsupportedSelectorType { selector_type: String, @@ -208,7 +202,6 @@ impl ErrorExt for Error { | Error::LoadLayeredConfig { .. } | Error::IllegalConfig { .. } | Error::InvalidReplCommand { .. } - | Error::IllegalAuthConfig { .. } | Error::ConnectEtcd { .. } => StatusCode::InvalidArguments, Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal, diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 61f0bab6a3d8..0cb0cb2bd6e2 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -12,11 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - -use auth::UserProviderRef; use clap::Parser; -use common_base::Plugins; use common_telemetry::logging; use frontend::frontend::FrontendOptions; use frontend::instance::{FrontendInstance, Instance as FeInstance}; @@ -25,7 +21,7 @@ use servers::tls::{TlsMode, TlsOption}; use servers::Mode; use snafu::ResultExt; -use crate::error::{self, IllegalAuthConfigSnafu, Result}; +use crate::error::{self, Result, StartFrontendSnafu}; use crate::options::{Options, TopLevelOptions}; pub struct Instance { @@ -34,10 +30,11 @@ pub struct Instance { impl Instance { pub async fn start(&mut self) -> Result<()> { - self.frontend - .start() + plugins::start_frontend_plugins(self.frontend.plugins().clone()) .await - .context(error::StartFrontendSnafu) + .context(StartFrontendSnafu)?; + + self.frontend.start().await.context(StartFrontendSnafu) } pub async fn stop(&self) -> Result<()> { @@ -177,38 +174,32 @@ impl StartCommand { opts.mode = Mode::Distributed; } + opts.user_provider = self.user_provider.clone(); + Ok(Options::Frontend(Box::new(opts))) } - async fn build(self, opts: FrontendOptions) -> Result { + async fn build(self, mut opts: FrontendOptions) -> Result { + let plugins = plugins::setup_frontend_plugins(&mut opts) + .await + .context(StartFrontendSnafu)?; + logging::info!("Frontend start command: {:#?}", self); logging::info!("Frontend options: {:#?}", opts); - let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?); - let mut instance = FeInstance::try_new_distributed(&opts, plugins.clone()) .await - .context(error::StartFrontendSnafu)?; + .context(StartFrontendSnafu)?; instance .build_servers(&opts) .await - .context(error::StartFrontendSnafu)?; + .context(StartFrontendSnafu)?; Ok(Instance { frontend: instance }) } } -pub fn load_frontend_plugins(user_provider: &Option) -> Result { - let plugins = Plugins::new(); - - if let Some(provider) = user_provider { - let provider = auth::user_provider_from_option(provider).context(IllegalAuthConfigSnafu)?; - plugins.insert::(provider); - } - Ok(plugins) -} - #[cfg(test)] mod tests { use std::io::Write; @@ -218,6 +209,7 @@ mod tests { use common_base::readable_size::ReadableSize; use common_test_util::temp_dir::create_named_temp_file; use frontend::service_config::GrpcOptions; + use servers::http::HttpOptions; use super::*; use crate::options::ENV_VAR_SEP; @@ -303,14 +295,17 @@ mod tests { #[tokio::test] async fn test_try_from_start_command_to_anymap() { - let command = StartCommand { + let mut fe_opts = FrontendOptions { + http: HttpOptions { + disable_dashboard: false, + ..Default::default() + }, user_provider: Some("static_user_provider:cmd:test=test".to_string()), - disable_dashboard: Some(false), ..Default::default() }; - let plugins = load_frontend_plugins(&command.user_provider); - let plugins = plugins.unwrap(); + let plugins = plugins::setup_frontend_plugins(&mut fe_opts).await.unwrap(); + let provider = plugins.get::().unwrap(); let result = provider .authenticate( diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 9ae7938a15bd..c0d56828e44e 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -20,7 +20,7 @@ use meta_srv::bootstrap::MetaSrvInstance; use meta_srv::metasrv::MetaSrvOptions; use snafu::ResultExt; -use crate::error::{self, Result}; +use crate::error::{self, Result, StartMetaServerSnafu}; use crate::options::{Options, TopLevelOptions}; pub struct Instance { @@ -29,10 +29,10 @@ pub struct Instance { impl Instance { pub async fn start(&mut self) -> Result<()> { - self.instance - .start() + plugins::start_meta_srv_plugins(self.instance.plugins()) .await - .context(error::StartMetaServerSnafu) + .context(StartMetaServerSnafu)?; + self.instance.start().await.context(StartMetaServerSnafu) } pub async fn stop(&self) -> Result<()> { @@ -158,12 +158,15 @@ impl StartCommand { Ok(Options::Metasrv(Box::new(opts))) } - async fn build(self, opts: MetaSrvOptions) -> Result { - logging::info!("MetaSrv start command: {:#?}", self); + async fn build(self, mut opts: MetaSrvOptions) -> Result { + let plugins = plugins::setup_meta_srv_plugins(&mut opts) + .await + .context(StartMetaServerSnafu)?; + logging::info!("MetaSrv start command: {:#?}", self); logging::info!("MetaSrv options: {:#?}", opts); - let instance = MetaSrvInstance::new(opts) + let instance = MetaSrvInstance::new(opts, plugins) .await .context(error::BuildMetaServerSnafu)?; diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index c6482a6ad5c0..13bada3d6a71 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -44,7 +44,6 @@ use crate::error::{ IllegalConfigSnafu, InitMetadataSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, }; -use crate::frontend::load_frontend_plugins; use crate::options::{MixOptions, Options, TopLevelOptions}; #[derive(Parser)] @@ -298,8 +297,11 @@ impl StartCommand { #[allow(unused_variables)] #[allow(clippy::diverging_sub_expression)] async fn build(self, opts: MixOptions) -> Result { - let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?); - let fe_opts = opts.frontend; + let mut fe_opts = opts.frontend; + let fe_plugins = plugins::setup_frontend_plugins(&mut fe_opts) + .await + .context(StartFrontendSnafu)?; + let dn_opts = opts.datanode; info!("Standalone start command: {:#?}", self); @@ -315,7 +317,7 @@ impl StartCommand { .context(StartFrontendSnafu)?; let datanode = - DatanodeBuilder::new(dn_opts.clone(), Some(kv_store.clone()), plugins.clone()) + DatanodeBuilder::new(dn_opts.clone(), Some(kv_store.clone()), Default::default()) .build() .await .context(StartDatanodeSnafu)?; @@ -335,7 +337,7 @@ impl StartCommand { // TODO: build frontend instance like in distributed mode let mut frontend = build_frontend( - plugins, + fe_plugins, kv_store, procedure_manager, catalog_manager, @@ -354,7 +356,7 @@ impl StartCommand { /// Build frontend instance in standalone mode async fn build_frontend( - plugins: Arc, + plugins: Plugins, kv_store: KvBackendRef, procedure_manager: ProcedureManagerRef, catalog_manager: CatalogManagerRef, @@ -388,13 +390,13 @@ mod tests { #[tokio::test] async fn test_try_from_start_command_to_anymap() { - let command = StartCommand { + let mut fe_opts = FrontendOptions { user_provider: Some("static_user_provider:cmd:test=test".to_string()), ..Default::default() }; - let plugins = load_frontend_plugins(&command.user_provider); - let plugins = plugins.unwrap(); + let plugins = plugins::setup_frontend_plugins(&mut fe_opts).await.unwrap(); + let provider = plugins.get::().unwrap(); let result = provider .authenticate( diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs index 5fc8087aae7f..a111bf3c7376 100644 --- a/src/common/base/src/lib.rs +++ b/src/common/base/src/lib.rs @@ -23,6 +23,8 @@ use std::sync::{Arc, Mutex, MutexGuard}; pub use bit_vec::BitVec; +/// [`Plugins`] is a wrapper of Arc contents. +/// Make it Cloneable and we can treat it like an Arc struct. #[derive(Default, Clone)] pub struct Plugins { inner: Arc>>, diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 3d127faf3322..d1ad159ffa9c 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -72,6 +72,7 @@ pub struct Datanode { region_server: RegionServer, greptimedb_telemetry_task: Arc, leases_notifier: Option>, + plugins: Plugins, } impl Datanode { @@ -137,11 +138,15 @@ impl Datanode { pub fn region_server(&self) -> RegionServer { self.region_server.clone() } + + pub fn plugins(&self) -> Plugins { + self.plugins.clone() + } } pub struct DatanodeBuilder { opts: DatanodeOptions, - plugins: Arc, + plugins: Plugins, meta_client: Option, kv_backend: Option, } @@ -149,11 +154,7 @@ pub struct DatanodeBuilder { impl DatanodeBuilder { /// `kv_backend` is optional. If absent, the builder will try to build one /// by using the given `opts` - pub fn new( - opts: DatanodeOptions, - kv_backend: Option, - plugins: Arc, - ) -> Self { + pub fn new(opts: DatanodeOptions, kv_backend: Option, plugins: Plugins) -> Self { Self { opts, plugins, @@ -262,6 +263,7 @@ impl DatanodeBuilder { greptimedb_telemetry_task, region_event_receiver, leases_notifier, + plugins: self.plugins.clone(), }) } @@ -327,7 +329,7 @@ impl DatanodeBuilder { async fn new_region_server( opts: &DatanodeOptions, - plugins: Arc, + plugins: Plugins, log_store: Arc, event_listener: RegionServerEventListenerRef, ) -> Result { @@ -360,6 +362,8 @@ impl DatanodeBuilder { Ok(region_server) } + // internal utils + /// Build [RaftEngineLogStore] async fn build_log_store(opts: &DatanodeOptions) -> Result> { let data_home = normalize_dir(&opts.storage.data_home); @@ -454,7 +458,7 @@ mod tests { ..Default::default() }, None, - Arc::new(Plugins::default()), + Plugins::default(), ); let kv = Arc::new(MemoryKvBackend::default()) as _; diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index a176d663b5bb..99e8b17b9f90 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -272,6 +272,9 @@ pub enum Error { source: operator::error::Error, location: Location, }, + + #[snafu(display("Invalid auth config"))] + IllegalAuthConfig { source: auth::error::Error }, } pub type Result = std::result::Result; @@ -288,6 +291,7 @@ impl ErrorExt for Error { | Error::ColumnNotFound { .. } | Error::MissingMetasrvOpts { .. } | Error::UnsupportedFormat { .. } + | Error::IllegalAuthConfig { .. } | Error::EmptyData { .. } | Error::ColumnNoneDefaultValue { .. } | Error::IncompleteGrpcRequest { .. } => StatusCode::InvalidArguments, diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index a45dd13ae340..cade13de921b 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -41,6 +41,7 @@ pub struct FrontendOptions { pub meta_client: Option, pub logging: LoggingOptions, pub datanode: DatanodeOptions, + pub user_provider: Option, } impl Default for FrontendOptions { @@ -60,6 +61,7 @@ impl Default for FrontendOptions { meta_client: None, logging: LoggingOptions::default(), datanode: DatanodeOptions::default(), + user_provider: None, } } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index a7cef4ba6786..8828bdd88a54 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -122,9 +122,7 @@ pub struct Instance { script_executor: Arc, statement_executor: Arc, query_engine: QueryEngineRef, - /// plugins: this map holds extensions to customize query or auth - /// behaviours. - plugins: Arc, + plugins: Plugins, servers: Arc, heartbeat_task: Option, inserter: InserterRef, @@ -132,10 +130,7 @@ pub struct Instance { } impl Instance { - pub async fn try_new_distributed( - opts: &FrontendOptions, - plugins: Arc, - ) -> Result { + pub async fn try_new_distributed(opts: &FrontendOptions, plugins: Plugins) -> Result { let meta_client = Self::create_meta_client(opts).await?; let datanode_clients = Arc::new(DatanodeClients::default()); @@ -146,7 +141,7 @@ impl Instance { pub async fn try_new_distributed_with( meta_client: Arc, datanode_clients: Arc, - plugins: Arc, + plugins: Plugins, opts: &FrontendOptions, ) -> Result { let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone())); @@ -297,7 +292,7 @@ impl Instance { kv_backend: KvBackendRef, procedure_manager: ProcedureManagerRef, catalog_manager: CatalogManagerRef, - plugins: Arc, + plugins: Plugins, region_server: RegionServer, ) -> Result { let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone())); @@ -377,7 +372,7 @@ impl Instance { &self.catalog_manager } - pub fn plugins(&self) -> Arc { + pub fn plugins(&self) -> Plugins { self.plugins.clone() } @@ -593,7 +588,7 @@ impl PrometheusHandler for Instance { } pub fn check_permission( - plugins: Arc, + plugins: Plugins, stmt: &Statement, query_ctx: &QueryContextRef, ) -> Result<()> { @@ -664,6 +659,7 @@ fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> mod tests { use std::collections::HashMap; + use common_base::Plugins; use query::query_engine::options::QueryOptions; use session::context::QueryContext; use sql::dialect::GreptimeDbDialect; @@ -674,11 +670,10 @@ mod tests { #[test] fn test_exec_validation() { let query_ctx = QueryContext::arc(); - let plugins = Plugins::new(); + let plugins: Plugins = Plugins::new(); plugins.insert(QueryOptions { disallow_cross_schema_query: true, }); - let plugins = Arc::new(plugins); let sql = r#" SELECT * FROM demo; @@ -704,7 +699,7 @@ mod tests { re.unwrap(); } - fn replace_test(template_sql: &str, plugins: Arc, query_ctx: &QueryContextRef) { + fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) { // test right let right = vec![("", ""), ("", "public."), ("greptime.", "public.")]; for (catalog, schema) in right { @@ -732,7 +727,7 @@ mod tests { template.format(&vars).unwrap() } - fn do_test(sql: &str, plugins: Arc, query_ctx: &QueryContextRef, is_ok: bool) { + fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) { let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0]; let re = check_permission(plugins, stmt, query_ctx); if is_ok { diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 794c7000a875..073bc0d53e82 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -20,7 +20,6 @@ use auth::UserProviderRef; use common_base::Plugins; use common_runtime::Builder as RuntimeBuilder; use common_telemetry::info; -use servers::configurator::ConfiguratorRef; use servers::error::InternalIoSnafu; use servers::grpc::{GrpcServer, GrpcServerConfig}; use servers::http::HttpServerBuilder; @@ -47,7 +46,7 @@ impl Services { pub(crate) async fn build( opts: &FrontendOptions, instance: Arc, - plugins: Arc, + plugins: Plugins, ) -> Result where T: FrontendInstance, @@ -120,7 +119,7 @@ impl Services { let http_server = http_server_builder .with_metrics_handler(MetricsHandler) .with_script_handler(instance.clone()) - .with_configurator(plugins.get::()) + .with_plugins(plugins) .with_greptime_config_options(opts.to_toml_string()) .build(); result.push((Box::new(http_server), http_addr)); diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index c3b1deec613b..eaa79ee8775c 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -20,7 +20,9 @@ use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::lock_server::LockServer; use api::v1::meta::router_server::RouterServer; use api::v1::meta::store_server::StoreServer; +use common_base::Plugins; use etcd_client::Client; +use servers::configurator::ConfiguratorRef; use servers::http::{HttpServer, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::server::Server; @@ -53,22 +55,27 @@ pub struct MetaSrvInstance { opts: MetaSrvOptions, signal_sender: Option>, + + plugins: Plugins, } impl MetaSrvInstance { - pub async fn new(opts: MetaSrvOptions) -> Result { - let meta_srv = build_meta_srv(&opts).await?; + pub async fn new(opts: MetaSrvOptions, plugins: Plugins) -> Result { + let meta_srv = build_meta_srv(&opts, plugins.clone()).await?; let http_srv = Arc::new( HttpServerBuilder::new(opts.http.clone()) .with_metrics_handler(MetricsHandler) .with_greptime_config_options(opts.to_toml_string()) .build(), ); + // put meta_srv into plugins for later use + plugins.insert::>(Arc::new(meta_srv.clone())); Ok(MetaSrvInstance { meta_srv, http_srv, opts, signal_sender: None, + plugins, }) } @@ -79,8 +86,12 @@ impl MetaSrvInstance { self.signal_sender = Some(tx); - let meta_srv = - bootstrap_meta_srv_with_router(&self.opts.bind_addr, router(self.meta_srv.clone()), rx); + let mut router = router(self.meta_srv.clone()); + if let Some(configurator) = self.meta_srv.plugins().get::() { + router = configurator.config_grpc(router); + } + + let meta_srv = bootstrap_meta_srv_with_router(&self.opts.bind_addr, router, rx); let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu { addr: &self.opts.http.addr, })?; @@ -110,6 +121,10 @@ impl MetaSrvInstance { })?; Ok(()) } + + pub fn plugins(&self) -> Plugins { + self.plugins.clone() + } } pub async fn bootstrap_meta_srv_with_router( @@ -146,7 +161,7 @@ pub fn router(meta_srv: MetaSrv) -> Router { .add_service(admin::make_admin_service(meta_srv)) } -pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result { +pub async fn build_meta_srv(opts: &MetaSrvOptions, plugins: Plugins) -> Result { let (kv_store, election, lock) = if opts.use_memory_store { ( Arc::new(MemStore::new()) as _, @@ -179,14 +194,7 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result { .selector(selector) .election(election) .lock(lock) + .plugins(plugins) .build() .await } - -pub async fn make_meta_srv(opts: &MetaSrvOptions) -> Result { - let meta_srv = build_meta_srv(opts).await?; - - meta_srv.try_start().await?; - - Ok(meta_srv) -} diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index c3b27a2ea70c..39aeae24439b 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use std::time::Duration; use api::v1::meta::Peer; +use common_base::Plugins; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager; use common_meta::ddl::DdlTaskExecutorRef; @@ -188,7 +189,8 @@ pub struct MetaSrv { ddl_executor: DdlTaskExecutorRef, table_metadata_manager: TableMetadataManagerRef, greptimedb_telemetry_task: Arc, - pubsub: Option<(PublishRef, SubscribeManagerRef)>, + + plugins: Plugins, } impl MetaSrv { @@ -208,7 +210,7 @@ impl MetaSrv { let procedure_manager = self.procedure_manager.clone(); let in_memory = self.in_memory.clone(); let leader_cached_kv_store = self.leader_cached_kv_store.clone(); - let subscribe_manager = self.subscribe_manager().cloned(); + let subscribe_manager = self.subscribe_manager(); let mut rx = election.subscribe_leader_change(); let task_handler = self.greptimedb_telemetry_task.clone(); let _handle = common_runtime::spawn_bg(async move { @@ -350,12 +352,16 @@ impl MetaSrv { &self.table_metadata_manager } - pub fn publish(&self) -> Option<&PublishRef> { - self.pubsub.as_ref().map(|suite| &suite.0) + pub fn publish(&self) -> Option { + self.plugins.get::() + } + + pub fn subscribe_manager(&self) -> Option { + self.plugins.get::() } - pub fn subscribe_manager(&self) -> Option<&SubscribeManagerRef> { - self.pubsub.as_ref().map(|suite| &suite.1) + pub fn plugins(&self) -> &Plugins { + &self.plugins } #[inline] diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 6d118a38a9c7..076484e7f999 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use std::time::Duration; use client::client_manager::DatanodeClients; +use common_base::Plugins; use common_grpc::channel_manager::ChannelConfig; use common_meta::ddl_manager::{DdlManager, DdlManagerRef}; use common_meta::distributed_time_constants; @@ -48,7 +49,7 @@ use crate::metasrv::{ ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ, }; use crate::procedure::region_failover::RegionFailoverManager; -use crate::pubsub::{PublishRef, SubscribeManagerRef}; +use crate::pubsub::PublishRef; use crate::selector::lease_based::LeaseBasedSelector; use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore}; @@ -67,7 +68,7 @@ pub struct MetaSrvBuilder { meta_peer_client: Option, lock: Option, datanode_clients: Option>, - pubsub: Option<(PublishRef, SubscribeManagerRef)>, + plugins: Option, } impl MetaSrvBuilder { @@ -82,7 +83,7 @@ impl MetaSrvBuilder { options: None, lock: None, datanode_clients: None, - pubsub: None, + plugins: None, } } @@ -131,8 +132,8 @@ impl MetaSrvBuilder { self } - pub fn pubsub(mut self, publish: PublishRef, subscribe_manager: SubscribeManagerRef) -> Self { - self.pubsub = Some((publish, subscribe_manager)); + pub fn plugins(mut self, plugins: Plugins) -> Self { + self.plugins = Some(plugins); self } @@ -149,7 +150,7 @@ impl MetaSrvBuilder { handler_group, lock, datanode_clients, - pubsub, + plugins, } = self; let options = options.unwrap_or_default(); @@ -206,11 +207,10 @@ impl MetaSrvBuilder { None }; - let publish_heartbeat_handler = if let Some((publish, _)) = pubsub.as_ref() { - Some(PublishHeartbeatHandler::new(publish.clone())) - } else { - None - }; + let publish_heartbeat_handler = plugins + .clone() + .and_then(|plugins| plugins.get::()) + .map(|publish| PublishHeartbeatHandler::new(publish.clone())); let region_lease_handler = RegionLeaseHandler::new(distributed_time_constants::REGION_LEASE_SECS); @@ -263,7 +263,7 @@ impl MetaSrvBuilder { enable_telemetry, ) .await, - pubsub, + plugins: plugins.unwrap_or_else(Plugins::default), }) } } diff --git a/src/partition/Cargo.toml b/src/partition/Cargo.toml index 4cb2def7bf88..e56b1d6df952 100644 --- a/src/partition/Cargo.toml +++ b/src/partition/Cargo.toml @@ -4,8 +4,6 @@ version.workspace = true edition.workspace = true license.workspace = true -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] api = { workspace = true } async-trait = "0.1" diff --git a/src/plugins/Cargo.toml b/src/plugins/Cargo.toml new file mode 100644 index 000000000000..a11150f7f34e --- /dev/null +++ b/src/plugins/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "plugins" +version.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +auth.workspace = true +common-base.workspace = true +datanode.workspace = true +frontend.workspace = true +meta-srv.workspace = true +snafu.workspace = true diff --git a/src/plugins/src/datanode.rs b/src/plugins/src/datanode.rs new file mode 100644 index 000000000000..1ce02b227d37 --- /dev/null +++ b/src/plugins/src/datanode.rs @@ -0,0 +1,25 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_base::Plugins; +use datanode::config::DatanodeOptions; +use datanode::error::Result; + +pub async fn setup_datanode_plugins(_opts: &mut DatanodeOptions) -> Result { + Ok(Plugins::new()) +} + +pub async fn start_datanode_plugins(_plugins: Plugins) -> Result<()> { + Ok(()) +} diff --git a/src/plugins/src/frontend.rs b/src/plugins/src/frontend.rs new file mode 100644 index 000000000000..7ed8e96ecd3b --- /dev/null +++ b/src/plugins/src/frontend.rs @@ -0,0 +1,35 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use auth::UserProviderRef; +use common_base::Plugins; +use frontend::error::{IllegalAuthConfigSnafu, Result}; +use frontend::frontend::FrontendOptions; +use snafu::ResultExt; + +pub async fn setup_frontend_plugins(opts: &mut FrontendOptions) -> Result { + let plugins = Plugins::new(); + + if let Some(user_provider) = opts.user_provider.as_ref() { + let provider = + auth::user_provider_from_option(user_provider).context(IllegalAuthConfigSnafu)?; + plugins.insert::(provider); + } + + Ok(plugins) +} + +pub async fn start_frontend_plugins(_plugins: Plugins) -> Result<()> { + Ok(()) +} diff --git a/src/plugins/src/lib.rs b/src/plugins/src/lib.rs new file mode 100644 index 000000000000..a80ff8ff5f7e --- /dev/null +++ b/src/plugins/src/lib.rs @@ -0,0 +1,21 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod datanode; +mod frontend; +mod meta_srv; + +pub use datanode::{setup_datanode_plugins, start_datanode_plugins}; +pub use frontend::{setup_frontend_plugins, start_frontend_plugins}; +pub use meta_srv::{setup_meta_srv_plugins, start_meta_srv_plugins}; diff --git a/src/plugins/src/meta_srv.rs b/src/plugins/src/meta_srv.rs new file mode 100644 index 000000000000..80ac6d8c7f41 --- /dev/null +++ b/src/plugins/src/meta_srv.rs @@ -0,0 +1,25 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_base::Plugins; +use meta_srv::error::Result; +use meta_srv::metasrv::MetaSrvOptions; + +pub async fn setup_meta_srv_plugins(_opts: &mut MetaSrvOptions) -> Result { + Ok(Plugins::new()) +} + +pub async fn start_meta_srv_plugins(_plugins: Plugins) -> Result<()> { + Ok(()) +} diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index f8c8ed25794a..e5822f3c2482 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -69,11 +69,11 @@ use crate::{metrics, QueryEngine}; pub struct DatafusionQueryEngine { state: Arc, - plugins: Arc, + plugins: Plugins, } impl DatafusionQueryEngine { - pub fn new(state: Arc, plugins: Arc) -> Self { + pub fn new(state: Arc, plugins: Plugins) -> Self { Self { state, plugins } } diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index d88f399da316..9e94037ba3aa 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -98,7 +98,7 @@ impl QueryEngineFactory { region_query_handler: Option, table_mutation_handler: Option, with_dist_planner: bool, - plugins: Arc, + plugins: Plugins, ) -> Self { let state = Arc::new(QueryEngineState::new( catalog_manager, diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 753015582c8a..7bc031d16e9b 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -61,7 +61,7 @@ pub struct QueryEngineState { catalog_manager: CatalogManagerRef, table_mutation_handler: Option, aggregate_functions: Arc>>, - plugins: Arc, + plugins: Plugins, } impl fmt::Debug for QueryEngineState { @@ -78,7 +78,7 @@ impl QueryEngineState { region_query_handler: Option, table_mutation_handler: Option, with_dist_planner: bool, - plugins: Arc, + plugins: Plugins, ) -> Self { let runtime_env = Arc::new(RuntimeEnv::default()); let session_config = SessionConfig::new().with_create_default_catalog_and_schema(false); diff --git a/src/query/src/tests/query_engine_test.rs b/src/query/src/tests/query_engine_test.rs index 71894eea47c2..437c15067439 100644 --- a/src/query/src/tests/query_engine_test.rs +++ b/src/query/src/tests/query_engine_test.rs @@ -127,7 +127,6 @@ async fn test_query_validate() -> Result<()> { plugins.insert(QueryOptions { disallow_cross_schema_query: true, }); - let plugins = Arc::new(plugins); let factory = QueryEngineFactory::new_with_plugins(catalog_list, None, None, false, plugins); let engine = factory.query_engine(); diff --git a/src/servers/src/configurator.rs b/src/servers/src/configurator.rs index a86aa43e6031..2b2e47d1e7fc 100644 --- a/src/servers/src/configurator.rs +++ b/src/servers/src/configurator.rs @@ -14,10 +14,15 @@ use std::sync::Arc; -use axum::Router; +use axum::Router as HttpRouter; +use tonic::transport::server::Router as GrpcRouter; pub trait Configurator: Send + Sync { - fn config_http(&self, route: Router) -> Router { + fn config_http(&self, route: HttpRouter) -> HttpRouter { + route + } + + fn config_grpc(&self, route: GrpcRouter) -> GrpcRouter { route } } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index d6fbd8e49da8..f00d4c07a271 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -42,6 +42,7 @@ use axum::middleware::{self, Next}; use axum::response::{Html, IntoResponse, Json}; use axum::{routing, BoxError, Extension, Router}; use common_base::readable_size::ReadableSize; +use common_base::Plugins; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_query::Output; @@ -103,8 +104,8 @@ pub struct HttpServer { shutdown_tx: Mutex>>, user_provider: Option, metrics_handler: Option, - configurator: Option, greptime_config_options: Option, + plugins: Plugins, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -382,8 +383,8 @@ impl HttpServerBuilder { script_handler: None, metrics_handler: None, shutdown_tx: Mutex::new(None), - configurator: None, greptime_config_options: None, + plugins: Default::default(), }, } } @@ -438,8 +439,8 @@ impl HttpServerBuilder { self } - pub fn with_configurator(&mut self, configurator: Option) -> &mut Self { - self.inner.configurator = configurator; + pub fn with_plugins(&mut self, plugins: Plugins) -> &mut Self { + self.inner.plugins = plugins; self } @@ -724,7 +725,7 @@ impl Server for HttpServer { ); let mut app = self.make_app(); - if let Some(configurator) = self.configurator.as_ref() { + if let Some(configurator) = self.plugins.get::() { app = configurator.config_http(app); } let app = self.build(app); diff --git a/taplo.toml b/taplo.toml index b46409453e96..39c6e13aead0 100644 --- a/taplo.toml +++ b/taplo.toml @@ -32,5 +32,5 @@ allowed_blank_lines = 1 crlf = false [[rule]] -keys = ["dependencies", "dev-dependencies", "build-dependencies"] +keys = ["build-dependencies", "dependencies", "dev-dependencies", "workspace.dependencies"] formatting = { reorder_keys = true } diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 013eeb681ec6..a2d7bcbce86f 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -202,7 +202,7 @@ impl GreptimeDbClusterBuilder { .build(); meta_client.start(&[&meta_srv.server_addr]).await.unwrap(); - let mut datanode = DatanodeBuilder::new(opts, None, Arc::new(Plugins::default())) + let mut datanode = DatanodeBuilder::new(opts, None, Plugins::default()) .with_meta_client(meta_client) .build() .await @@ -234,7 +234,7 @@ impl GreptimeDbClusterBuilder { FeInstance::try_new_distributed_with( meta_client, datanode_clients, - Arc::new(Plugins::default()), + Plugins::default(), &frontend_opts, ) .await diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index a4748772bacd..f17dd303fb38 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -74,7 +74,7 @@ impl GreptimeDbStandaloneBuilder { .await .unwrap(); - let plugins = Arc::new(self.plugin.unwrap_or_default()); + let plugins = self.plugin.unwrap_or_default(); let datanode = DatanodeBuilder::new(opts.clone(), Some(kv_store.clone()), plugins.clone()) .build()