From e5ec65988b4b8c595ff708f91226dd7389f2e678 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Wed, 7 Feb 2024 09:12:32 +0800 Subject: [PATCH] feat: administration functions (#3236) * feat: adds database() function to return current db * refactor: refactor meta src and client with new protos * feat: impl migrate_region and query_procedure_state for procedure service/client * fix: format * temp commit * feat: impl migrate_region SQL function * chore: clean code for review * fix: license header * fix: toml format * chore: update proto dependency * chore: apply suggestion Co-authored-by: Weny Xu * chore: apply suggestion Co-authored-by: Weny Xu * chore: apply suggestion Co-authored-by: JeremyHi * chore: apply suggestion Co-authored-by: fys <40801205+fengys1996@users.noreply.github.com> * chore: print key when parsing procedure id fails * chore: comment * chore: comment for MigrateRegionFunction --------- Co-authored-by: Weny Xu Co-authored-by: JeremyHi Co-authored-by: fys <40801205+fengys1996@users.noreply.github.com> --- Cargo.lock | 9 +- Cargo.toml | 2 +- src/common/function/Cargo.toml | 5 + src/common/function/src/function.rs | 4 + src/common/function/src/function_registry.rs | 7 + .../function/src/handlers.rs} | 23 ++- src/common/function/src/lib.rs | 5 +- src/common/function/src/scalars/udf.rs | 14 +- src/common/function/src/state.rs | 25 +++ src/common/function/src/system.rs | 10 +- src/common/function/src/system/database.rs | 92 +++++++++ src/common/function/src/system/timezone.rs | 90 +++++++++ src/common/function/src/table.rs | 31 ++++ .../function/src/table/migrate_region.rs | 175 ++++++++++++++++++ src/common/meta/Cargo.toml | 1 + src/common/meta/src/error.rs | 12 +- src/common/meta/src/rpc.rs | 1 + src/common/meta/src/rpc/ddl.rs | 20 +- src/common/meta/src/rpc/procedure.rs | 104 +++++++++++ src/common/procedure/src/lib.rs | 5 +- src/common/query/src/error.rs | 24 ++- src/datatypes/src/data_type.rs | 9 + src/meta-client/src/client.rs | 11 +- 
.../src/client/{ddl.rs => procedure.rs} | 154 ++++++++++++--- src/meta-srv/src/bootstrap.rs | 4 +- src/meta-srv/src/error.rs | 16 +- src/meta-srv/src/metasrv.rs | 17 ++ src/meta-srv/src/mocks.rs | 4 +- src/meta-srv/src/service.rs | 2 +- src/meta-srv/src/service/ddl.rs | 58 ------ src/meta-srv/src/service/procedure.rs | 135 ++++++++++++++ src/operator/Cargo.toml | 1 + src/operator/src/table.rs | 19 +- src/query/src/datafusion.rs | 4 +- src/query/src/datafusion/planner.rs | 11 +- src/query/src/error.rs | 2 +- src/query/src/lib.rs | 1 - src/query/src/query_engine.rs | 2 +- src/query/src/query_engine/state.rs | 25 ++- .../standalone/common/system/database.result | 28 +++ .../standalone/common/system/database.sql | 9 + .../standalone/common/system/timezone.result | 32 ++++ .../standalone/common/system/timezone.sql | 8 + 43 files changed, 1072 insertions(+), 139 deletions(-) rename src/{query/src/table_mutation.rs => common/function/src/handlers.rs} (63%) create mode 100644 src/common/function/src/state.rs create mode 100644 src/common/function/src/system/database.rs create mode 100644 src/common/function/src/system/timezone.rs create mode 100644 src/common/function/src/table.rs create mode 100644 src/common/function/src/table/migrate_region.rs create mode 100644 src/common/meta/src/rpc/procedure.rs rename src/meta-client/src/client/{ddl.rs => procedure.rs} (53%) delete mode 100644 src/meta-srv/src/service/ddl.rs create mode 100644 src/meta-srv/src/service/procedure.rs create mode 100644 tests/cases/standalone/common/system/database.result create mode 100644 tests/cases/standalone/common/system/database.sql diff --git a/Cargo.lock b/Cargo.lock index d2a0ced8bceb..d2fe7181f88e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1779,12 +1779,16 @@ dependencies = [ name = "common-function" version = "0.6.0" dependencies = [ + "api", "arc-swap", + "async-trait", "build-data", "chrono-tz 0.6.3", "common-error", "common-macro", "common-query", + "common-runtime", + 
"common-telemetry", "common-time", "datafusion", "datatypes", @@ -1798,6 +1802,7 @@ dependencies = [ "session", "snafu", "statrs", + "table", ] [[package]] @@ -1922,6 +1927,7 @@ dependencies = [ "etcd-client", "futures", "futures-util", + "hex", "humantime-serde", "hyper", "lazy_static", @@ -3782,7 +3788,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=519b1d0757404c8ff1eeb2a68d29f5ade54a1752#519b1d0757404c8ff1eeb2a68d29f5ade54a1752" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=96f1f0404f421ee560a4310c73c5071e49168168#96f1f0404f421ee560a4310c73c5071e49168168" dependencies = [ "prost 0.12.3", "serde", @@ -6037,6 +6043,7 @@ dependencies = [ "common-catalog", "common-datasource", "common-error", + "common-function", "common-grpc-expr", "common-macro", "common-meta", diff --git a/Cargo.toml b/Cargo.toml index 82c8bcb4f679..b2d877faa64c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,7 +96,7 @@ etcd-client = "0.12" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "519b1d0757404c8ff1eeb2a68d29f5ade54a1752" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "96f1f0404f421ee560a4310c73c5071e49168168" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml index 05dbdce23f5c..f3161a5a55fe 100644 --- a/src/common/function/Cargo.toml +++ b/src/common/function/Cargo.toml @@ -5,12 +5,16 @@ version.workspace = true license.workspace = true [dependencies] +api.workspace = true arc-swap = "1.0" +async-trait.workspace = true build-data = "0.1" chrono-tz = "0.6" common-error.workspace = true common-macro.workspace = true common-query.workspace = true +common-runtime.workspace = true 
+common-telemetry.workspace = true common-time.workspace = true datafusion.workspace = true datatypes.workspace = true @@ -22,6 +26,7 @@ paste = "1.0" session.workspace = true snafu.workspace = true statrs = "0.16" +table.workspace = true [dev-dependencies] ron = "0.7" diff --git a/src/common/function/src/function.rs b/src/common/function/src/function.rs index a1e43aca4a97..f47486da4502 100644 --- a/src/common/function/src/function.rs +++ b/src/common/function/src/function.rs @@ -21,16 +21,20 @@ use datatypes::data_type::ConcreteDataType; use datatypes::vectors::VectorRef; use session::context::{QueryContextBuilder, QueryContextRef}; +use crate::state::FunctionState; + /// The function execution context #[derive(Clone)] pub struct FunctionContext { pub query_ctx: QueryContextRef, + pub state: Arc, } impl Default for FunctionContext { fn default() -> Self { Self { query_ctx: QueryContextBuilder::default().build(), + state: Arc::new(FunctionState::default()), } } } diff --git a/src/common/function/src/function_registry.rs b/src/common/function/src/function_registry.rs index a1274779c3a0..ef6fb67de6a6 100644 --- a/src/common/function/src/function_registry.rs +++ b/src/common/function/src/function_registry.rs @@ -25,6 +25,7 @@ use crate::scalars::math::MathFunction; use crate::scalars::numpy::NumpyFunction; use crate::scalars::timestamp::TimestampFunction; use crate::system::SystemFunction; +use crate::table::TableFunction; #[derive(Default)] pub struct FunctionRegistry { @@ -74,13 +75,19 @@ impl FunctionRegistry { pub static FUNCTION_REGISTRY: Lazy> = Lazy::new(|| { let function_registry = FunctionRegistry::default(); + // Utility functions MathFunction::register(&function_registry); NumpyFunction::register(&function_registry); TimestampFunction::register(&function_registry); DateFunction::register(&function_registry); + // Aggregate functions AggregateFunctions::register(&function_registry); + + // System and administration functions 
SystemFunction::register(&function_registry); + TableFunction::register(&function_registry); + Arc::new(function_registry) }); diff --git a/src/query/src/table_mutation.rs b/src/common/function/src/handlers.rs similarity index 63% rename from src/query/src/table_mutation.rs rename to src/common/function/src/handlers.rs index bff93af93687..352009fc78b0 100644 --- a/src/query/src/table_mutation.rs +++ b/src/common/function/src/handlers.rs @@ -13,13 +13,14 @@ // limitations under the License. use std::sync::Arc; +use std::time::Duration; +use api::v1::meta::ProcedureStateResponse; use async_trait::async_trait; +use common_query::error::Result; use session::context::QueryContextRef; use table::requests::{DeleteRequest, InsertRequest}; -use crate::error::Result; - pub type AffectedRows = usize; /// A trait for handling table mutations in `QueryEngine`. @@ -30,6 +31,24 @@ pub trait TableMutationHandler: Send + Sync { /// Delete rows from the table. async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result; + + /// Migrate a region from source peer to target peer, returns the procedure id if success. + async fn migrate_region( + &self, + region_id: u64, + from_peer: u64, + to_peer: u64, + replay_timeout: Duration, + ) -> Result; +} + +/// A trait for handling meta service requests in `QueryEngine`. +#[async_trait] +pub trait MetaServiceHandler: Send + Sync { + /// Query the procedure' state by its id + async fn query_procedure_state(&self, pid: &str) -> Result; } pub type TableMutationHandlerRef = Arc; + +pub type MetaServiceHandlerRef = Arc; diff --git a/src/common/function/src/lib.rs b/src/common/function/src/lib.rs index e8bf35bc19dc..10fbf13a7a05 100644 --- a/src/common/function/src/lib.rs +++ b/src/common/function/src/lib.rs @@ -13,8 +13,11 @@ // limitations under the License. 
pub mod scalars; -pub mod system; +mod system; +mod table; pub mod function; pub mod function_registry; +pub mod handlers; pub mod helper; +pub mod state; diff --git a/src/common/function/src/scalars/udf.rs b/src/common/function/src/scalars/udf.rs index 0555581e6117..0f34fbcfaa59 100644 --- a/src/common/function/src/scalars/udf.rs +++ b/src/common/function/src/scalars/udf.rs @@ -25,9 +25,14 @@ use session::context::QueryContextRef; use snafu::ResultExt; use crate::function::{FunctionContext, FunctionRef}; - -/// Create a ScalarUdf from function and query context. -pub fn create_udf(func: FunctionRef, query_ctx: QueryContextRef) -> ScalarUdf { +use crate::state::FunctionState; + +/// Create a ScalarUdf from function, query context and state. +pub fn create_udf( + func: FunctionRef, + query_ctx: QueryContextRef, + state: Arc, +) -> ScalarUdf { let func_cloned = func.clone(); let return_type: ReturnTypeFunction = Arc::new(move |input_types: &[ConcreteDataType]| { Ok(Arc::new(func_cloned.return_type(input_types)?)) @@ -38,6 +43,7 @@ pub fn create_udf(func: FunctionRef, query_ctx: QueryContextRef) -> ScalarUdf { let fun: ScalarFunctionImplementation = Arc::new(move |args: &[ColumnarValue]| { let func_ctx = FunctionContext { query_ctx: query_ctx.clone(), + state: state.clone(), }; let len = args @@ -101,7 +107,7 @@ mod tests { } // create a udf and test it again - let udf = create_udf(f.clone(), query_ctx); + let udf = create_udf(f.clone(), query_ctx, Arc::new(FunctionState::default())); assert_eq!("test_and", udf.name); assert_eq!(f.signature(), udf.signature); diff --git a/src/common/function/src/state.rs b/src/common/function/src/state.rs new file mode 100644 index 000000000000..a5a4935cddac --- /dev/null +++ b/src/common/function/src/state.rs @@ -0,0 +1,25 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::handlers::{MetaServiceHandlerRef, TableMutationHandlerRef}; + +/// Shared state for SQL functions. +/// The handlers in state may be `None` in cli command-line or test cases. +#[derive(Clone, Default)] +pub struct FunctionState { + // The table mutation handler + pub table_mutation_handler: Option, + // The meta service handler + pub meta_service_handler: Option, +} diff --git a/src/common/function/src/system.rs b/src/common/function/src/system.rs index 3a753f8d7d92..94beda6966f9 100644 --- a/src/common/function/src/system.rs +++ b/src/common/function/src/system.rs @@ -12,12 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-pub mod build; -pub mod version; +mod build; +mod database; +mod timezone; +mod version; use std::sync::Arc; use build::BuildFunction; +use database::DatabaseFunction; +use timezone::TimezoneFunction; use version::VersionFunction; use crate::function_registry::FunctionRegistry; @@ -28,5 +32,7 @@ impl SystemFunction { pub fn register(registry: &FunctionRegistry) { registry.register(Arc::new(BuildFunction)); registry.register(Arc::new(VersionFunction)); + registry.register(Arc::new(DatabaseFunction)); + registry.register(Arc::new(TimezoneFunction)); } } diff --git a/src/common/function/src/system/database.rs b/src/common/function/src/system/database.rs new file mode 100644 index 000000000000..2efe8bb8e94f --- /dev/null +++ b/src/common/function/src/system/database.rs @@ -0,0 +1,92 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{self}; +use std::sync::Arc; + +use common_query::error::Result; +use common_query::prelude::{Signature, Volatility}; +use datatypes::prelude::{ConcreteDataType, ScalarVector}; +use datatypes::vectors::{StringVector, VectorRef}; + +use crate::function::{Function, FunctionContext}; + +/// A function to return current schema name. 
+#[derive(Clone, Debug, Default)] +pub struct DatabaseFunction; + +const NAME: &str = "database"; + +impl Function for DatabaseFunction { + fn name(&self) -> &str { + NAME + } + + fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result { + Ok(ConcreteDataType::string_datatype()) + } + + fn signature(&self) -> Signature { + Signature::uniform(0, vec![], Volatility::Immutable) + } + + fn eval(&self, func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result { + let db = func_ctx.query_ctx.current_schema(); + + Ok(Arc::new(StringVector::from_slice(&[db])) as _) + } +} + +impl fmt::Display for DatabaseFunction { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "DATABASE") + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_query::prelude::TypeSignature; + use session::context::QueryContextBuilder; + + use super::*; + #[test] + fn test_build_function() { + let build = DatabaseFunction; + assert_eq!("database", build.name()); + assert_eq!( + ConcreteDataType::string_datatype(), + build.return_type(&[]).unwrap() + ); + assert!(matches!(build.signature(), + Signature { + type_signature: TypeSignature::Uniform(0, valid_types), + volatility: Volatility::Immutable + } if valid_types == vec![] + )); + + let query_ctx = QueryContextBuilder::default() + .current_schema("test_db".to_string()) + .build(); + + let func_ctx = FunctionContext { + query_ctx, + ..Default::default() + }; + let vector = build.eval(func_ctx, &[]).unwrap(); + let expect: VectorRef = Arc::new(StringVector::from(vec!["test_db"])); + assert_eq!(expect, vector); + } +} diff --git a/src/common/function/src/system/timezone.rs b/src/common/function/src/system/timezone.rs new file mode 100644 index 000000000000..4a85e3956341 --- /dev/null +++ b/src/common/function/src/system/timezone.rs @@ -0,0 +1,90 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance 
with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{self}; +use std::sync::Arc; + +use common_query::error::Result; +use common_query::prelude::{Signature, Volatility}; +use datatypes::prelude::{ConcreteDataType, ScalarVector}; +use datatypes::vectors::{StringVector, VectorRef}; + +use crate::function::{Function, FunctionContext}; + +/// A function to return current session timezone. +#[derive(Clone, Debug, Default)] +pub struct TimezoneFunction; + +const NAME: &str = "timezone"; + +impl Function for TimezoneFunction { + fn name(&self) -> &str { + NAME + } + + fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result { + Ok(ConcreteDataType::string_datatype()) + } + + fn signature(&self) -> Signature { + Signature::uniform(0, vec![], Volatility::Immutable) + } + + fn eval(&self, func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result { + let tz = func_ctx.query_ctx.timezone().to_string(); + + Ok(Arc::new(StringVector::from_slice(&[&tz])) as _) + } +} + +impl fmt::Display for TimezoneFunction { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "TIMEZONE") + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_query::prelude::TypeSignature; + use session::context::QueryContextBuilder; + + use super::*; + #[test] + fn test_build_function() { + let build = TimezoneFunction; + assert_eq!("timezone", build.name()); + assert_eq!( + ConcreteDataType::string_datatype(), + build.return_type(&[]).unwrap() + ); + assert!(matches!(build.signature(), + Signature { + type_signature: TypeSignature::Uniform(0, valid_types), + 
volatility: Volatility::Immutable + } if valid_types == vec![] + )); + + let query_ctx = QueryContextBuilder::default().build(); + + let func_ctx = FunctionContext { + query_ctx, + ..Default::default() + }; + let vector = build.eval(func_ctx, &[]).unwrap(); + let expect: VectorRef = Arc::new(StringVector::from(vec!["UTC"])); + assert_eq!(expect, vector); + } +} diff --git a/src/common/function/src/table.rs b/src/common/function/src/table.rs new file mode 100644 index 000000000000..64b8fd20096c --- /dev/null +++ b/src/common/function/src/table.rs @@ -0,0 +1,31 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod migrate_region; + +use std::sync::Arc; + +use migrate_region::MigrateRegionFunction; + +use crate::function_registry::FunctionRegistry; + +/// Table functions +pub(crate) struct TableFunction; + +impl TableFunction { + /// Register all table functions to [`FunctionRegistry`]. + pub fn register(registry: &FunctionRegistry) { + registry.register(Arc::new(MigrateRegionFunction)); + } +} diff --git a/src/common/function/src/table/migrate_region.rs b/src/common/function/src/table/migrate_region.rs new file mode 100644 index 000000000000..f969bada02d3 --- /dev/null +++ b/src/common/function/src/table/migrate_region.rs @@ -0,0 +1,175 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{self}; +use std::time::Duration; + +use common_query::error::Error::ThreadJoin; +use common_query::error::{ + InvalidFuncArgsSnafu, InvalidInputTypeSnafu, MissingTableMutationHandlerSnafu, Result, +}; +use common_query::prelude::{Signature, TypeSignature, Volatility}; +use common_telemetry::logging::error; +use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder}; +use datatypes::value::Value; +use datatypes::vectors::{StringVectorBuilder, VectorRef}; +use snafu::{Location, OptionExt, ResultExt}; + +use crate::function::{Function, FunctionContext}; + +/// A function to migrate a region from source peer to target peer. +/// Returns the submitted procedure id if success. Only available in cluster mode. +/// +/// - `migrate_region(region_id, from_peer, to_peer)`, with default replay WAL timeout(10 seconds). 
+/// - `migrate_region(region_id, from_peer, to_peer, timeout(secs))` +/// +/// The parameters: +/// - `region_id`: the region id +/// - `from_peer`: the source peer id +/// - `to_peer`: the target peer id +#[derive(Clone, Debug, Default)] +pub struct MigrateRegionFunction; + +const NAME: &str = "migrate_region"; +const DEFAULT_REPLAY_TIMEOUT_SECS: u64 = 10; + +fn cast_u64_vector(vector: &VectorRef) -> Result { + vector + .cast(&ConcreteDataType::uint64_datatype()) + .context(InvalidInputTypeSnafu { + err_msg: format!( + "Failed to cast input into uint64, actual type: {:#?}", + vector.data_type(), + ), + }) +} + +impl Function for MigrateRegionFunction { + fn name(&self) -> &str { + NAME + } + + fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result { + Ok(ConcreteDataType::string_datatype()) + } + + fn signature(&self) -> Signature { + Signature::one_of( + vec![ + // migrate_region(region_id, from_peer, to_peer) + TypeSignature::Uniform(3, ConcreteDataType::numerics()), + // migrate_region(region_id, from_peer, to_peer, timeout(secs)) + TypeSignature::Uniform(4, ConcreteDataType::numerics()), + ], + Volatility::Immutable, + ) + } + + fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result { + let (region_ids, from_peers, to_peers, replay_timeouts) = match columns.len() { + 3 => { + let region_ids = cast_u64_vector(&columns[0])?; + let from_peers = cast_u64_vector(&columns[1])?; + let to_peers = cast_u64_vector(&columns[2])?; + + (region_ids, from_peers, to_peers, None) + } + + 4 => { + let region_ids = cast_u64_vector(&columns[0])?; + let from_peers = cast_u64_vector(&columns[1])?; + let to_peers = cast_u64_vector(&columns[2])?; + let replay_timeouts = cast_u64_vector(&columns[3])?; + + (region_ids, from_peers, to_peers, Some(replay_timeouts)) + } + + size => { + return InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect exactly 3 or 4, have: {}", + size + ), + } + .fail(); + } + }; + + 
std::thread::spawn(move || { + let len = region_ids.len(); + let mut results = StringVectorBuilder::with_capacity(len); + + for index in 0..len { + let region_id = region_ids.get(index); + let from_peer = from_peers.get(index); + let to_peer = to_peers.get(index); + let replay_timeout = match &replay_timeouts { + Some(replay_timeouts) => replay_timeouts.get(index), + None => Value::UInt64(DEFAULT_REPLAY_TIMEOUT_SECS), + }; + + match (region_id, from_peer, to_peer, replay_timeout) { + ( + Value::UInt64(region_id), + Value::UInt64(from_peer), + Value::UInt64(to_peer), + Value::UInt64(replay_timeout), + ) => { + let func_ctx = func_ctx.clone(); + + let pid = common_runtime::block_on_read(async move { + func_ctx + .state + .table_mutation_handler + .as_ref() + .context(MissingTableMutationHandlerSnafu)? + .migrate_region( + region_id, + from_peer, + to_peer, + Duration::from_secs(replay_timeout), + ) + .await + })?; + + results.push(Some(&pid)); + } + _ => { + results.push(None); + } + } + } + + Ok(results.to_vector()) + }) + .join() + .map_err(|e| { + error!(e; "Join thread error"); + ThreadJoin { + location: Location::default(), + } + })? + } +} + +impl fmt::Display for MigrateRegionFunction { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "MIGRATE_REGION") + } +} + +#[cfg(test)] +mod tests { + // FIXME(dennis): test in the following PR. 
+} diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index e5985197dee9..18c8a6e5d555 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -30,6 +30,7 @@ derive_builder.workspace = true etcd-client.workspace = true futures-util.workspace = true futures.workspace = true +hex = { version = "0.4" } humantime-serde.workspace = true lazy_static.workspace = true prometheus.workspace = true diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 9bddb5c927f8..383006e3cc66 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -100,6 +100,14 @@ pub enum Error { source: common_procedure::Error, }, + #[snafu(display("Failed to parse procedure id: {key}"))] + ParseProcedureId { + location: Location, + key: String, + #[snafu(source)] + error: common_procedure::ParseIdError, + }, + #[snafu(display("Unsupported operation {}", operation))] Unsupported { operation: String, @@ -435,7 +443,9 @@ impl ErrorExt for Error { InvalidCatalogValue { source, .. } => source.status_code(), ConvertAlterTableRequest { source, .. } => source.status_code(), - InvalidNumTopics { .. } | EmptyCreateTableTasks { .. } => StatusCode::InvalidArguments, + ParseProcedureId { .. } | InvalidNumTopics { .. } | EmptyCreateTableTasks { .. 
} => { + StatusCode::InvalidArguments + } } } diff --git a/src/common/meta/src/rpc.rs b/src/common/meta/src/rpc.rs index 5d07408e2cd8..978a43cd25b3 100644 --- a/src/common/meta/src/rpc.rs +++ b/src/common/meta/src/rpc.rs @@ -14,6 +14,7 @@ pub mod ddl; pub mod lock; +pub mod procedure; pub mod router; pub mod store; pub mod util; diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index a9a455f878ba..957bb7e4bfe0 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -14,13 +14,13 @@ use std::result; -use api::v1::meta::submit_ddl_task_request::Task; +use api::v1::meta::ddl_task_request::Task; use api::v1::meta::{ AlterTableTask as PbAlterTableTask, AlterTableTasks as PbAlterTableTasks, CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks, - DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks, Partition, - SubmitDdlTaskRequest as PbSubmitDdlTaskRequest, - SubmitDdlTaskResponse as PbSubmitDdlTaskResponse, TruncateTableTask as PbTruncateTableTask, + DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, + DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks, Partition, ProcedureId, + TruncateTableTask as PbTruncateTableTask, }; use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, TruncateTableExpr}; use base64::engine::general_purpose; @@ -146,7 +146,7 @@ pub struct SubmitDdlTaskRequest { pub task: DdlTask, } -impl TryFrom for PbSubmitDdlTaskRequest { +impl TryFrom for PbDdlTaskRequest { type Error = error::Error; fn try_from(request: SubmitDdlTaskRequest) -> Result { @@ -198,24 +198,24 @@ pub struct SubmitDdlTaskResponse { pub table_ids: Vec, } -impl TryFrom for SubmitDdlTaskResponse { +impl TryFrom for SubmitDdlTaskResponse { type Error = error::Error; - fn try_from(resp: PbSubmitDdlTaskResponse) -> Result { + fn try_from(resp: PbDdlTaskResponse) -> Result { let table_id = resp.table_id.map(|t| t.id); let table_ids = 
resp.table_ids.into_iter().map(|t| t.id).collect(); Ok(Self { - key: resp.key, + key: resp.pid.map(|pid| pid.key).unwrap_or_default(), table_id, table_ids, }) } } -impl From for PbSubmitDdlTaskResponse { +impl From for PbDdlTaskResponse { fn from(val: SubmitDdlTaskResponse) -> Self { Self { - key: val.key, + pid: Some(ProcedureId { key: val.key }), table_id: val .table_id .map(|table_id| api::v1::meta::TableId { id: table_id }), diff --git a/src/common/meta/src/rpc/procedure.rs b/src/common/meta/src/rpc/procedure.rs new file mode 100644 index 000000000000..9e64edb715c8 --- /dev/null +++ b/src/common/meta/src/rpc/procedure.rs @@ -0,0 +1,104 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{ + ProcedureId as PbProcedureId, ProcedureStateResponse as PbProcedureStateResponse, + ProcedureStatus as PbProcedureStatus, +}; +use common_procedure::{ProcedureId, ProcedureState}; +use snafu::ResultExt; + +use crate::error::{ParseProcedureIdSnafu, Result}; + +/// Cast the protobuf [`ProcedureId`] to common [`ProcedureId`]. +pub fn pb_pid_to_pid(pid: &PbProcedureId) -> Result { + ProcedureId::parse_str(&String::from_utf8_lossy(&pid.key)).with_context(|_| { + ParseProcedureIdSnafu { + key: hex::encode(&pid.key), + } + }) +} + +/// Cast the common [`ProcedureId`] to protobuf [`ProcedureId`]. 
+pub fn pid_to_pb_pid(pid: ProcedureId) -> PbProcedureId { + PbProcedureId { + key: pid.to_string().into(), + } +} + +/// Cast the common [`ProcedureState`] to pb [`ProcedureStateResponse`]. +pub fn procedure_state_to_pb_response(state: &ProcedureState) -> PbProcedureStateResponse { + let (status, error) = match state { + ProcedureState::Running => (PbProcedureStatus::Running, String::default()), + ProcedureState::Done { .. } => (PbProcedureStatus::Done, String::default()), + ProcedureState::Retrying { error } => (PbProcedureStatus::Retrying, error.to_string()), + ProcedureState::Failed { error } => (PbProcedureStatus::Failed, error.to_string()), + }; + + PbProcedureStateResponse { + status: status.into(), + error, + ..Default::default() + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_procedure::Error; + use snafu::Location; + + use super::*; + + #[test] + fn test_pid_pb_pid_conversion() { + let pid = ProcedureId::random(); + + let pb_pid = pid_to_pb_pid(pid); + + assert_eq!(pid, pb_pid_to_pid(&pb_pid).unwrap()); + } + + #[test] + fn test_procedure_state_to_pb_response() { + let state = ProcedureState::Running; + let resp = procedure_state_to_pb_response(&state); + assert_eq!(PbProcedureStatus::Running as i32, resp.status); + assert!(resp.error.is_empty()); + + let state = ProcedureState::Done { output: None }; + let resp = procedure_state_to_pb_response(&state); + assert_eq!(PbProcedureStatus::Done as i32, resp.status); + assert!(resp.error.is_empty()); + + let state = ProcedureState::Retrying { + error: Arc::new(Error::ManagerNotStart { + location: Location::default(), + }), + }; + let resp = procedure_state_to_pb_response(&state); + assert_eq!(PbProcedureStatus::Retrying as i32, resp.status); + assert_eq!("Procedure Manager is stopped", resp.error); + + let state = ProcedureState::Failed { + error: Arc::new(Error::ManagerNotStart { + location: Location::default(), + }), + }; + let resp = procedure_state_to_pb_response(&state); + 
assert_eq!(PbProcedureStatus::Failed as i32, resp.status); + assert_eq!("Procedure Manager is stopped", resp.error); + } +} diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs index 36ccd6b19050..cef90d8dfe09 100644 --- a/src/common/procedure/src/lib.rs +++ b/src/common/procedure/src/lib.rs @@ -25,7 +25,8 @@ pub mod watcher; pub use crate::error::{Error, Result}; pub use crate::procedure::{ - BoxedProcedure, Context, ContextProvider, LockKey, Output, Procedure, ProcedureId, - ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey, + BoxedProcedure, Context, ContextProvider, LockKey, Output, ParseIdError, Procedure, + ProcedureId, ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, + StringKey, }; pub use crate::watcher::Watcher; diff --git a/src/common/query/src/error.rs b/src/common/query/src/error.rs index 86a3d5e958c8..49d8f35e39f1 100644 --- a/src/common/query/src/error.rs +++ b/src/common/query/src/error.rs @@ -169,6 +169,21 @@ pub enum Error { source: BoxedError, }, + #[snafu(display("Failed to join thread"))] + ThreadJoin { location: Location }, + + #[snafu(display("Failed to do table mutation"))] + TableMutation { + source: BoxedError, + location: Location, + }, + + #[snafu(display("Missing TableMutationHandler, not expected"))] + MissingTableMutationHandler { location: Location }, + + #[snafu(display("Missing MetaServiceHandler, not expected"))] + MissingMetaServiceHandler { location: Location }, + #[snafu(display("Invalid function args: {}", err_msg))] InvalidFuncArgs { err_msg: String, location: Location }, } @@ -197,9 +212,11 @@ impl ErrorExt for Error { | Error::ConvertArrowSchema { source, .. } | Error::FromArrowArray { source, .. } => source.status_code(), - Error::ExecuteRepeatedly { .. } | Error::GeneralDataFusion { .. } => { - StatusCode::Unexpected - } + Error::MissingTableMutationHandler { .. } + | Error::MissingMetaServiceHandler { .. 
} + | Error::ExecuteRepeatedly { .. } + | Error::ThreadJoin { .. } + | Error::GeneralDataFusion { .. } => StatusCode::Unexpected, Error::UnsupportedInputDataType { .. } | Error::TypeCast { .. } @@ -208,6 +225,7 @@ impl ErrorExt for Error { Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(), Error::ExecutePhysicalPlan { source, .. } => source.status_code(), Error::Execute { source, .. } => source.status_code(), + Error::TableMutation { source, .. } => source.status_code(), } } diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 6c1ffbcc47df..6f715755e7d5 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -229,6 +229,15 @@ impl ConcreteDataType { ] } + pub fn unsigned_integers() -> Vec { + vec![ + ConcreteDataType::uint8_datatype(), + ConcreteDataType::uint16_datatype(), + ConcreteDataType::uint32_datatype(), + ConcreteDataType::uint64_datatype(), + ] + } + /// Convert arrow data type to [ConcreteDataType]. /// /// # Panics diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index a5e9d35c8aa6..4620b92bea4a 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -13,10 +13,10 @@ // limitations under the License. 
mod ask_leader; -mod ddl; mod heartbeat; mod load_balance; mod lock; +mod procedure; mod store; @@ -33,9 +33,9 @@ use common_meta::rpc::store::{ DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; use common_telemetry::info; -use ddl::Client as DdlClient; use heartbeat::Client as HeartbeatClient; use lock::Client as LockClient; +use procedure::Client as ProcedureClient; use snafu::{OptionExt, ResultExt}; use store::Client as StoreClient; @@ -157,7 +157,7 @@ impl MetaClientBuilder { } if self.enable_ddl { let mgr = self.ddl_channel_manager.unwrap_or(mgr); - client.ddl = Some(DdlClient::new( + client.ddl = Some(ProcedureClient::new( self.id, self.role, mgr, @@ -176,7 +176,7 @@ pub struct MetaClient { heartbeat: Option, store: Option, lock: Option, - ddl: Option, + ddl: Option, } #[async_trait::async_trait] @@ -328,6 +328,7 @@ impl MetaClient { Ok(()) } + /// Submit a DDL task pub async fn submit_ddl_task( &self, req: SubmitDdlTaskRequest, @@ -364,7 +365,7 @@ impl MetaClient { } #[inline] - pub fn ddl_client(&self) -> Result { + pub fn ddl_client(&self) -> Result { self.ddl .clone() .context(error::NotStartedSnafu { name: "ddl_client" }) diff --git a/src/meta-client/src/client/ddl.rs b/src/meta-client/src/client/procedure.rs similarity index 53% rename from src/meta-client/src/client/ddl.rs rename to src/meta-client/src/client/procedure.rs index d36080b1b4c9..638f19bb8a67 100644 --- a/src/meta-client/src/client/ddl.rs +++ b/src/meta-client/src/client/procedure.rs @@ -12,10 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::future::Future; use std::sync::Arc; +use std::time::Duration; -use api::v1::meta::ddl_task_client::DdlTaskClient; -use api::v1::meta::{ErrorCode, ResponseHeader, Role, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; +use api::v1::meta::procedure_service_client::ProcedureServiceClient; +use api::v1::meta::{ + DdlTaskRequest, DdlTaskResponse, ErrorCode, MigrateRegionRequest, MigrateRegionResponse, + ProcedureId, ProcedureStateResponse, QueryProcedureRequest, ResponseHeader, Role, +}; use common_grpc::channel_manager::ChannelManager; use common_telemetry::tracing_context::TracingContext; use common_telemetry::{info, warn}; @@ -61,17 +66,37 @@ impl Client { inner.is_started() } - pub async fn submit_ddl_task( - &self, - req: SubmitDdlTaskRequest, - ) -> Result { + pub async fn submit_ddl_task(&self, req: DdlTaskRequest) -> Result { let inner = self.inner.read().await; inner.submit_ddl_task(req).await } + + /// Query the procedure's state by its id + pub async fn query_procedure_state(&self, pid: &str) -> Result { + let inner = self.inner.read().await; + inner.query_procedure_state(pid).await + } + + /// Migrate the region from one datanode to the other datanode: + /// - `region_id`: the migrated region id + /// - `from_peer`: the source datanode id + /// - `to_peer`: the target datanode id + /// - `replay_timeout`: replay WAL timeout after migration.
+ pub async fn migrate_region( + &self, + region_id: u64, + from_peer: u64, + to_peer: u64, + replay_timeout: Duration, + ) -> Result { + let inner = self.inner.read().await; + inner + .migrate_region(region_id, from_peer, to_peer, replay_timeout) + .await + } } #[derive(Debug)] - struct Inner { id: Id, role: Role, @@ -109,13 +134,13 @@ impl Inner { Ok(()) } - fn make_client(&self, addr: impl AsRef) -> Result> { + fn make_client(&self, addr: impl AsRef) -> Result> { let channel = self .channel_manager .get(addr) .context(error::CreateChannelSnafu)?; - Ok(DdlTaskClient::new(channel)) + Ok(ProcedureServiceClient::new(channel)) } #[inline] @@ -123,10 +148,7 @@ impl Inner { self.ask_leader.is_some() } - pub async fn submit_ddl_task( - &self, - mut req: SubmitDdlTaskRequest, - ) -> Result { + fn ask_leader(&self) -> Result<&AskLeader> { ensure!( self.is_started(), error::IllegalGrpcClientStateSnafu { @@ -134,22 +156,25 @@ impl Inner { } ); - req.set_header( - self.id, - self.role, - TracingContext::from_current_span().to_w3c(), - ); - let ask_leader = self.ask_leader.as_ref().unwrap(); + Ok(self.ask_leader.as_ref().unwrap()) + } + + async fn with_retry(&self, task: &str, body_fn: F, get_header: H) -> Result + where + R: Future>, + F: Fn(ProcedureServiceClient) -> R, + H: Fn(&T) -> &Option, + { + let ask_leader = self.ask_leader()?; let mut times = 0; while times < self.max_retry { if let Some(leader) = &ask_leader.get_leader() { - let mut client = self.make_client(leader)?; - match client.submit_ddl_task(req.clone()).await { + let client = self.make_client(leader)?; + match body_fn(client).await { Ok(res) => { - let res = res.into_inner(); - if is_not_leader(&res.header) { - warn!("Failed to submitting ddl to {leader}, not a leader"); + if is_not_leader(get_header(&res)) { + warn!("Failed to {task} to {leader}, not a leader"); let leader = ask_leader.ask_leader().await?; info!("DDL client updated to new leader addr: {leader}"); times += 1; @@ -160,9 +185,9 @@ impl Inner 
{ Err(status) => { // The leader may be unreachable. if is_unreachable(&status) { - warn!("Failed to submitting ddl to {leader}, source: {status}"); + warn!("Failed to {task} to {leader}, source: {status}"); let leader = ask_leader.ask_leader().await?; - info!("DDL client updated to new leader addr: {leader}"); + info!("Procedure client updated to new leader addr: {leader}"); times += 1; continue; } else { @@ -176,11 +201,86 @@ impl Inner { } error::RetryTimesExceededSnafu { - msg: "Failed to submit DDL task", + msg: format!("Failed to {task}"), times: self.max_retry, } .fail() } + + async fn migrate_region( + &self, + region_id: u64, + from_peer: u64, + to_peer: u64, + replay_timeout: Duration, + ) -> Result { + let mut req = MigrateRegionRequest { + region_id, + from_peer, + to_peer, + replay_timeout_secs: replay_timeout.as_secs() as u32, + ..Default::default() + }; + + req.set_header( + self.id, + self.role, + TracingContext::from_current_span().to_w3c(), + ); + + self.with_retry( + "migrate region", + move |mut client| { + let req = req.clone(); + + async move { client.migrate(req).await.map(|res| res.into_inner()) } + }, + |resp: &MigrateRegionResponse| &resp.header, + ) + .await + } + + async fn query_procedure_state(&self, pid: &str) -> Result { + let mut req = QueryProcedureRequest { + pid: Some(ProcedureId { key: pid.into() }), + ..Default::default() + }; + + req.set_header( + self.id, + self.role, + TracingContext::from_current_span().to_w3c(), + ); + + self.with_retry( + "query procedure state", + move |mut client| { + let req = req.clone(); + + async move { client.query(req).await.map(|res| res.into_inner()) } + }, + |resp: &ProcedureStateResponse| &resp.header, + ) + .await + } + + async fn submit_ddl_task(&self, mut req: DdlTaskRequest) -> Result { + req.set_header( + self.id, + self.role, + TracingContext::from_current_span().to_w3c(), + ); + + self.with_retry( + "submit ddl task", + move |mut client| { + let req = req.clone(); + async move {
client.ddl(req).await.map(|res| res.into_inner()) } + }, + |resp: &DdlTaskResponse| &resp.header, + ) + .await + } } fn is_unreachable(status: &Status) -> bool { diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 4118e79e8185..c959973b2b95 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -15,9 +15,9 @@ use std::sync::Arc; use api::v1::meta::cluster_server::ClusterServer; -use api::v1::meta::ddl_task_server::DdlTaskServer; use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::lock_server::LockServer; +use api::v1::meta::procedure_service_server::ProcedureServiceServer; use api::v1::meta::store_server::StoreServer; use common_base::Plugins; use common_meta::kv_backend::chroot::ChrootKvBackend; @@ -172,7 +172,7 @@ pub fn router(meta_srv: MetaSrv) -> Router { .add_service(StoreServer::new(meta_srv.clone())) .add_service(ClusterServer::new(meta_srv.clone())) .add_service(LockServer::new(meta_srv.clone())) - .add_service(DdlTaskServer::new(meta_srv.clone())) + .add_service(ProcedureServiceServer::new(meta_srv.clone())) .add_service(admin::make_admin_service(meta_srv)) } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 4b70d6479779..fb56bca3bbee 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -478,6 +478,15 @@ pub enum Error { source: common_procedure::Error, }, + #[snafu(display("Failed to query procedure state"))] + QueryProcedure { + location: Location, + source: common_procedure::Error, + }, + + #[snafu(display("Procedure not found: {pid}"))] + ProcedureNotFound { location: Location, pid: String }, + #[snafu(display("Failed to submit procedure"))] SubmitProcedure { location: Location, @@ -706,6 +715,7 @@ impl ErrorExt for Error { | Error::InvalidArguments { .. } | Error::InitExportMetricsTask { .. } | Error::InvalidHeartbeatRequest { .. } + | Error::ProcedureNotFound { .. } | Error::TooManyPartitions { .. 
} => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. } | Error::LeaseValueFromUtf8 { .. } @@ -731,9 +741,9 @@ impl ErrorExt for Error { Error::RequestDatanode { source, .. } => source.status_code(), Error::InvalidCatalogValue { source, .. } | Error::InvalidFullTableName { source, .. } => source.status_code(), - Error::SubmitProcedure { source, .. } | Error::WaitProcedure { source, .. } => { - source.status_code() - } + Error::SubmitProcedure { source, .. } + | Error::WaitProcedure { source, .. } + | Error::QueryProcedure { source, .. } => source.status_code(), Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => { source.status_code() } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 70135f34c1d0..cddcd06885d7 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -27,6 +27,7 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBac use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeperRef; use common_meta::wal_options_allocator::WalOptionsAllocatorRef; +use common_meta::{distributed_time_constants, ClusterId}; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; use common_telemetry::logging::LoggingOptions; @@ -47,6 +48,7 @@ use crate::error::{ }; use crate::failure_detector::PhiAccrualFailureDetectorOptions; use crate::handler::HeartbeatHandlerGroup; +use crate::lease::lookup_alive_datanode_peer; use crate::lock::DistLockRef; use crate::procedure::region_migration::manager::RegionMigrationManagerRef; use crate::pubsub::{PublishRef, SubscribeManagerRef}; @@ -369,6 +371,21 @@ impl MetaSrv { .context(StopProcedureManagerSnafu) } + /// Lookup a peer by peer_id, return it only when it's alive. 
+ pub(crate) async fn lookup_peer( + &self, + cluster_id: ClusterId, + peer_id: u64, + ) -> Result> { + lookup_alive_datanode_peer( + cluster_id, + peer_id, + &self.meta_peer_client, + distributed_time_constants::DATANODE_LEASE_SECS, + ) + .await + } + #[inline] pub fn options(&self) -> &MetaSrvOptions { &self.options diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index ebcca26301d3..c042e379243e 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -15,8 +15,8 @@ use std::sync::Arc; use std::time::Duration; -use api::v1::meta::ddl_task_server::DdlTaskServer; use api::v1::meta::heartbeat_server::HeartbeatServer; +use api::v1::meta::procedure_service_server::ProcedureServiceServer; use api::v1::meta::store_server::StoreServer; use client::client_manager::DatanodeClients; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; @@ -83,7 +83,7 @@ pub async fn mock( tonic::transport::Server::builder() .add_service(HeartbeatServer::new(service.clone())) .add_service(StoreServer::new(service.clone())) - .add_service(DdlTaskServer::new(service.clone())) + .add_service(ProcedureServiceServer::new(service.clone())) .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await }); diff --git a/src/meta-srv/src/service.rs b/src/meta-srv/src/service.rs index 3a13b1fe5085..c3e2e781e899 100644 --- a/src/meta-srv/src/service.rs +++ b/src/meta-srv/src/service.rs @@ -19,10 +19,10 @@ use tonic::{Response, Status}; pub mod admin; pub mod cluster; -pub mod ddl; mod heartbeat; pub mod lock; pub mod mailbox; +pub mod procedure; pub mod store; pub type GrpcResult = Result, Status>; diff --git a/src/meta-srv/src/service/ddl.rs b/src/meta-srv/src/service/ddl.rs deleted file mode 100644 index 44f81032b9f9..000000000000 --- a/src/meta-srv/src/service/ddl.rs +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this 
file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use api::v1::meta::{ - ddl_task_server, SubmitDdlTaskRequest as PbSubmitDdlTaskRequest, - SubmitDdlTaskResponse as PbSubmitDdlTaskResponse, -}; -use common_meta::ddl::ExecutorContext; -use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest}; -use snafu::{OptionExt, ResultExt}; -use tonic::{Request, Response}; - -use super::GrpcResult; -use crate::error; -use crate::metasrv::MetaSrv; - -#[async_trait::async_trait] -impl ddl_task_server::DdlTask for MetaSrv { - async fn submit_ddl_task( - &self, - request: Request, - ) -> GrpcResult { - let PbSubmitDdlTaskRequest { header, task, .. } = request.into_inner(); - - let header = header.context(error::MissingRequestHeaderSnafu)?; - let cluster_id = header.cluster_id; - let task: DdlTask = task - .context(error::MissingRequiredParameterSnafu { param: "task" })? - .try_into() - .context(error::ConvertProtoDataSnafu)?; - - let resp = self - .ddl_executor() - .submit_ddl_task( - &ExecutorContext { - cluster_id: Some(cluster_id), - tracing_context: Some(header.tracing_context), - }, - SubmitDdlTaskRequest { task }, - ) - .await - .context(error::SubmitDdlTaskSnafu)? 
- .into(); - - Ok(Response::new(resp)) - } -} diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs new file mode 100644 index 000000000000..8181ee5e4214 --- /dev/null +++ b/src/meta-srv/src/service/procedure.rs @@ -0,0 +1,135 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use api::v1::meta::{ + procedure_service_server, DdlTaskRequest as PbDdlTaskRequest, + DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse, + ProcedureStateResponse, QueryProcedureRequest, +}; +use common_meta::ddl::ExecutorContext; +use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest}; +use common_meta::rpc::procedure; +use snafu::{ensure, OptionExt, ResultExt}; +use tonic::{Request, Response}; + +use super::GrpcResult; +use crate::error; +use crate::metasrv::MetaSrv; +use crate::procedure::region_migration::manager::RegionMigrationProcedureTask; + +#[async_trait::async_trait] +impl procedure_service_server::ProcedureService for MetaSrv { + async fn query( + &self, + request: Request, + ) -> GrpcResult { + let QueryProcedureRequest { header, pid, .. 
} = request.into_inner(); + let _header = header.context(error::MissingRequestHeaderSnafu)?; + let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?; + let pid = procedure::pb_pid_to_pid(&pid).context(error::ConvertProtoDataSnafu)?; + + let state = self + .procedure_manager() + .procedure_state(pid) + .await + .context(error::QueryProcedureSnafu)? + .context(error::ProcedureNotFoundSnafu { + pid: pid.to_string(), + })?; + + Ok(Response::new(procedure::procedure_state_to_pb_response( + &state, + ))) + } + + async fn ddl(&self, request: Request) -> GrpcResult { + let PbDdlTaskRequest { header, task, .. } = request.into_inner(); + + let header = header.context(error::MissingRequestHeaderSnafu)?; + let cluster_id = header.cluster_id; + let task: DdlTask = task + .context(error::MissingRequiredParameterSnafu { param: "task" })? + .try_into() + .context(error::ConvertProtoDataSnafu)?; + + let resp = self + .ddl_executor() + .submit_ddl_task( + &ExecutorContext { + cluster_id: Some(cluster_id), + tracing_context: Some(header.tracing_context), + }, + SubmitDdlTaskRequest { task }, + ) + .await + .context(error::SubmitDdlTaskSnafu)? + .into(); + + Ok(Response::new(resp)) + } + + async fn migrate( + &self, + request: Request, + ) -> GrpcResult { + ensure!( + self.meta_peer_client().is_leader(), + error::UnexpectedSnafu { + violated: "Trying to submit a region migration procedure to non-leader meta server" + } + ); + + let MigrateRegionRequest { + header, + region_id, + from_peer, + to_peer, + replay_timeout_secs, + .. + } = request.into_inner(); + + let header = header.context(error::MissingRequestHeaderSnafu)?; + let cluster_id = header.cluster_id; + + let from_peer = self + .lookup_peer(cluster_id, from_peer) + .await? + .context(error::PeerUnavailableSnafu { peer_id: from_peer })?; + let to_peer = self + .lookup_peer(cluster_id, to_peer) + .await? 
+ .context(error::PeerUnavailableSnafu { peer_id: to_peer })?; + + let pid = self + .region_migration_manager() + .submit_procedure(RegionMigrationProcedureTask { + cluster_id, + region_id: region_id.into(), + from_peer, + to_peer, + replay_timeout: Duration::from_secs(replay_timeout_secs.into()), + }) + .await? + .map(procedure::pid_to_pb_pid); + + let resp = MigrateRegionResponse { + pid, + ..Default::default() + }; + + Ok(Response::new(resp)) + } +} diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index 29d6dc215833..91a410997db6 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -18,6 +18,7 @@ common-base.workspace = true common-catalog.workspace = true common-datasource.workspace = true common-error.workspace = true +common-function.workspace = true common-grpc-expr.workspace = true common-macro.workspace = true common-meta.workspace = true diff --git a/src/operator/src/table.rs b/src/operator/src/table.rs index 1d42c6e7a394..8d53a39c7c49 100644 --- a/src/operator/src/table.rs +++ b/src/operator/src/table.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::time::Duration; + use async_trait::async_trait; use common_error::ext::BoxedError; -use query::error as query_error; -use query::error::Result as QueryResult; -use query::table_mutation::{AffectedRows, TableMutationHandler}; +use common_function::handlers::{AffectedRows, TableMutationHandler}; +use common_query::error as query_error; +use common_query::error::Result as QueryResult; use session::context::QueryContextRef; use snafu::ResultExt; use sqlparser::ast::ObjectName; @@ -93,4 +95,15 @@ impl TableMutationHandler for TableMutationOperator { .map_err(BoxedError::new) .context(query_error::TableMutationSnafu) } + + async fn migrate_region( + &self, + _region_id: u64, + _from_peer: u64, + _to_peer: u64, + _replay_timeout: Duration, + ) -> QueryResult { + // FIXME(dennis): implemented in the following PR. + todo!(); + } } diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 6dab18487c56..121db1535bd9 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -55,7 +55,7 @@ pub use crate::datafusion::planner::DfContextProviderAdapter; use crate::error::{ CatalogSnafu, CreateRecordBatchSnafu, CreateSchemaSnafu, DataFusionSnafu, MissingTableMutationHandlerSnafu, MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, - TableNotFoundSnafu, UnimplementedSnafu, UnsupportedExprSnafu, + TableMutationSnafu, TableNotFoundSnafu, UnimplementedSnafu, UnsupportedExprSnafu, }; use crate::executor::QueryExecutor; use crate::logical_optimizer::LogicalOptimizer; @@ -190,6 +190,7 @@ impl DatafusionQueryEngine { .context(MissingTableMutationHandlerSnafu)? .delete(request, query_ctx) .await + .context(TableMutationSnafu) } #[tracing::instrument(skip_all)] @@ -211,6 +212,7 @@ impl DatafusionQueryEngine { .context(MissingTableMutationHandlerSnafu)? 
.insert(request, query_ctx) .await + .context(TableMutationSnafu) } async fn find_table(&self, table_name: &ResolvedTableReference<'_>) -> Result { diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs index 4e4b02b0fdb8..46865df5c2ee 100644 --- a/src/query/src/datafusion/planner.rs +++ b/src/query/src/datafusion/planner.rs @@ -109,7 +109,16 @@ impl ContextProvider for DfContextProviderAdapter { fn get_function_meta(&self, name: &str) -> Option> { self.engine_state.udf_function(name).map_or_else( || self.session_state.scalar_functions().get(name).cloned(), - |func| Some(Arc::new(create_udf(func, self.query_ctx.clone()).into())), + |func| { + Some(Arc::new( + create_udf( + func, + self.query_ctx.clone(), + self.engine_state.function_state(), + ) + .into(), + )) + }, ) } diff --git a/src/query/src/error.rs b/src/query/src/error.rs index 96db70810481..c646a9ae659c 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -247,7 +247,7 @@ pub enum Error { #[snafu(display("Table mutation error"))] TableMutation { - source: BoxedError, + source: common_query::error::Error, location: Location, }, diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 6ed714a50d6c..135acabb61d3 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -33,7 +33,6 @@ pub mod query_engine; mod range_select; pub mod region_query; pub mod sql; -pub mod table_mutation; pub use crate::datafusion::DfContextProviderAdapter; pub use crate::query_engine::{ diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 2ae0b298281d..8a4edffc4aef 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -24,6 +24,7 @@ use catalog::CatalogManagerRef; use common_base::Plugins; use common_function::function::FunctionRef; use common_function::function_registry::FUNCTION_REGISTRY; +use common_function::handlers::TableMutationHandlerRef; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use 
common_query::prelude::ScalarUdf; use common_query::Output; @@ -39,7 +40,6 @@ use crate::planner::LogicalPlanner; pub use crate::query_engine::context::QueryEngineContext; pub use crate::query_engine::state::QueryEngineState; use crate::region_query::RegionQueryHandlerRef; -use crate::table_mutation::TableMutationHandlerRef; /// Describe statement result #[derive(Debug)] diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index a8259694fa88..f5a6a828a420 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -20,7 +20,9 @@ use async_trait::async_trait; use catalog::CatalogManagerRef; use common_base::Plugins; use common_function::function::FunctionRef; +use common_function::handlers::{MetaServiceHandlerRef, TableMutationHandlerRef}; use common_function::scalars::aggregate::AggregateFunctionMetaRef; +use common_function::state::FunctionState; use common_query::physical_plan::SessionContext; use common_query::prelude::ScalarUdf; use common_telemetry::warn; @@ -48,7 +50,6 @@ use crate::optimizer::ExtensionAnalyzerRule; use crate::query_engine::options::QueryOptions; use crate::range_select::planner::RangeSelectPlanner; use crate::region_query::RegionQueryHandlerRef; -use crate::table_mutation::TableMutationHandlerRef; use crate::QueryEngineContext; /// Query engine global state @@ -59,7 +60,7 @@ use crate::QueryEngineContext; pub struct QueryEngineState { df_context: SessionContext, catalog_manager: CatalogManagerRef, - table_mutation_handler: Option, + function_state: Arc, udf_functions: Arc>>, aggregate_functions: Arc>>, extension_rules: Vec>, @@ -117,7 +118,11 @@ impl QueryEngineState { Self { df_context, catalog_manager: catalog_list, - table_mutation_handler, + function_state: Arc::new(FunctionState { + table_mutation_handler, + // FIXME(dennis): implemented in the following PR. 
+ meta_service_handler: None, + }), aggregate_functions: Arc::new(RwLock::new(HashMap::new())), extension_rules, plugins, @@ -201,14 +206,22 @@ impl QueryEngineState { ); } - #[inline] pub fn catalog_manager(&self) -> &CatalogManagerRef { &self.catalog_manager } - #[inline] + pub fn function_state(&self) -> Arc { + self.function_state.clone() + } + + /// Returns the [`TableMutationHandlerRef`] in state. pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> { - self.table_mutation_handler.as_ref() + self.function_state.table_mutation_handler.as_ref() + } + + /// Returns the [`MetaServiceHandlerRef`] in state. + pub fn meta_service_handler(&self) -> Option<&MetaServiceHandlerRef> { + self.function_state.meta_service_handler.as_ref() } pub(crate) fn disallow_cross_catalog_query(&self) -> bool { diff --git a/tests/cases/standalone/common/system/database.result b/tests/cases/standalone/common/system/database.result new file mode 100644 index 000000000000..4a85ffaddae6 --- /dev/null +++ b/tests/cases/standalone/common/system/database.result @@ -0,0 +1,28 @@ +use public; + +Affected Rows: 0 + +select database(); + ++------------+ +| database() | ++------------+ +| public | ++------------+ + +use information_schema; + +Affected Rows: 0 + +select database(); + ++--------------------+ +| database() | ++--------------------+ +| information_schema | ++--------------------+ + +use public; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/system/database.sql b/tests/cases/standalone/common/system/database.sql new file mode 100644 index 000000000000..fcad11ce8896 --- /dev/null +++ b/tests/cases/standalone/common/system/database.sql @@ -0,0 +1,9 @@ +use public; + +select database(); + +use information_schema; + +select database(); + +use public; diff --git a/tests/cases/standalone/common/system/timezone.result b/tests/cases/standalone/common/system/timezone.result index 8f3c59fbd248..586e8ba4fab9 100644 --- 
a/tests/cases/standalone/common/system/timezone.result +++ b/tests/cases/standalone/common/system/timezone.result @@ -15,6 +15,14 @@ SHOW VARIABLES system_time_zone; | UTC | +------------------+ +select timezone(); + ++------------+ +| timezone() | ++------------+ +| UTC | ++------------+ + CREATE TABLE test(d double, ts timestamp_ms time index); Affected Rows: 0 @@ -110,6 +118,14 @@ SHOW VARIABLES system_time_zone; | UTC | +------------------+ +select timezone(); + ++------------+ +| timezone() | ++------------+ +| +08:00 | ++------------+ + SELECT * from test; +-----+---------------------+ @@ -191,6 +207,14 @@ SHOW VARIABLES system_time_zone; | UTC | +------------------+ +select timezone(); + ++------------+ +| timezone() | ++------------+ +| -08:00 | ++------------+ + SELECT * from test; +-----+---------------------+ @@ -269,3 +293,11 @@ SHOW VARIABLES time_zone; | UTC | +-----------+ +select timezone(); + ++------------+ +| timezone() | ++------------+ +| UTC | ++------------+ + diff --git a/tests/cases/standalone/common/system/timezone.sql b/tests/cases/standalone/common/system/timezone.sql index 4b3d5f4f2d14..0bd2a9c91352 100644 --- a/tests/cases/standalone/common/system/timezone.sql +++ b/tests/cases/standalone/common/system/timezone.sql @@ -3,6 +3,8 @@ SHOW VARIABLES time_zone; SHOW VARIABLES system_time_zone; +select timezone(); + CREATE TABLE test(d double, ts timestamp_ms time index); INSERT INTO test values @@ -31,6 +33,8 @@ SHOW VARIABLES time_zone; SHOW VARIABLES system_time_zone; +select timezone(); + SELECT * from test; SELECT * from test where ts >= '2024-01-02 08:00:00'; @@ -50,6 +54,8 @@ SHOW VARIABLES time_zone; SHOW VARIABLES system_time_zone; +select timezone(); + SELECT * from test; SELECT * from test where ts >= '2024-01-02 08:00:00'; @@ -68,3 +74,5 @@ drop table test; SET TIME_ZONE = 'UTC'; SHOW VARIABLES time_zone; + +select timezone();