From 4cbdf64d521955294f73af674d2d1a6f35ffb2e3 Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Tue, 6 Feb 2024 10:41:37 +0800 Subject: [PATCH] chore: start plugins during standalone startup & comply with current catalog while changing database (#3282) * chore: start plugins in standalone * chore: respect current catalog in use statement for mysql * chore: reduce unnecessory convert to string * chore: reduce duplicate code --- src/cmd/src/standalone.rs | 4 ++++ src/common/catalog/src/lib.rs | 30 ++++++++++++++++++++++++++++-- src/servers/src/mysql/handler.rs | 20 +++++++++++++++----- src/session/src/lib.rs | 5 +++++ 4 files changed, 52 insertions(+), 7 deletions(-) diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 0ff334e8da85..ea963e497d1f 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -213,6 +213,10 @@ impl App for Instance { .await .context(StartWalOptionsAllocatorSnafu)?; + plugins::start_frontend_plugins(self.frontend.plugins().clone()) + .await + .context(StartFrontendSnafu)?; + self.frontend.start().await.context(StartFrontendSnafu)?; Ok(()) } diff --git a/src/common/catalog/src/lib.rs b/src/common/catalog/src/lib.rs index f64d0289baad..1a2596371709 100644 --- a/src/common/catalog/src/lib.rs +++ b/src/common/catalog/src/lib.rs @@ -56,11 +56,22 @@ pub fn build_db_string(catalog: &str, schema: &str) -> String { /// - if `[-]` is provided, we split database name with `-` and use /// `` and ``. pub fn parse_catalog_and_schema_from_db_string(db: &str) -> (&str, &str) { + match parse_optional_catalog_and_schema_from_db_string(db) { + (Some(catalog), schema) => (catalog, schema), + (None, schema) => (DEFAULT_CATALOG_NAME, schema), + } +} + +/// Attempt to parse catalog and schema from given database name +/// +/// Similar to [`parse_catalog_and_schema_from_db_string`] but returns an optional +/// catalog if it's not provided in the database name. +pub fn parse_optional_catalog_and_schema_from_db_string(db: &str) -> (Option<&str>, &str) { let parts = db.splitn(2, '-').collect::>(); if parts.len() == 2 { - (parts[0], parts[1]) + (Some(parts[0]), parts[1]) } else { - (DEFAULT_CATALOG_NAME, db) + (None, db) } } @@ -90,5 +101,20 @@ mod tests { ("catalog", "schema1-schema2"), parse_catalog_and_schema_from_db_string("catalog-schema1-schema2") ); + + assert_eq!( + (None, "fullschema"), + parse_optional_catalog_and_schema_from_db_string("fullschema") + ); + + assert_eq!( + (Some("catalog"), "schema"), + parse_optional_catalog_and_schema_from_db_string("catalog-schema") + ); + + assert_eq!( + (Some("catalog"), "schema1-schema2"), + parse_optional_catalog_and_schema_from_db_string("catalog-schema1-schema2") + ); } } diff --git a/src/servers/src/mysql/handler.rs b/src/servers/src/mysql/handler.rs index 20c603e6bbe1..9c1540dcfa62 100644 --- a/src/servers/src/mysql/handler.rs +++ b/src/servers/src/mysql/handler.rs @@ -21,7 +21,7 @@ use std::time::Duration; use ::auth::{Identity, Password, UserProviderRef}; use async_trait::async_trait; use chrono::{NaiveDate, NaiveDateTime}; -use common_catalog::parse_catalog_and_schema_from_db_string; +use common_catalog::parse_optional_catalog_and_schema_from_db_string; use common_error::ext::ErrorExt; use common_query::Output; use common_telemetry::{debug, error, logging, tracing, warn}; @@ -351,9 +351,14 @@ impl AsyncMysqlShim for MysqlInstanceShi } async fn on_init<'a>(&'a mut self, database: &'a str, w: InitWriter<'a, W>) -> Result<()> { - let (catalog, schema) = parse_catalog_and_schema_from_db_string(database); + let (catalog_from_db, schema) = parse_optional_catalog_and_schema_from_db_string(database); + let catalog = if let Some(catalog) = catalog_from_db { + catalog.to_owned() + } else { + self.session.get_catalog() + }; - if !self.query_handler.is_valid_schema(catalog, schema).await? { + if !self.query_handler.is_valid_schema(&catalog, schema).await? { return w .error( ErrorKind::ER_WRONG_DB_NAME, @@ -366,7 +371,10 @@ impl AsyncMysqlShim for MysqlInstanceShi let user_info = &self.session.user_info(); if let Some(schema_validator) = &self.user_provider { - if let Err(e) = schema_validator.authorize(catalog, schema, user_info).await { + if let Err(e) = schema_validator + .authorize(&catalog, schema, user_info) + .await + { METRIC_AUTH_FAILURE .with_label_values(&[e.status_code().as_ref()]) .inc(); @@ -380,7 +388,9 @@ impl AsyncMysqlShim for MysqlInstanceShi } } - self.session.set_catalog(catalog.into()); + if catalog_from_db.is_some() { + self.session.set_catalog(catalog) + } self.session.set_schema(schema.into()); w.ok().await.map_err(|e| e.into()) diff --git a/src/session/src/lib.rs b/src/session/src/lib.rs index 7fd563124973..0c17ec66e02b 100644 --- a/src/session/src/lib.rs +++ b/src/session/src/lib.rs @@ -98,6 +98,11 @@ impl Session { self.catalog.store(Arc::new(catalog)); } + #[inline] + pub fn get_catalog(&self) -> String { + self.catalog.load().as_ref().clone() + } + #[inline] pub fn set_schema(&self, schema: String) { self.schema.store(Arc::new(schema));