From 1536a0239519e09e0c545a2911eea3c5a8919ddb Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Fri, 23 Feb 2024 20:43:27 +0800 Subject: [PATCH] feat: admin_fn macros for administration functions --- Cargo.lock | 1 + src/common/function/Cargo.toml | 1 + src/common/function/src/handlers.rs | 12 + src/common/function/src/helper.rs | 9 + .../function/src/system/procedure_state.rs | 165 ++++-------- .../function/src/table/migrate_region.rs | 218 +++++++--------- src/common/macro/src/admin_fn.rs | 234 ++++++++++++++++++ src/common/macro/src/lib.rs | 10 +- src/common/macro/src/range_fn.rs | 56 +---- src/common/macro/src/utils.rs | 69 ++++++ src/datatypes/src/value.rs | 89 ++++--- src/operator/src/table.rs | 25 ++ 12 files changed, 567 insertions(+), 322 deletions(-) create mode 100644 src/common/macro/src/admin_fn.rs create mode 100644 src/common/macro/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 689d6b030d6e..e8c96a28f923 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1791,6 +1791,7 @@ dependencies = [ "session", "snafu", "statrs", + "store-api", "table", ] diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml index e2714bf99c0c..d6570782d489 100644 --- a/src/common/function/Cargo.toml +++ b/src/common/function/Cargo.toml @@ -34,6 +34,7 @@ serde_json.workspace = true session.workspace = true snafu.workspace = true statrs = "0.16" +store-api.workspace = true table.workspace = true [dev-dependencies] diff --git a/src/common/function/src/handlers.rs b/src/common/function/src/handlers.rs index 61b501be7019..11175a87bd73 100644 --- a/src/common/function/src/handlers.rs +++ b/src/common/function/src/handlers.rs @@ -19,6 +19,7 @@ use common_base::AffectedRows; use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse}; use common_query::error::Result; use session::context::QueryContextRef; +use store_api::storage::RegionId; use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest}; /// A trait for handling table mutations in `QueryEngine`. @@ -40,6 +41,17 @@ pub trait TableMutationHandler: Send + Sync { request: CompactTableRequest, ctx: QueryContextRef, ) -> Result; + + /// Trigger a flush task for a table region. + async fn flush_region(&self, region_id: RegionId, ctx: QueryContextRef) + -> Result; + + /// Trigger a compaction task for a table region. + async fn compact_region( + &self, + region_id: RegionId, + ctx: QueryContextRef, + ) -> Result; } /// A trait for handling procedure service requests in `QueryEngine`. diff --git a/src/common/function/src/helper.rs b/src/common/function/src/helper.rs index 6f549d6619e3..8a52b2536f29 100644 --- a/src/common/function/src/helper.rs +++ b/src/common/function/src/helper.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_query::error::Result; use common_query::prelude::{Signature, TypeSignature, Volatility}; use datatypes::prelude::ConcreteDataType; +use session::context::QueryContextRef; /// Create a function signature with oneof signatures of interleaving two arguments. pub fn one_of_sigs2(args1: Vec, args2: Vec) -> Signature { @@ -27,3 +29,10 @@ pub fn one_of_sigs2(args1: Vec, args2: Vec) Signature::one_of(sigs, Volatility::Immutable) } + +pub fn table_idents_to_full_name( + _name: &str, + _query_ctx: &QueryContextRef, +) -> Result<(String, String, String)> { + todo!() +} diff --git a/src/common/function/src/system/procedure_state.rs b/src/common/function/src/system/procedure_state.rs index 4f6305078465..f5e5e625fedf 100644 --- a/src/common/function/src/system/procedure_state.rs +++ b/src/common/function/src/system/procedure_state.rs @@ -13,9 +13,9 @@ // limitations under the License. use std::fmt; -use std::sync::Arc; use api::v1::meta::ProcedureStatus; +use common_macro::admin_fn; use common_meta::rpc::procedure::ProcedureStateResponse; use common_query::error::Error::ThreadJoin; use common_query::error::{ @@ -25,24 +25,13 @@ use common_query::error::{ use common_query::prelude::{Signature, Volatility}; use common_telemetry::error; use datatypes::prelude::*; -use datatypes::vectors::{ConstantVector, Helper, StringVector, VectorRef}; +use datatypes::vectors::VectorRef; use serde::Serialize; use snafu::{ensure, Location, OptionExt}; +use crate::ensure_greptime; use crate::function::{Function, FunctionContext}; - -const NAME: &str = "procedure_state"; - -/// A function to query procedure state by its id. -/// Such as `procedure_state(pid)`. -#[derive(Clone, Debug, Default)] -pub struct ProcedureStateFunction; - -impl fmt::Display for ProcedureStateFunction { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "PROCEDURE_STATE") - } -} +use crate::handlers::ProcedureServiceHandlerRef; #[derive(Serialize)] struct ProcedureStateJson { @@ -51,105 +40,57 @@ struct ProcedureStateJson { error: Option, } -impl Function for ProcedureStateFunction { - fn name(&self) -> &str { - NAME - } - - fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result { - Ok(ConcreteDataType::string_datatype()) - } - - fn signature(&self) -> Signature { - Signature::uniform( - 1, - vec![ConcreteDataType::string_datatype()], - Volatility::Immutable, - ) - } - - fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result { - crate::ensure_greptime!(func_ctx); - - ensure!( - columns.len() == 1, - InvalidFuncArgsSnafu { - err_msg: format!( - "The length of the args is not correct, expect 1, have: {}", - columns.len() - ), - } - ); +/// A function to query procedure state by its id. +/// Such as `procedure_state(pid)`. +#[admin_fn( + name = "ProcedureStateFunction", + display_name = "procedure_state", + sig_fn = "signature", + ret = "string" +)] +pub(crate) async fn procedure_state( + procedure_service_handler: &ProcedureServiceHandlerRef, + params: &[ValueRef<'_>], +) -> Result { + ensure!( + params.len() == 1, + InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect 1, have: {}", + params.len() + ), + } + ); - let pids = columns[0].clone(); - let expect_len = pids.len(); - let is_const = pids.is_const(); - - match pids.data_type() { - ConcreteDataType::String(_) => { - // TODO(dennis): datafusion UDF doesn't support async function currently - std::thread::spawn(move || { - let pids: &StringVector = if is_const { - let pids: &ConstantVector = unsafe { Helper::static_cast(&pids) }; - unsafe { Helper::static_cast(pids.inner()) } - } else { - unsafe { Helper::static_cast(&pids) } - }; - - let procedure_service_handler = func_ctx - .state - .procedure_service_handler - .as_ref() - .context(MissingProcedureServiceHandlerSnafu)?; - - let states = pids - .iter_data() - .map(|pid| { - if let Some(pid) = pid { - let ProcedureStateResponse { status, error, .. } = - common_runtime::block_on_read(async move { - procedure_service_handler.query_procedure_state(pid).await - })?; - - let status = ProcedureStatus::try_from(status) - .map(|v| v.as_str_name()) - .unwrap_or("Unknown"); - - let state = ProcedureStateJson { - status: status.to_string(), - error: if error.is_empty() { None } else { Some(error) }, - }; - - Ok(Some(serde_json::to_string(&state).unwrap_or_default())) - } else { - Ok(None) - } - }) - .collect::>>()?; - - let results: VectorRef = Arc::new(StringVector::from(states)); - - if is_const { - Ok(Arc::new(ConstantVector::new(results, expect_len)) as _) - } else { - Ok(results) - } - }) - .join() - .map_err(|e| { - error!(e; "Join thread error"); - ThreadJoin { - location: Location::default(), - } - })? - } - _ => UnsupportedInputDataTypeSnafu { - function: NAME, - datatypes: columns.iter().map(|c| c.data_type()).collect::>(), - } - .fail(), + let ValueRef::String(pid) = params[0] else { + return UnsupportedInputDataTypeSnafu { + function: "procedure_state", + datatypes: params.iter().map(|v| v.data_type()).collect::>(), } - } + .fail(); + }; + + let ProcedureStateResponse { status, error, .. } = + procedure_service_handler.query_procedure_state(pid).await?; + let status = ProcedureStatus::try_from(status) + .map(|v| v.as_str_name()) + .unwrap_or("Unknown"); + + let state = ProcedureStateJson { + status: status.to_string(), + error: if error.is_empty() { None } else { Some(error) }, + }; + let json = serde_json::to_string(&state).unwrap_or_default(); + + Ok(Value::from(json)) +} + +fn signature() -> Signature { + Signature::uniform( + 1, + vec![ConcreteDataType::string_datatype()], + Volatility::Immutable, + ) } #[cfg(test)] diff --git a/src/common/function/src/table/migrate_region.rs b/src/common/function/src/table/migrate_region.rs index 6447c6de6b3d..936b7bff1915 100644 --- a/src/common/function/src/table/migrate_region.rs +++ b/src/common/function/src/table/migrate_region.rs @@ -15,6 +15,7 @@ use std::fmt::{self}; use std::time::Duration; +use common_macro::admin_fn; use common_meta::rpc::procedure::MigrateRegionRequest; use common_query::error::Error::ThreadJoin; use common_query::error::{ @@ -22,12 +23,29 @@ use common_query::error::{ }; use common_query::prelude::{Signature, TypeSignature, Volatility}; use common_telemetry::logging::error; -use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder}; -use datatypes::value::Value; -use datatypes::vectors::{StringVectorBuilder, VectorRef}; +use datatypes::data_type::DataType; +use datatypes::prelude::ConcreteDataType; +use datatypes::types::cast::cast; +use datatypes::value::{Value, ValueRef}; +use datatypes::vectors::VectorRef; use snafu::{Location, OptionExt, ResultExt}; +use crate::ensure_greptime; use crate::function::{Function, FunctionContext}; +use crate::handlers::ProcedureServiceHandlerRef; + +const DEFAULT_REPLAY_TIMEOUT_SECS: u64 = 10; + +fn cast_u64(value: &ValueRef) -> Result> { + cast((*value).into(), &ConcreteDataType::uint64_datatype()) + .context(InvalidInputTypeSnafu { + err_msg: format!( + "Failed to cast input into uint64, actual type: {:#?}", + value.data_type(), + ), + }) + .map(|v| v.as_u64()) +} /// A function to migrate a region from source peer to target peer. /// Returns the submitted procedure id if success. Only available in cluster mode. @@ -39,137 +57,81 @@ use crate::function::{Function, FunctionContext}; /// - `region_id`: the region id /// - `from_peer`: the source peer id /// - `to_peer`: the target peer id -#[derive(Clone, Debug, Default)] -pub struct MigrateRegionFunction; - -const NAME: &str = "migrate_region"; -const DEFAULT_REPLAY_TIMEOUT_SECS: u64 = 10; - -fn cast_u64_vector(vector: &VectorRef) -> Result { - vector - .cast(&ConcreteDataType::uint64_datatype()) - .context(InvalidInputTypeSnafu { - err_msg: format!( - "Failed to cast input into uint64, actual type: {:#?}", - vector.data_type(), - ), - }) -} - -impl Function for MigrateRegionFunction { - fn name(&self) -> &str { - NAME - } - - fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result { - Ok(ConcreteDataType::string_datatype()) - } - - fn signature(&self) -> Signature { - Signature::one_of( - vec![ - // migrate_region(region_id, from_peer, to_peer) - TypeSignature::Uniform(3, ConcreteDataType::numerics()), - // migrate_region(region_id, from_peer, to_peer, timeout(secs)) - TypeSignature::Uniform(4, ConcreteDataType::numerics()), - ], - Volatility::Immutable, - ) - } - - fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result { - crate::ensure_greptime!(func_ctx); - - let (region_ids, from_peers, to_peers, replay_timeouts) = match columns.len() { - 3 => { - let region_ids = cast_u64_vector(&columns[0])?; - let from_peers = cast_u64_vector(&columns[1])?; - let to_peers = cast_u64_vector(&columns[2])?; - - (region_ids, from_peers, to_peers, None) +#[admin_fn( + name = "MigrateRegionFunction", + display_name = "migrate_region", + sig_fn = "signature", + ret = "string" +)] +pub(crate) async fn migrate_region( + procedure_service_handler: &ProcedureServiceHandlerRef, + params: &[ValueRef<'_>], +) -> Result { + let (region_id, from_peer, to_peer, replay_timeout) = match params.len() { + 3 => { + let region_id = cast_u64(¶ms[0])?; + let from_peer = cast_u64(¶ms[1])?; + let to_peer = cast_u64(¶ms[2])?; + + ( + region_id, + from_peer, + to_peer, + Some(DEFAULT_REPLAY_TIMEOUT_SECS), + ) + } + + 4 => { + let region_id = cast_u64(¶ms[0])?; + let from_peer = cast_u64(¶ms[1])?; + let to_peer = cast_u64(¶ms[2])?; + let replay_timeout = cast_u64(¶ms[3])?; + + (region_id, from_peer, to_peer, replay_timeout) + } + + size => { + return InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect exactly 3 or 4, have: {}", + size + ), } - - 4 => { - let region_ids = cast_u64_vector(&columns[0])?; - let from_peers = cast_u64_vector(&columns[1])?; - let to_peers = cast_u64_vector(&columns[2])?; - let replay_timeouts = cast_u64_vector(&columns[3])?; - - (region_ids, from_peers, to_peers, Some(replay_timeouts)) - } - - size => { - return InvalidFuncArgsSnafu { - err_msg: format!( - "The length of the args is not correct, expect exactly 3 or 4, have: {}", - size - ), - } - .fail(); - } - }; - - // TODO(dennis): datafusion UDF doesn't support async function currently - std::thread::spawn(move || { - let len = region_ids.len(); - let mut results = StringVectorBuilder::with_capacity(len); - let procedure_service_handler = func_ctx - .state - .procedure_service_handler - .as_ref() - .context(MissingProcedureServiceHandlerSnafu)?; - - for index in 0..len { - let region_id = region_ids.get(index); - let from_peer = from_peers.get(index); - let to_peer = to_peers.get(index); - let replay_timeout = match &replay_timeouts { - Some(replay_timeouts) => replay_timeouts.get(index), - None => Value::UInt64(DEFAULT_REPLAY_TIMEOUT_SECS), - }; - - match (region_id, from_peer, to_peer, replay_timeout) { - ( - Value::UInt64(region_id), - Value::UInt64(from_peer), - Value::UInt64(to_peer), - Value::UInt64(replay_timeout), - ) => { - let pid = common_runtime::block_on_read(async move { - procedure_service_handler - .migrate_region(MigrateRegionRequest { - region_id, - from_peer, - to_peer, - replay_timeout: Duration::from_secs(replay_timeout), - }) - .await - })?; - - results.push(pid.as_deref()) - } - _ => { - results.push(None); - } - } + .fail(); + } + }; + + match (region_id, from_peer, to_peer, replay_timeout) { + (Some(region_id), Some(from_peer), Some(to_peer), Some(replay_timeout)) => { + let pid = procedure_service_handler + .migrate_region(MigrateRegionRequest { + region_id, + from_peer, + to_peer, + replay_timeout: Duration::from_secs(replay_timeout), + }) + .await?; + + match pid { + Some(pid) => Ok(Value::from(pid)), + None => Ok(Value::Null), } + } - Ok(results.to_vector()) - }) - .join() - .map_err(|e| { - error!(e; "Join thread error"); - ThreadJoin { - location: Location::default(), - } - })? + _ => Ok(Value::Null), } } -impl fmt::Display for MigrateRegionFunction { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "MIGRATE_REGION") - } +fn signature() -> Signature { + Signature::one_of( + vec![ + // migrate_region(region_id, from_peer, to_peer) + TypeSignature::Uniform(3, ConcreteDataType::numerics()), + // migrate_region(region_id, from_peer, to_peer, timeout(secs)) + TypeSignature::Uniform(4, ConcreteDataType::numerics()), + ], + Volatility::Immutable, + ) } #[cfg(test)] diff --git a/src/common/macro/src/admin_fn.rs b/src/common/macro/src/admin_fn.rs new file mode 100644 index 000000000000..6030d7a13d97 --- /dev/null +++ b/src/common/macro/src/admin_fn.rs @@ -0,0 +1,234 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use proc_macro::TokenStream; +use quote::quote; +use syn::spanned::Spanned; +use syn::{ + parse_macro_input, Attribute, AttributeArgs, Ident, ItemFn, Signature, Type, TypePath, + TypeReference, Visibility, +}; + +use crate::utils::{extract_arg_map, extract_input_types, get_ident}; + +/// Internal util macro to early return on error. +macro_rules! ok { + ($item:expr) => { + match $item { + Ok(item) => item, + Err(e) => return e.into_compile_error().into(), + } + }; +} + +/// Internal util macro to to create an error. +macro_rules! error { + ($span:expr, $msg: expr) => { + Err(syn::Error::new($span, $msg)) + }; +} + +pub(crate) fn process_admin_fn(args: TokenStream, input: TokenStream) -> TokenStream { + let mut result = TokenStream::new(); + + // extract arg map + let arg_pairs = parse_macro_input!(args as AttributeArgs); + let arg_span = arg_pairs[0].span(); + let arg_map = ok!(extract_arg_map(arg_pairs)); + + // decompose the fn block + let compute_fn = parse_macro_input!(input as ItemFn); + let ItemFn { + attrs, + vis, + sig, + block, + } = compute_fn; + + // extract fn arg list + let Signature { + inputs, + ident: fn_name, + .. + } = &sig; + + let arg_types = ok!(extract_input_types(inputs)); + if arg_types.is_empty() { + ok!(error!( + sig.span(), + "Expect at least one argument for admin fn" + )); + } + let handler_type = ok!(extract_handler_type(&arg_types)); + + // build the struct and its impl block + // only do this when `display_name` is specified + if let Ok(display_name) = get_ident(&arg_map, "display_name", arg_span) { + let struct_code = build_struct( + attrs, + vis, + fn_name, + ok!(get_ident(&arg_map, "name", arg_span)), + ok!(get_ident(&arg_map, "sig_fn", arg_span)), + ok!(get_ident(&arg_map, "ret", arg_span)), + handler_type, + display_name, + ); + result.extend(struct_code); + } + + // preserve this fn + let input_fn_code: TokenStream = quote! { + #sig { #block } + } + .into(); + + result.extend(input_fn_code); + result +} + +/// Retrieve the handler type, `ProcedureServiceHandlerRef` or `TableMutationHandlerRef`. +fn extract_handler_type(arg_types: &[Type]) -> Result<&Ident, syn::Error> { + match &arg_types[0] { + Type::Reference(TypeReference { elem, .. }) => match &**elem { + Type::Path(TypePath { path, .. }) => Ok(&path + .segments + .first() + .expect("Expected a reference of handler") + .ident), + other => { + error!(other.span(), "Expected a reference of handler") + } + }, + other => { + error!(other.span(), "Expected a reference of handler") + } + } +} + +/// Build the function struct +#[allow(clippy::too_many_arguments)] +fn build_struct( + attrs: Vec, + vis: Visibility, + fn_name: &Ident, + name: Ident, + sig_fn: Ident, + ret: Ident, + handler_type: &Ident, + display_name_ident: Ident, +) -> TokenStream { + let display_name = display_name_ident.to_string(); + let ret = Ident::new(&format!("{ret}_datatype"), ret.span()); + let uppcase_display_name = display_name.to_uppercase(); + // Get the handler name in function state by the argument ident + let (handler, snafu_type) = match handler_type.to_string().as_str() { + "ProcedureServiceHandlerRef" => ( + Ident::new("procedure_service_handler", handler_type.span()), + Ident::new("MissingProcedureServiceHandlerSnafu", handler_type.span()), + ), + + "TableMutationHandlerRef" => ( + Ident::new("table_mutation_handler", handler_type.span()), + Ident::new("MissingTableMutationHandlerSnafu", handler_type.span()), + ), + handler => ok!(error!( + handler_type.span(), + format!("Unknown handler type: {handler}") + )), + }; + + quote! { + #(#attrs)* + #[derive(Debug)] + #vis struct #name; + + impl fmt::Display for #name { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, #uppcase_display_name) + } + } + + + impl Function for #name { + fn name(&self) -> &'static str { + #display_name + } + + fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result { + Ok(ConcreteDataType::#ret()) + } + + fn signature(&self) -> Signature { + #sig_fn() + } + + fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result { + // Ensure under the `greptime` catalog for security + ensure_greptime!(func_ctx); + + let columns_num = columns.len(); + let rows_num = if columns.is_empty() { + 1 + } else { + columns[0].len() + }; + let columns = Vec::from(columns); + + std::thread::spawn(move || { + let handler = func_ctx + .state + .#handler + .as_ref() + .context(#snafu_type)?; + + let mut builder = ConcreteDataType::#ret() + .create_mutable_vector(rows_num); + + if columns_num == 0 { + let result = common_runtime::block_on_read(async move { + #fn_name(handler, &[]).await + })?; + + builder.push_value_ref(result.as_value_ref()); + } else { + for i in 0..rows_num { + let args: Vec<_> = columns.iter() + .map(|vector| vector.get_ref(i)) + .collect(); + + let result = common_runtime::block_on_read(async move { + #fn_name(handler, &args).await + })?; + + builder.push_value_ref(result.as_value_ref()); + } + } + + Ok(builder.to_vector()) + }) + .join() + .map_err(|e| { + error!(e; "Join thread error"); + ThreadJoin { + location: Location::default(), + } + })? + + } + + } + } + .into() +} diff --git a/src/common/macro/src/lib.rs b/src/common/macro/src/lib.rs index f33f308a86a9..7736b24bbb0d 100644 --- a/src/common/macro/src/lib.rs +++ b/src/common/macro/src/lib.rs @@ -12,17 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod admin_fn; mod aggr_func; mod print_caller; mod range_fn; mod stack_trace_debug; - +mod utils; use aggr_func::{impl_aggr_func_type_store, impl_as_aggr_func_creator}; use print_caller::process_print_caller; use proc_macro::TokenStream; use range_fn::process_range_fn; use syn::{parse_macro_input, DeriveInput}; +use crate::admin_fn::process_admin_fn; + /// Make struct implemented trait [AggrFuncTypeStore], which is necessary when writing UDAF. /// This derive macro is expect to be used along with attribute macro [macro@as_aggr_func_creator]. #[proc_macro_derive(AggrFuncTypeStore)] @@ -68,6 +71,11 @@ pub fn range_fn(args: TokenStream, input: TokenStream) -> TokenStream { process_range_fn(args, input) } +#[proc_macro_attribute] +pub fn admin_fn(args: TokenStream, input: TokenStream) -> TokenStream { + process_admin_fn(args, input) +} + /// Attribute macro to print the caller to the annotated function. /// The caller is printed as its filename and the call site line number. /// diff --git a/src/common/macro/src/range_fn.rs b/src/common/macro/src/range_fn.rs index da0c997eff0a..622e21ef6c73 100644 --- a/src/common/macro/src/range_fn.rs +++ b/src/common/macro/src/range_fn.rs @@ -12,20 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - use proc_macro::TokenStream; -use proc_macro2::Span; use quote::quote; -use syn::punctuated::Punctuated; use syn::spanned::Spanned; -use syn::token::Comma; use syn::{ - parse_macro_input, Attribute, AttributeArgs, FnArg, Ident, ItemFn, Meta, MetaNameValue, - NestedMeta, Signature, Type, TypeReference, Visibility, + parse_macro_input, Attribute, AttributeArgs, Ident, ItemFn, Signature, Type, TypeReference, + Visibility, }; -/// Internal util macro to early return on error. +use crate::utils::{extract_arg_map, extract_input_types, get_ident}; + macro_rules! ok { ($item:expr) => { match $item { @@ -89,48 +85,6 @@ pub(crate) fn process_range_fn(args: TokenStream, input: TokenStream) -> TokenSt result } -/// Extract a String <-> Ident map from the attribute args. -fn extract_arg_map(args: Vec) -> Result, syn::Error> { - args.into_iter() - .map(|meta| { - if let NestedMeta::Meta(Meta::NameValue(MetaNameValue { path, lit, .. })) = meta { - let name = path.get_ident().unwrap().to_string(); - let ident = match lit { - syn::Lit::Str(lit_str) => lit_str.parse::(), - _ => Err(syn::Error::new( - lit.span(), - "Unexpected attribute format. Expected `name = \"value\"`", - )), - }?; - Ok((name, ident)) - } else { - Err(syn::Error::new( - meta.span(), - "Unexpected attribute format. Expected `name = \"value\"`", - )) - } - }) - .collect::, syn::Error>>() -} - -/// Helper function to get an Ident from the previous arg map. -fn get_ident(map: &HashMap, key: &str, span: Span) -> Result { - map.get(key) - .cloned() - .ok_or_else(|| syn::Error::new(span, format!("Expect attribute {key} but not found"))) -} - -/// Extract the argument list from the annotated function. -fn extract_input_types(inputs: &Punctuated) -> Result, syn::Error> { - inputs - .iter() - .map(|arg| match arg { - FnArg::Receiver(receiver) => Err(syn::Error::new(receiver.span(), "expected bool")), - FnArg::Typed(pat_type) => Ok(*pat_type.ty.clone()), - }) - .collect() -} - fn build_struct( attrs: Vec, vis: Visibility, @@ -214,7 +168,7 @@ fn build_calc_fn( #( let #range_array_names = RangeArray::try_new(extract_array(&input[#param_numbers])?.to_data().into())?; )* - // TODO(ruihang): add ensure!() + // TODO(ruihang): add ensure!() let mut result_array = Vec::new(); for index in 0..#first_range_array_name.len(){ diff --git a/src/common/macro/src/utils.rs b/src/common/macro/src/utils.rs new file mode 100644 index 000000000000..a4587092efd9 --- /dev/null +++ b/src/common/macro/src/utils.rs @@ -0,0 +1,69 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use proc_macro2::Span; +use syn::punctuated::Punctuated; +use syn::spanned::Spanned; +use syn::token::Comma; +use syn::{FnArg, Ident, Meta, MetaNameValue, NestedMeta, Type}; + +/// Extract a String <-> Ident map from the attribute args. +pub(crate) fn extract_arg_map(args: Vec) -> Result, syn::Error> { + args.into_iter() + .map(|meta| { + if let NestedMeta::Meta(Meta::NameValue(MetaNameValue { path, lit, .. })) = meta { + let name = path.get_ident().unwrap().to_string(); + let ident = match lit { + syn::Lit::Str(lit_str) => lit_str.parse::(), + _ => Err(syn::Error::new( + lit.span(), + "Unexpected attribute format. Expected `name = \"value\"`", + )), + }?; + Ok((name, ident)) + } else { + Err(syn::Error::new( + meta.span(), + "Unexpected attribute format. Expected `name = \"value\"`", + )) + } + }) + .collect::, syn::Error>>() +} + +/// Helper function to get an Ident from the previous arg map. +pub(crate) fn get_ident( + map: &HashMap, + key: &str, + span: Span, +) -> Result { + map.get(key) + .cloned() + .ok_or_else(|| syn::Error::new(span, format!("Expect attribute {key} but not found"))) +} + +/// Extract the argument list from the annotated function. +pub(crate) fn extract_input_types( + inputs: &Punctuated, +) -> Result, syn::Error> { + inputs + .iter() + .map(|arg| match arg { + FnArg::Receiver(receiver) => Err(syn::Error::new(receiver.span(), "expected bool")), + FnArg::Typed(pat_type) => Ok(*pat_type.ty.clone()), + }) + .collect() +} diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index bf3445a922d9..031f3556552b 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -124,37 +124,45 @@ impl Display for Value { } } -impl Value { - /// Returns data type of the value. - /// - /// # Panics - /// Panics if the data type is not supported. - pub fn data_type(&self) -> ConcreteDataType { - match self { - Value::Null => ConcreteDataType::null_datatype(), - Value::Boolean(_) => ConcreteDataType::boolean_datatype(), - Value::UInt8(_) => ConcreteDataType::uint8_datatype(), - Value::UInt16(_) => ConcreteDataType::uint16_datatype(), - Value::UInt32(_) => ConcreteDataType::uint32_datatype(), - Value::UInt64(_) => ConcreteDataType::uint64_datatype(), - Value::Int8(_) => ConcreteDataType::int8_datatype(), - Value::Int16(_) => ConcreteDataType::int16_datatype(), - Value::Int32(_) => ConcreteDataType::int32_datatype(), - Value::Int64(_) => ConcreteDataType::int64_datatype(), - Value::Float32(_) => ConcreteDataType::float32_datatype(), - Value::Float64(_) => ConcreteDataType::float64_datatype(), - Value::String(_) => ConcreteDataType::string_datatype(), - Value::Binary(_) => ConcreteDataType::binary_datatype(), - Value::Date(_) => ConcreteDataType::date_datatype(), - Value::DateTime(_) => ConcreteDataType::datetime_datatype(), - Value::Time(t) => ConcreteDataType::time_datatype(*t.unit()), - Value::Timestamp(v) => ConcreteDataType::timestamp_datatype(v.unit()), - Value::Interval(v) => ConcreteDataType::interval_datatype(v.unit()), - Value::List(list) => ConcreteDataType::list_datatype(list.datatype().clone()), - Value::Duration(d) => ConcreteDataType::duration_datatype(d.unit()), - Value::Decimal128(d) => ConcreteDataType::decimal128_datatype(d.precision(), d.scale()), +macro_rules! define_data_type_func { + ($struct: ident) => { + /// Returns data type of the value. + /// + /// # Panics + /// Panics if the data type is not supported. + pub fn data_type(&self) -> ConcreteDataType { + match self { + $struct::Null => ConcreteDataType::null_datatype(), + $struct::Boolean(_) => ConcreteDataType::boolean_datatype(), + $struct::UInt8(_) => ConcreteDataType::uint8_datatype(), + $struct::UInt16(_) => ConcreteDataType::uint16_datatype(), + $struct::UInt32(_) => ConcreteDataType::uint32_datatype(), + $struct::UInt64(_) => ConcreteDataType::uint64_datatype(), + $struct::Int8(_) => ConcreteDataType::int8_datatype(), + $struct::Int16(_) => ConcreteDataType::int16_datatype(), + $struct::Int32(_) => ConcreteDataType::int32_datatype(), + $struct::Int64(_) => ConcreteDataType::int64_datatype(), + $struct::Float32(_) => ConcreteDataType::float32_datatype(), + $struct::Float64(_) => ConcreteDataType::float64_datatype(), + $struct::String(_) => ConcreteDataType::string_datatype(), + $struct::Binary(_) => ConcreteDataType::binary_datatype(), + $struct::Date(_) => ConcreteDataType::date_datatype(), + $struct::DateTime(_) => ConcreteDataType::datetime_datatype(), + $struct::Time(t) => ConcreteDataType::time_datatype(*t.unit()), + $struct::Timestamp(v) => ConcreteDataType::timestamp_datatype(v.unit()), + $struct::Interval(v) => ConcreteDataType::interval_datatype(v.unit()), + $struct::List(list) => ConcreteDataType::list_datatype(list.datatype().clone()), + $struct::Duration(d) => ConcreteDataType::duration_datatype(d.unit()), + $struct::Decimal128(d) => { + ConcreteDataType::decimal128_datatype(d.precision(), d.scale()) + } + } } - } + }; +} + +impl Value { + define_data_type_func!(Value); /// Returns true if this is a null value. pub fn is_null(&self) -> bool { @@ -250,6 +258,17 @@ impl Value { } } + /// Cast Value to u64. Return None if value is not a valid uint64 data type. + pub fn as_u64(&self) -> Option { + match self { + Value::UInt8(v) => Some(*v as _), + Value::UInt16(v) => Some(*v as _), + Value::UInt32(v) => Some(*v as _), + Value::UInt64(v) => Some(*v), + _ => None, + } + } + /// Returns the logical type of the value. pub fn logical_type_id(&self) -> LogicalTypeId { match self { @@ -938,6 +957,8 @@ macro_rules! impl_as_for_value_ref { } impl<'a> ValueRef<'a> { + define_data_type_func!(ValueRef); + /// Returns true if this is null. pub fn is_null(&self) -> bool { matches!(self, ValueRef::Null) @@ -1143,6 +1164,14 @@ impl<'a> ListValueRef<'a> { ListValueRef::Ref { val } => Value::List(val.clone()), } } + + /// Returns the inner element's data type. + fn datatype(&self) -> ConcreteDataType { + match self { + ListValueRef::Indexed { vector, .. } => vector.data_type(), + ListValueRef::Ref { val } => val.datatype().clone(), + } + } } impl<'a> PartialEq for ListValueRef<'a> { diff --git a/src/operator/src/table.rs b/src/operator/src/table.rs index 60d96b776360..568fb3468d75 100644 --- a/src/operator/src/table.rs +++ b/src/operator/src/table.rs @@ -21,6 +21,7 @@ use common_query::error::Result as QueryResult; use session::context::QueryContextRef; use snafu::ResultExt; use sqlparser::ast::ObjectName; +use store_api::storage::RegionId; use table::requests::{ CompactTableRequest, DeleteRequest as TableDeleteRequest, FlushTableRequest, InsertRequest as TableInsertRequest, @@ -127,4 +128,28 @@ impl TableMutationHandler for TableMutationOperator { .map_err(BoxedError::new) .context(query_error::TableMutationSnafu) } + + async fn flush_region( + &self, + region_id: RegionId, + ctx: QueryContextRef, + ) -> QueryResult { + self.requester + .handle_region_flush(region_id, ctx) + .await + .map_err(BoxedError::new) + .context(query_error::TableMutationSnafu) + } + + async fn compact_region( + &self, + region_id: RegionId, + ctx: QueryContextRef, + ) -> QueryResult { + self.requester + .handle_region_compaction(region_id, ctx) + .await + .map_err(BoxedError::new) + .context(query_error::TableMutationSnafu) + } }