Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: extract plugins crate #2487

Merged
merged 28 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4110060
chore: move frontend plugins fn
shuiyisong Sep 12, 2023
a696af0
chore: move datanode plugins to fn
shuiyisong Sep 12, 2023
2749149
chore: add opt plugins
shuiyisong Sep 13, 2023
b4233a3
chore: add plugins to meta-srv
shuiyisong Sep 13, 2023
1c0abf8
chore: setup meta plugins, wait for router extension
shuiyisong Sep 13, 2023
7f90747
chore: merge develop
shuiyisong Sep 14, 2023
2cf91a2
chore: try use configurator for grpc too
shuiyisong Sep 14, 2023
f281de3
chore: minor fix fmt
shuiyisong Sep 14, 2023
3178225
chore: minor fix fmt
shuiyisong Sep 14, 2023
4e83db7
chore: add start meta_srv for hook
shuiyisong Sep 14, 2023
2ea4707
Merge branch 'develop' into refactor/plugins
shuiyisong Sep 14, 2023
0495fd3
Merge branch 'develop' into refactor/plugins
shuiyisong Sep 15, 2023
8d253ee
chore: merge develop
shuiyisong Sep 19, 2023
3d7eeb7
chore: merge develop
shuiyisong Sep 19, 2023
8f840a1
chore: merge develop
shuiyisong Sep 19, 2023
230de52
chore: merge develop
shuiyisong Sep 21, 2023
7ad202c
chore: merge develop
shuiyisong Sep 22, 2023
aecdfca
chore: merge develop
shuiyisong Sep 25, 2023
40bb86c
chore: minor fix
shuiyisong Sep 25, 2023
80dd634
chore: replace Arc<Plugins> with PluginsRef
shuiyisong Sep 25, 2023
f312a39
chore: fix header
shuiyisong Sep 25, 2023
a4a6b84
chore: merge develop
shuiyisong Sep 28, 2023
617682f
chore: merge develop
shuiyisong Oct 8, 2023
40bf496
chore: merge develop
shuiyisong Oct 8, 2023
ab70f60
chore: remove empty file
shuiyisong Oct 8, 2023
1cd4af2
chore: modify comments
shuiyisong Oct 8, 2023
cbe3014
chore: remove PluginsRef type alias
shuiyisong Oct 8, 2023
0ce1377
chore: remove `OptPlugins`
shuiyisong Oct 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 10 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ members = [
"src/object-store",
"src/operator",
"src/partition",
"src/plugins",
"src/promql",
"src/query",
"src/script",
Expand All @@ -61,7 +62,6 @@ license = "Apache-2.0"
[workspace.dependencies]
aquamarine = "0.3"
arrow = { version = "43.0" }
etcd-client = "0.11"
arrow-array = "43.0"
arrow-flight = "43.0"
arrow-schema = { version = "43.0", features = ["serde"] }
Expand All @@ -76,18 +76,22 @@ datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "c0b0fca548e99d020c76e1a1cd7132aab26000e1" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "c0b0fca548e99d020c76e1a1cd7132aab26000e1" }
derive_builder = "0.12"
etcd-client = "0.11"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1f1dd532a111e3834cc3019c5605e2993ffb9dc3" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "abbd357c1e193cd270ea65ee7652334a150b628f" }
metrics = "0.20"
moka = "0.12"
once_cell = "1.18"
opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] }
parquet = "43.0"
paste = "1.0"
prost = "0.11"
raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" }
rand = "0.8"
regex = "1.8"
reqwest = { version = "0.11", default-features = false, features = [
Expand All @@ -109,8 +113,6 @@ tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.7"
tonic = { version = "0.9", features = ["tls"] }
uuid = { version = "1", features = ["serde", "v4", "fast-rng"] }
metrics = "0.20"
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "abbd357c1e193cd270ea65ee7652334a150b628f" }
## workspaces members
api = { path = "src/api" }
auth = { path = "src/auth" }
Expand All @@ -123,19 +125,18 @@ common-config = { path = "src/common/config" }
common-datasource = { path = "src/common/datasource" }
common-error = { path = "src/common/error" }
common-function = { path = "src/common/function" }
common-macro = { path = "src/common/macro" }
common-greptimedb-telemetry = { path = "src/common/greptimedb-telemetry" }
common-grpc = { path = "src/common/grpc" }
common-grpc-expr = { path = "src/common/grpc-expr" }
common-macro = { path = "src/common/macro" }
common-mem-prof = { path = "src/common/mem-prof" }
common-meta = { path = "src/common/meta" }
common-pprof = { path = "src/common/pprof" }
common-procedure = { path = "src/common/procedure" }
common-procedure-test = { path = "src/common/procedure-test" }
common-pprof = { path = "src/common/pprof" }
common-query = { path = "src/common/query" }
common-recordbatch = { path = "src/common/recordbatch" }
common-runtime = { path = "src/common/runtime" }
substrait = { path = "src/common/substrait" }
common-telemetry = { path = "src/common/telemetry" }
common-test-util = { path = "src/common/test-util" }
common-time = { path = "src/common/time" }
Expand All @@ -149,20 +150,20 @@ meta-client = { path = "src/meta-client" }
meta-srv = { path = "src/meta-srv" }
mito = { path = "src/mito" }
mito2 = { path = "src/mito2" }
operator = { path = "src/operator" }
object-store = { path = "src/object-store" }
operator = { path = "src/operator" }
partition = { path = "src/partition" }
plugins = { path = "src/plugins" }
promql = { path = "src/promql" }
query = { path = "src/query" }
raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" }
script = { path = "src/script" }
servers = { path = "src/servers" }
session = { path = "src/session" }
sql = { path = "src/sql" }
storage = { path = "src/storage" }
store-api = { path = "src/store-api" }
substrait = { path = "src/common/substrait" }
table = { path = "src/table" }
table-procedure = { path = "src/table-procedure" }

[workspace.dependencies.meter-macros]
git = "https://github.com/GreptimeTeam/greptime-meter.git"
Expand Down
1 change: 1 addition & 0 deletions src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ metrics.workspace = true
mito2 = { workspace = true }
nu-ansi-term = "0.46"
partition = { workspace = true }
plugins.workspace = true
prost.workspace = true
query = { workspace = true }
rand.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::time::Instant;
use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager};
use client::client_manager::DatanodeClients;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_base::PluginsRef;
use common_error::ext::ErrorExt;
use common_query::Output;
use common_recordbatch::RecordBatches;
Expand Down Expand Up @@ -257,7 +257,7 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
cached_meta_backend.clone(),
datanode_clients,
);
let plugins: Arc<Plugins> = Default::default();
let plugins: PluginsRef = Default::default();
let state = Arc::new(QueryEngineState::new(
catalog_list,
None,
Expand Down
11 changes: 10 additions & 1 deletion src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common_telemetry::logging;
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
use meta_client::MetaClientOptions;
use plugins::OptPlugins;
use servers::Mode;
use snafu::ResultExt;

Expand All @@ -31,6 +32,10 @@ pub struct Instance {

impl Instance {
pub async fn start(&mut self) -> Result<()> {
plugins::start_datanode_plugins(self.datanode.plugins())
.await
.context(StartDatanodeSnafu)?;

self.datanode.start().await.context(StartDatanodeSnafu)
}

Expand Down Expand Up @@ -160,10 +165,14 @@ impl StartCommand {
}

async fn build(self, opts: DatanodeOptions) -> Result<Instance> {
let OptPlugins { opts, plugins } = plugins::setup_datanode_plugins(opts)
.await
.context(StartDatanodeSnafu)?;

logging::info!("Datanode start command: {:#?}", self);
logging::info!("Datanode options: {:#?}", opts);

let datanode = DatanodeBuilder::new(opts, None, Default::default())
let datanode = DatanodeBuilder::new(opts, None, plugins)
.build()
.await
.context(StartDatanodeSnafu)?;
Expand Down
7 changes: 0 additions & 7 deletions src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,6 @@ pub enum Error {
#[snafu(display("Illegal config: {}", msg))]
IllegalConfig { msg: String, location: Location },

#[snafu(display("Illegal auth config"))]
IllegalAuthConfig {
location: Location,
source: auth::error::Error,
},

#[snafu(display("Unsupported selector type: {}", selector_type))]
UnsupportedSelectorType {
selector_type: String,
Expand Down Expand Up @@ -208,7 +202,6 @@ impl ErrorExt for Error {
| Error::LoadLayeredConfig { .. }
| Error::IllegalConfig { .. }
| Error::InvalidReplCommand { .. }
| Error::IllegalAuthConfig { .. }
| Error::ConnectEtcd { .. } => StatusCode::InvalidArguments,

Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal,
Expand Down
49 changes: 23 additions & 26 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use auth::UserProviderRef;
use clap::Parser;
use common_base::Plugins;
use common_telemetry::logging;
use frontend::frontend::FrontendOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use meta_client::MetaClientOptions;
use plugins::OptPlugins;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;

use crate::error::{self, IllegalAuthConfigSnafu, Result};
use crate::error::{self, Result, StartFrontendSnafu};
use crate::options::{Options, TopLevelOptions};

pub struct Instance {
Expand All @@ -34,10 +31,11 @@ pub struct Instance {

impl Instance {
pub async fn start(&mut self) -> Result<()> {
self.frontend
.start()
plugins::start_frontend_plugins(self.frontend.plugins().clone())
.await
.context(error::StartFrontendSnafu)
.context(StartFrontendSnafu)?;

self.frontend.start().await.context(StartFrontendSnafu)
}

pub async fn stop(&self) -> Result<()> {
Expand Down Expand Up @@ -177,38 +175,32 @@ impl StartCommand {
opts.mode = Mode::Distributed;
}

opts.user_provider = self.user_provider.clone();

Ok(Options::Frontend(Box::new(opts)))
}

async fn build(self, opts: FrontendOptions) -> Result<Instance> {
let OptPlugins { opts, plugins } = plugins::setup_frontend_plugins(opts)
.await
.context(StartFrontendSnafu)?;

logging::info!("Frontend start command: {:#?}", self);
logging::info!("Frontend options: {:#?}", opts);

let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?);

let mut instance = FeInstance::try_new_distributed(&opts, plugins.clone())
.await
.context(error::StartFrontendSnafu)?;
.context(StartFrontendSnafu)?;

instance
.build_servers(&opts)
.await
.context(error::StartFrontendSnafu)?;
.context(StartFrontendSnafu)?;

Ok(Instance { frontend: instance })
}
}

pub fn load_frontend_plugins(user_provider: &Option<String>) -> Result<Plugins> {
let plugins = Plugins::new();

if let Some(provider) = user_provider {
let provider = auth::user_provider_from_option(provider).context(IllegalAuthConfigSnafu)?;
plugins.insert::<UserProviderRef>(provider);
}
Ok(plugins)
}

#[cfg(test)]
mod tests {
use std::io::Write;
Expand All @@ -218,6 +210,8 @@ mod tests {
use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_named_temp_file;
use frontend::service_config::GrpcOptions;
use plugins::OptPlugins;
use servers::http::HttpOptions;

use super::*;
use crate::options::ENV_VAR_SEP;
Expand Down Expand Up @@ -303,14 +297,17 @@ mod tests {

#[tokio::test]
async fn test_try_from_start_command_to_anymap() {
let command = StartCommand {
let fe_opts = FrontendOptions {
http: HttpOptions {
disable_dashboard: false,
..Default::default()
},
user_provider: Some("static_user_provider:cmd:test=test".to_string()),
disable_dashboard: Some(false),
..Default::default()
};

let plugins = load_frontend_plugins(&command.user_provider);
let plugins = plugins.unwrap();
let OptPlugins { plugins, .. } = plugins::setup_frontend_plugins(fe_opts).await.unwrap();

let provider = plugins.get::<UserProviderRef>().unwrap();
let result = provider
.authenticate(
Expand Down
16 changes: 10 additions & 6 deletions src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ use clap::Parser;
use common_telemetry::logging;
use meta_srv::bootstrap::MetaSrvInstance;
use meta_srv::metasrv::MetaSrvOptions;
use plugins::OptPlugins;
use snafu::ResultExt;

use crate::error::{self, Result};
use crate::error::{self, Result, StartMetaServerSnafu};
use crate::options::{Options, TopLevelOptions};

pub struct Instance {
Expand All @@ -29,10 +30,10 @@ pub struct Instance {

impl Instance {
pub async fn start(&mut self) -> Result<()> {
self.instance
.start()
plugins::start_meta_srv_plugins(self.instance.plugins())
.await
.context(error::StartMetaServerSnafu)
.context(StartMetaServerSnafu)?;
self.instance.start().await.context(StartMetaServerSnafu)
}

pub async fn stop(&self) -> Result<()> {
Expand Down Expand Up @@ -159,11 +160,14 @@ impl StartCommand {
}

async fn build(self, opts: MetaSrvOptions) -> Result<Instance> {
logging::info!("MetaSrv start command: {:#?}", self);
let OptPlugins { opts, plugins } = plugins::setup_meta_srv_plugins(opts)
.await
.context(StartMetaServerSnafu)?;

logging::info!("MetaSrv start command: {:#?}", self);
logging::info!("MetaSrv options: {:#?}", opts);

let instance = MetaSrvInstance::new(opts)
let instance = MetaSrvInstance::new(opts, plugins)
.await
.context(error::BuildMetaServerSnafu)?;

Expand Down
Loading