From d37b87bf641655d775f72997a268d6e881541e6d Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 23 Nov 2023 14:24:59 +0800 Subject: [PATCH] refactor(error): simplify not implemented error Instantiation Signed-off-by: Bugen Zhao --- src/common/src/error.rs | 78 +++++++++++-- src/common/src/types/to_binary.rs | 19 ++-- src/frontend/src/binder/expr/binary_op.rs | 7 +- src/frontend/src/binder/expr/function.rs | 104 ++++++------------ src/frontend/src/binder/expr/mod.rs | 45 ++------ src/frontend/src/binder/expr/value.rs | 3 +- src/frontend/src/binder/query.rs | 3 +- .../src/binder/relation/table_function.rs | 25 ++--- .../src/binder/relation/table_or_source.rs | 15 ++- src/frontend/src/binder/set_expr.rs | 7 +- src/frontend/src/binder/statement.rs | 9 +- src/frontend/src/binder/update.rs | 7 +- src/frontend/src/binder/values.rs | 9 +- src/frontend/src/expr/type_inference/func.rs | 16 ++- src/frontend/src/expr/window_function.rs | 18 ++- .../src/handler/alter_source_column.rs | 11 +- .../src/handler/alter_table_column.rs | 11 +- src/frontend/src/handler/create_function.rs | 12 +- src/frontend/src/handler/create_table.rs | 25 +---- src/frontend/src/handler/drop_function.rs | 6 +- src/frontend/src/handler/drop_schema.rs | 7 +- src/frontend/src/handler/explain.rs | 11 +- src/frontend/src/handler/extended_handle.rs | 21 +--- src/frontend/src/handler/mod.rs | 33 ++---- src/frontend/src/handler/show.rs | 15 +-- src/frontend/src/handler/transaction.rs | 11 +- .../src/optimizer/plan_node/logical_agg.rs | 38 +++---- .../plan_node/logical_over_window.rs | 55 ++++----- .../src/optimizer/plan_node/logical_share.rs | 8 +- .../src/optimizer/plan_node/logical_source.rs | 8 +- .../optimizer/plan_node/logical_sys_scan.rs | 13 +-- .../src/optimizer/plan_node/logical_topn.rs | 12 +- src/frontend/src/planner/relation.rs | 13 +-- src/frontend/src/planner/select.rs | 17 +-- src/stream/src/executor/error.rs | 10 +- 35 files changed, 263 insertions(+), 439 deletions(-) diff --git a/src/common/src/error.rs b/src/common/src/error.rs index 3e6cef078ad71..4663687ade9e6 100644 --- a/src/common/src/error.rs +++ b/src/common/src/error.rs @@ -69,6 +69,61 @@ impl Display for TrackingIssue { } } +#[derive(Error, Debug)] +#[error("Feature is not yet implemented: {feature}. {tracking_issue}")] +pub struct NotImplemented { + pub feature: String, + pub tracking_issue: TrackingIssue, +} + +impl From for NotImplemented +where + S: Into, +{ + fn from(feature: S) -> Self { + Self::new(feature) + } +} + +impl NotImplemented { + pub fn new(feature: impl Into) -> Self { + Self::with_tracking_issue(feature, TrackingIssue::none()) + } + + pub fn with_tracking_issue( + feature: impl Into, + tracking_issue: impl Into, + ) -> Self { + Self { + feature: feature.into(), + tracking_issue: tracking_issue.into(), + } + } +} + +#[macro_export] +macro_rules! not_implemented { + (issue = $issue:expr, $($arg:tt)*) => { + $crate::error::NotImplemented::with_tracking_issue( + ::std::format!($($arg)*), + $issue, + ) + }; + ($($arg:tt)*) => { + not_implemented!(issue = None, $($arg)*) + }; +} + +#[macro_export(local_inner_macros)] +macro_rules! bail_not_implemented { + (issue = $issue:expr, $($arg:tt)*) => { + return Err(not_implemented!(issue = $issue, $($arg)*).into()) + }; + ($($arg:tt)*) => { + bail_not_implemented!(issue = None, $($arg)*) + }; +} + #[derive(Error, Debug)] pub enum ErrorCode { #[error("internal error: {0}")] @@ -86,8 +141,8 @@ pub enum ErrorCode { #[backtrace] BoxedError, ), - #[error("Feature is not yet implemented: {0}\n{1}")] - NotImplemented(String, TrackingIssue), + #[error(transparent)] + NotImplemented(#[from] NotImplemented), // Tips: Use this only if it's intended to reject the query #[error("Not supported: {0}\nHINT: {1}")] NotSupported(String, String), @@ -195,6 +250,13 @@ pub enum ErrorCode { UnrecognizedConfigurationParameter(String), } +// TODO(error-handling): automatically generate this impl. +impl From for RwError { + fn from(value: NotImplemented) -> Self { + ErrorCode::from(value).into() + } +} + pub fn internal_error(msg: impl Into) -> RwError { ErrorCode::InternalError(msg.into()).into() } @@ -468,14 +530,8 @@ macro_rules! ensure_eq { #[macro_export] macro_rules! bail { - ($msg:literal $(,)?) => { - return Err($crate::error::anyhow_error!($msg).into()) - }; - ($err:expr $(,)?) => { - return Err($crate::error::anyhow_error!($err).into()) - }; - ($fmt:expr, $($arg:tt)*) => { - return Err($crate::error::anyhow_error!($fmt, $($arg)*).into()) + ($($arg:tt)*) => { + return Err($crate::error::anyhow_error!($($arg)*).into()) }; } @@ -605,7 +661,7 @@ mod tests { check_grpc_error(ErrorCode::TaskNotFound, Code::Internal); check_grpc_error(ErrorCode::InternalError(String::new()), Code::Internal); check_grpc_error( - ErrorCode::NotImplemented(String::new(), None.into()), + ErrorCode::NotImplemented(NotImplemented::new("test")), Code::Internal, ); } diff --git a/src/common/src/types/to_binary.rs b/src/common/src/types/to_binary.rs index 46b5a61589493..91f7257db1cc1 100644 --- a/src/common/src/types/to_binary.rs +++ b/src/common/src/types/to_binary.rs @@ -16,7 +16,7 @@ use bytes::{Bytes, BytesMut}; use postgres_types::{ToSql, Type}; use super::{DataType, DatumRef, ScalarRefImpl, F32, F64}; -use crate::error::TrackingIssue; +use crate::error::NotImplemented; /// Error type for [`ToBinary`] trait. #[derive(thiserror::Error, Debug)] @@ -24,8 +24,8 @@ pub enum ToBinaryError { #[error(transparent)] ToSql(Box), - #[error("Feature is not yet implemented: {0}\n{1}")] - NotImplemented(String, TrackingIssue), + #[error(transparent)] + NotImplemented(#[from] NotImplemented), } pub type Result = std::result::Result; @@ -87,15 +87,10 @@ impl ToBinary for ScalarRefImpl<'_> { ScalarRefImpl::Time(v) => v.to_binary_with_type(ty), ScalarRefImpl::Bytea(v) => v.to_binary_with_type(ty), ScalarRefImpl::Jsonb(v) => v.to_binary_with_type(ty), - ScalarRefImpl::Struct(_) | ScalarRefImpl::List(_) => { - Err(ToBinaryError::NotImplemented( - format!( - "the pgwire extended-mode encoding for {} is unsupported", - ty - ), - Some(7949).into(), - )) - } + ScalarRefImpl::Struct(_) | ScalarRefImpl::List(_) => bail_not_implemented!( + issue = 7949, + "the pgwire extended-mode encoding for {ty} is unsupported" + ), } } } diff --git a/src/frontend/src/binder/expr/binary_op.rs b/src/frontend/src/binder/expr/binary_op.rs index 856906f13c04c..275d4f152636f 100644 --- a/src/frontend/src/binder/expr/binary_op.rs +++ b/src/frontend/src/binder/expr/binary_op.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{BinaryOperator, Expr}; @@ -186,11 +187,7 @@ impl Binder { func_types.push(ExprType::Not); ExprType::RegexpEq } - _ => { - return Err( - ErrorCode::NotImplemented(format!("binary op: {:?}", op), 112.into()).into(), - ) - } + _ => bail_not_implemented!(issue = 112, "binary op: {:?}", op), }; func_types.push(final_type); Ok(func_types) diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 96435d4e9f656..d4a6644afaa43 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -24,7 +24,7 @@ use risingwave_common::catalog::{INFORMATION_SCHEMA_SCHEMA_NAME, PG_CATALOG_SCHE use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::session_config::USER_NAME_WILD_CARD; use risingwave_common::types::{DataType, ScalarImpl, Timestamptz}; -use risingwave_common::{GIT_SHA, RW_VERSION}; +use risingwave_common::{bail_not_implemented, not_implemented, GIT_SHA, RW_VERSION}; use risingwave_expr::aggregate::{agg_kinds, AggKind}; use risingwave_expr::window_function::{ Frame, FrameBound, FrameBounds, FrameExclusion, WindowFuncKind, @@ -68,28 +68,22 @@ impl Binder { // FIXME: handle schema correctly, so that the functions are hidden if the schema is not in the search path. let function_name = name.real_value(); if function_name != "_pg_expandarray" { - return Err(ErrorCode::NotImplemented( - format!("Unsupported function name under schema: {}", schema_name), - 12422.into(), - ) - .into()); + bail_not_implemented!( + issue = 12422, + "Unsupported function name under schema: {}", + schema_name + ); } function_name } else { - return Err(ErrorCode::NotImplemented( - format!("Unsupported function name under schema: {}", schema_name), - 12422.into(), - ) - .into()); + bail_not_implemented!( + issue = 12422, + "Unsupported function name under schema: {}", + schema_name + ); } } - _ => { - return Err(ErrorCode::NotImplemented( - format!("qualified function: {}", f.name), - 112.into(), - ) - .into()); - } + _ => bail_not_implemented!(issue = 112, "qualified function {}", f.name), }; // agg calls @@ -141,11 +135,11 @@ impl Binder { )) .into()); } else if f.over.is_some() { - return Err(ErrorCode::NotImplemented( - format!("Unrecognized window function: {}", function_name), - 8961.into(), - ) - .into()); + bail_not_implemented!( + issue = 8961, + "Unrecognized window function: {}", + function_name + ); } // table function @@ -272,25 +266,13 @@ impl Binder { .and_then(|expr| expr.enforce_bool_clause("FILTER"))?; self.context.clause = clause; if expr.has_subquery() { - return Err(ErrorCode::NotImplemented( - "subquery in filter clause".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("subquery in filter clause"); } if expr.has_agg_call() { - return Err(ErrorCode::NotImplemented( - "aggregation function in filter clause".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("aggregation function in filter clause"); } if expr.has_table_function() { - return Err(ErrorCode::NotImplemented( - "table function in filter clause".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("table function in filter clause"); } Condition::with_expr(expr) } @@ -348,11 +330,7 @@ impl Binder { .flatten_ok() .try_collect()?; if args.iter().any(|arg| arg.as_literal().is_none()) { - return Err(ErrorCode::NotImplemented( - "non-constant direct arguments for ordered-set aggregation is not supported now".to_string(), - None.into() - ) - .into()); + bail_not_implemented!("non-constant direct arguments for ordered-set aggregation is not supported now"); } args }; @@ -470,12 +448,7 @@ impl Binder { // restrict arguments[1..] to be constant because we don't support multiple distinct key // indices for now if args.iter().skip(1).any(|arg| arg.as_literal().is_none()) { - return Err(ErrorCode::NotImplemented( - "non-constant arguments other than the first one for DISTINCT aggregation is not supported now" - .to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("non-constant arguments other than the first one for DISTINCT aggregation is not supported now"); } // restrict ORDER BY to align with PG, which says: @@ -520,14 +493,11 @@ impl Binder { match exclusion { WindowFrameExclusion::CurrentRow => FrameExclusion::CurrentRow, WindowFrameExclusion::Group | WindowFrameExclusion::Ties => { - return Err(ErrorCode::NotImplemented( - format!( - "window frame exclusion `{}` is not supported yet", - exclusion - ), - 9124.into(), - ) - .into()); + bail_not_implemented!( + issue = 9124, + "window frame exclusion `{}` is not supported yet", + exclusion + ); } WindowFrameExclusion::NoOthers => FrameExclusion::NoOthers, } @@ -556,14 +526,11 @@ impl Binder { FrameBounds::Rows(start, end) } WindowFrameUnits::Range | WindowFrameUnits::Groups => { - return Err(ErrorCode::NotImplemented( - format!( - "window frame in `{}` mode is not supported yet", - frame.units - ), - 9124.into(), - ) - .into()); + bail_not_implemented!( + issue = 9124, + "window frame in `{}` mode is not supported yet", + frame.units + ); } }; if !bounds.is_valid() { @@ -972,10 +939,7 @@ impl Binder { .map_err(|_| no_match_err)?; let ExprImpl::Literal(literal) = &input else { - return Err(ErrorCode::NotImplemented( - "Only boolean literals are supported in `current_schemas`.".to_string(), None.into() - ) - .into()); + bail_not_implemented!("Only boolean literals are supported in `current_schemas`."); }; let Some(bool) = literal.get_data().as_ref().map(|bool| bool.clone().into_bool()) else { @@ -1277,7 +1241,7 @@ impl Binder { ) }; - ErrorCode::NotImplemented(err_msg, 112.into()).into() + not_implemented!(issue = 112, "{}", err_msg).into() }), } } diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs index 3297fa5071350..c52c42dbc973d 100644 --- a/src/frontend/src/binder/expr/mod.rs +++ b/src/frontend/src/binder/expr/mod.rs @@ -17,6 +17,7 @@ use risingwave_common::catalog::{ColumnDesc, ColumnId}; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::zip_eq_fast; +use risingwave_common::{bail_not_implemented, not_implemented}; use risingwave_sqlparser::ast::{ Array, BinaryOperator, DataType as AstDataType, Expr, Function, JsonPredicateType, ObjectName, Query, StructField, TrimWhereField, UnaryOperator, @@ -168,11 +169,7 @@ impl Binder { } => self.bind_overlay(*expr, *new_substring, *start, count), Expr::Parameter { index } => self.bind_parameter(index), Expr::Collate { expr, collation } => self.bind_collate(*expr, collation), - _ => Err(ErrorCode::NotImplemented( - format!("unsupported expression {:?}", expr), - 112.into(), - ) - .into()), + _ => bail_not_implemented!(issue = 112, "unsupported expression {:?}", expr), } } @@ -184,12 +181,11 @@ impl Binder { vec![self.bind_string(field.clone())?.into(), arg], ) .map_err(|_| { - ErrorCode::NotImplemented( - format!( - "function extract({} from {:?}) doesn't exist", - field, arg_type - ), - 112.into(), + not_implemented!( + issue = 112, + "function extract({} from {:?}) doesn't exist", + field, + arg_type ) })? .into()) @@ -291,13 +287,7 @@ impl Binder { } UnaryOperator::PGSquareRoot => ExprType::Sqrt, UnaryOperator::PGCubeRoot => ExprType::Cbrt, - _ => { - return Err(ErrorCode::NotImplemented( - format!("unsupported unary expression: {:?}", op), - 112.into(), - ) - .into()) - } + _ => bail_not_implemented!(issue = 112, "unsupported unary expression: {:?}", op), }; let expr = self.bind_expr_inner(expr)?; FunctionCall::new(func_type, vec![expr]).map(|f| f.into()) @@ -539,11 +529,7 @@ impl Binder { pub fn bind_collate(&mut self, expr: Expr, collation: ObjectName) -> Result { if !["C", "POSIX"].contains(&collation.real_value().as_str()) { - return Err(ErrorCode::NotImplemented( - "Collate collation other than `C` or `POSIX` is not implemented".into(), - None.into(), - ) - .into()); + bail_not_implemented!("Collate collation other than `C` or `POSIX` is not implemented"); } let bound_inner = self.bind_expr_inner(expr)?; @@ -592,12 +578,7 @@ pub fn bind_struct_field(column_def: &StructField) -> Result { } pub fn bind_data_type(data_type: &AstDataType) -> Result { - let new_err = || { - ErrorCode::NotImplemented( - format!("unsupported data type: {:}", data_type), - None.into(), - ) - }; + let new_err = || not_implemented!("unsupported data type: {:}", data_type); let data_type = match data_type { AstDataType::Boolean => DataType::Boolean, AstDataType::SmallInt => DataType::Int16, @@ -615,11 +596,7 @@ pub fn bind_data_type(data_type: &AstDataType) -> Result { AstDataType::Interval => DataType::Interval, AstDataType::Array(datatype) => DataType::List(Box::new(bind_data_type(datatype)?)), AstDataType::Char(..) => { - return Err(ErrorCode::NotImplemented( - "CHAR is not supported, please use VARCHAR instead\n".to_string(), - None.into(), - ) - .into()) + bail_not_implemented!("CHAR is not supported, please use VARCHAR instead") } AstDataType::Struct(types) => DataType::new_struct( types diff --git a/src/frontend/src/binder/expr/value.rs b/src/frontend/src/binder/expr/value.rs index 54559266a136f..82c36f9c1973e 100644 --- a/src/frontend/src/binder/expr/value.rs +++ b/src/frontend/src/binder/expr/value.rs @@ -13,6 +13,7 @@ // limitations under the License. use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::{DataType, DateTimeField, Decimal, Interval, ScalarImpl}; use risingwave_sqlparser::ast::{DateTimeField as AstDateTimeField, Expr, Value}; @@ -38,7 +39,7 @@ impl Binder { last_field: None, fractional_seconds_precision: None, } => self.bind_interval(value, leading_field), - _ => Err(ErrorCode::NotImplemented(format!("value: {:?}", value), None.into()).into()), + _ => bail_not_implemented!("value: {:?}", value), } } diff --git a/src/frontend/src/binder/query.rs b/src/frontend/src/binder/query.rs index a3b78343c6041..349894a30bf9d 100644 --- a/src/frontend/src/binder/query.rs +++ b/src/frontend/src/binder/query.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::rc::Rc; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::Schema; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; @@ -278,7 +279,7 @@ impl Binder { fn bind_with(&mut self, with: With) -> Result<()> { if with.recursive { - Err(ErrorCode::NotImplemented("recursive cte".into(), None.into()).into()) + bail_not_implemented!("recursive cte"); } else { for cte_table in with.cte_tables { let Cte { alias, query, .. } = cte_table; diff --git a/src/frontend/src/binder/relation/table_function.rs b/src/frontend/src/binder/relation/table_function.rs index 032791bfab30c..1d2c8491de657 100644 --- a/src/frontend/src/binder/relation/table_function.rs +++ b/src/frontend/src/binder/relation/table_function.rs @@ -15,6 +15,7 @@ use std::str::FromStr; use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ Field, Schema, PG_CATALOG_SCHEMA_NAME, RW_INTERNAL_TABLE_FUNCTION_NAME, }; @@ -49,14 +50,10 @@ impl Binder { { if func_name.eq_ignore_ascii_case(RW_INTERNAL_TABLE_FUNCTION_NAME) { if with_ordinality { - return Err(ErrorCode::NotImplemented( - format!( - "WITH ORDINALITY for internal/system table function {}", - func_name - ), - None.into(), - ) - .into()); + bail_not_implemented!( + "WITH ORDINALITY for internal/system table function {}", + func_name + ); } return self.bind_internal_table(args, alias); } @@ -66,14 +63,10 @@ impl Binder { ) { if with_ordinality { - return Err(ErrorCode::NotImplemented( - format!( - "WITH ORDINALITY for internal/system table function {}", - func_name - ), - None.into(), - ) - .into()); + bail_not_implemented!( + "WITH ORDINALITY for internal/system table function {}", + func_name + ); } return self.bind_relation_by_name_inner( Some(PG_CATALOG_SCHEMA_NAME), diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index cd2d2ef45efab..2302d3730360a 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -16,6 +16,7 @@ use std::ops::Deref; use std::sync::Arc; use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{is_system_schema, Field}; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::session_config::USER_NAME_WILD_CARD; @@ -100,19 +101,17 @@ impl Binder { { self.resolve_view_relation(&view_catalog.clone())? } else { - return Err(ErrorCode::NotImplemented( - format!( - r###"{}.{} is not supported, please use `SHOW` commands for now. + bail_not_implemented!( + issue = 1695, + r###"{}.{} is not supported, please use `SHOW` commands for now. `SHOW TABLES`, `SHOW MATERIALIZED VIEWS`, `DESCRIBE `, `SHOW COLUMNS FROM [table]` "###, - schema_name, table_name - ), - 1695.into(), - ) - .into()); + schema_name, + table_name + ); } } else if let Ok((table_catalog, schema_name)) = self.catalog diff --git a/src/frontend/src/binder/set_expr.rs b/src/frontend/src/binder/set_expr.rs index a91f6774b8c1d..91085a4084ee4 100644 --- a/src/frontend/src/binder/set_expr.rs +++ b/src/frontend/src/binder/set_expr.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::Schema; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::util::iter_util::ZipEqFast; @@ -198,11 +199,7 @@ impl Binder { match op { SetOperator::Union => {} SetOperator::Intersect | SetOperator::Except => { - return Err(ErrorCode::NotImplemented( - format!("{} all", op), - None.into(), - ) - .into()) + bail_not_implemented!("{} all", op); } } } diff --git a/src/frontend/src/binder/statement.rs b/src/frontend/src/binder/statement.rs index 027f78e7705a9..4eef5e7f7dd41 100644 --- a/src/frontend/src/binder/statement.rs +++ b/src/frontend/src/binder/statement.rs @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::Field; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::error::Result; use risingwave_sqlparser::ast::Statement; use super::delete::BoundDelete; @@ -82,11 +83,7 @@ impl Binder { Statement::Query(q) => Ok(BoundStatement::Query(self.bind_query(*q)?.into())), - _ => Err(ErrorCode::NotImplemented( - format!("unsupported statement {:?}", stmt), - None.into(), - ) - .into()), + _ => bail_not_implemented!("unsupported statement {:?}", stmt), } } } diff --git a/src/frontend/src/binder/update.rs b/src/frontend/src/binder/update.rs index aabe2a5bd43ca..e2ea54eb77e77 100644 --- a/src/frontend/src/binder/update.rs +++ b/src/frontend/src/binder/update.rs @@ -17,6 +17,7 @@ use std::collections::{BTreeMap, HashMap}; use fixedbitset::FixedBitSet; use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{Schema, TableVersionId}; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::util::iter_util::ZipEqFast; @@ -135,11 +136,7 @@ impl Binder { // (col1, col2) = (subquery) (_ids, AssignmentValue::Expr(Expr::Subquery(_))) => { - return Err(ErrorCode::NotImplemented( - "subquery on the right side of multi-assignment".to_owned(), - None.into(), - ) - .into()) + bail_not_implemented!("subquery on the right side of multi-assignment"); } // (col1, col2) = (expr1, expr2) // TODO: support `DEFAULT` in multiple assignments diff --git a/src/frontend/src/binder/values.rs b/src/frontend/src/binder/values.rs index 5ce2527adebb7..d1286c1c01375 100644 --- a/src/frontend/src/binder/values.rs +++ b/src/frontend/src/binder/values.rs @@ -13,6 +13,7 @@ // limitations under the License. use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; @@ -142,14 +143,10 @@ impl Binder { .flatten() .any(|expr| expr.has_subquery()) { - return Err(ErrorCode::NotImplemented("Subquery in VALUES".into(), None.into()).into()); + bail_not_implemented!("Subquery in VALUES"); } if bound_values.is_correlated(1) { - return Err(ErrorCode::NotImplemented( - "CorrelatedInputRef in VALUES".into(), - None.into(), - ) - .into()); + bail_not_implemented!("CorrelatedInputRef in VALUES"); } Ok(bound_values) } diff --git a/src/frontend/src/expr/type_inference/func.rs b/src/frontend/src/expr/type_inference/func.rs index d760ee039499a..b58935e810176 100644 --- a/src/frontend/src/expr/type_inference/func.rs +++ b/src/frontend/src/expr/type_inference/func.rs @@ -14,6 +14,7 @@ use itertools::Itertools as _; use num_integer::Integer as _; +use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::{DataType, StructType}; use risingwave_common::util::iter_util::ZipEqFast; @@ -629,15 +630,12 @@ fn infer_type_name<'a>( let mut candidates = top_matches(&candidates, inputs); if candidates.is_empty() { - return Err(ErrorCode::NotImplemented( - format!( - "{:?}{:?}", - func_type, - inputs.iter().map(TypeDebug).collect_vec() - ), - 112.into(), - ) - .into()); + bail_not_implemented!( + issue = 112, + "{:?}{:?}", + func_type, + inputs.iter().map(TypeDebug).collect_vec() + ); } // After this line `candidates` will never be empty, as the narrow rules will retain original diff --git a/src/frontend/src/expr/window_function.rs b/src/frontend/src/expr/window_function.rs index 371a00dc6b62a..d82cf4fb788d0 100644 --- a/src/frontend/src/expr/window_function.rs +++ b/src/frontend/src/expr/window_function.rs @@ -13,6 +13,7 @@ // limitations under the License. use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, RwError}; use risingwave_common::types::DataType; use risingwave_expr::window_function::{Frame, WindowFuncKind}; @@ -72,21 +73,16 @@ impl WindowFunction { .into()); } if !offset.is_const() { - return Err(ErrorCode::NotImplemented( - format!("non-const `offset` of `{kind}` function is not supported yet"), - None.into(), - ) - .into()); + bail_not_implemented!( + "non-const `offset` of `{kind}` function is not supported yet" + ); } Ok(value.return_type()) } (Lag | Lead, [_value, _offset, _default]) => { - Err(RwError::from(ErrorCode::NotImplemented( - format!( - "`{kind}` window function with `default` argument is not supported yet" - ), - None.into(), - ))) + bail_not_implemented!( + "`{kind}` window function with `default` argument is not supported yet" + ); } (Aggregate(agg_kind), args) => { diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index 385a1010b50c9..c6dfa2a6f97cc 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -14,6 +14,7 @@ use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::ColumnId; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct}; @@ -62,16 +63,10 @@ pub async fn handle_alter_source_column( let SourceStruct { encode, .. } = extract_source_struct(&catalog.info)?; match encode { SourceEncode::Avro | SourceEncode::Protobuf => { - return Err(RwError::from(ErrorCode::NotImplemented( - "Alter source with schema registry".into(), - None.into(), - ))); + bail_not_implemented!("Alter source with schema registry") } SourceEncode::Json if catalog.info.use_schema_registry => { - return Err(RwError::from(ErrorCode::NotImplemented( - "Alter source with schema registry".into(), - None.into(), - ))); + bail_not_implemented!("Alter source with schema registry") } SourceEncode::Invalid | SourceEncode::Native => { return Err(RwError::from(ErrorCode::NotSupported( diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index fa64794710025..000add71a5a55 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use anyhow::Context; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; @@ -97,10 +98,7 @@ pub async fn handle_alter_table_column( if let Some(source_schema) = &source_schema { if schema_has_schema_registry(source_schema) { - return Err(RwError::from(ErrorCode::NotImplemented( - "Alter table with source having schema registry".into(), - None.into(), - ))); + bail_not_implemented!("Alter table with source having schema registry"); } } @@ -140,10 +138,7 @@ pub async fn handle_alter_table_column( cascade, } => { if cascade { - Err(ErrorCode::NotImplemented( - "drop column cascade".to_owned(), - 6903.into(), - ))? + bail_not_implemented!(issue = 6903, "drop column cascade"); } // Locate the column by name and remove it. diff --git a/src/frontend/src/handler/create_function.rs b/src/frontend/src/handler/create_function.rs index 9d9db08204e49..67f617ed5546d 100644 --- a/src/frontend/src/handler/create_function.rs +++ b/src/frontend/src/handler/create_function.rs @@ -39,18 +39,10 @@ pub async fn handle_create_function( params: CreateFunctionBody, ) -> Result { if or_replace { - return Err(ErrorCode::NotImplemented( - "CREATE OR REPLACE FUNCTION".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("CREATE OR REPLACE FUNCTION"); } if temporary { - return Err(ErrorCode::NotImplemented( - "CREATE TEMPORARY FUNCTION".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("CREATE TEMPORARY FUNCTION"); } let language = match params.language { Some(lang) => { diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index fe6f8a865c19b..80dd00842f402 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -20,6 +20,7 @@ use either::Either; use fixedbitset::FixedBitSet; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ CdcTableDesc, ColumnCatalog, ColumnDesc, TableId, TableVersionId, DEFAULT_SCHEMA_NAME, INITIAL_SOURCE_VERSION_ID, INITIAL_TABLE_VERSION_ID, USER_COLUMN_ID_OFFSET, @@ -137,13 +138,7 @@ fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> { ColumnOption::GeneratedColumns(_) => {} ColumnOption::DefaultColumns(_) => {} ColumnOption::Unique { is_primary: true } => {} - _ => { - return Err(ErrorCode::NotImplemented( - format!("column constraints \"{}\"", option_def), - None.into(), - ) - .into()) - } + _ => bail_not_implemented!(issue = None, "column constraints \"{}\"", option_def), } } Ok(()) @@ -177,11 +172,9 @@ pub fn bind_sql_columns(column_defs: &[ColumnDef]) -> Result> // // But we don't support real collation, we simply ignore it here. if !["C", "POSIX"].contains(&collation.real_value().as_str()) { - return Err(ErrorCode::NotImplemented( - "Collate collation other than `C` or `POSIX` is not implemented".into(), - None.into(), - ) - .into()); + bail_not_implemented!( + "Collate collation other than `C` or `POSIX` is not implemented" + ); } match data_type { @@ -359,13 +352,7 @@ pub fn ensure_table_constraints_supported(table_constraints: &[TableConstraint]) columns: _, is_primary: true, } => {} - _ => { - return Err(ErrorCode::NotImplemented( - format!("table constraint \"{}\"", constraint), - None.into(), - ) - .into()) - } + _ => bail_not_implemented!("table constraint \"{}\"", constraint), } } Ok(()) diff --git a/src/frontend/src/handler/drop_function.rs b/src/frontend/src/handler/drop_function.rs index c3bda771ec7cd..c4c3578258a86 100644 --- a/src/frontend/src/handler/drop_function.rs +++ b/src/frontend/src/handler/drop_function.rs @@ -27,11 +27,7 @@ pub async fn handle_drop_function( _option: Option, ) -> Result { if func_desc.len() != 1 { - return Err(ErrorCode::NotImplemented( - "only support dropping 1 function".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("only support dropping 1 function"); } let func_desc = func_desc.remove(0); diff --git a/src/frontend/src/handler/drop_schema.rs b/src/frontend/src/handler/drop_schema.rs index c0c71b97d2de8..90b1aa4ba3a53 100644 --- a/src/frontend/src/handler/drop_schema.rs +++ b/src/frontend/src/handler/drop_schema.rs @@ -13,6 +13,7 @@ // limitations under the License. use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::is_system_schema; use risingwave_common::error::{ErrorCode, Result}; use risingwave_sqlparser::ast::{DropMode, ObjectName}; @@ -81,11 +82,7 @@ pub async fn handle_drop_schema( } } Some(DropMode::Cascade) => { - return Err(ErrorCode::NotImplemented( - "drop schema with cascade mode".to_string(), - 6773.into(), - ) - .into()) + bail_not_implemented!(issue = 6773, "drop schema with cascade mode"); } }; diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index e1774d8d5e820..8e6774f5ab571 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -16,6 +16,7 @@ use itertools::Itertools; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; use pgwire::types::Row; +use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{ExplainOptions, ExplainType, Statement}; @@ -138,13 +139,7 @@ async fn do_handle_explain( gen_batch_plan_by_statement(&session, context.clone(), stmt).map(|x| x.plan) } - _ => { - return Err(ErrorCode::NotImplemented( - format!("unsupported statement {:?}", stmt), - None.into(), - ) - .into()) - } + _ => bail_not_implemented!("unsupported statement {:?}", stmt), }; (plan, context) @@ -221,7 +216,7 @@ pub async fn handle_explain( analyze: bool, ) -> Result { if analyze { - return Err(ErrorCode::NotImplemented("explain analyze".to_string(), 4856.into()).into()); + bail_not_implemented!(issue = 4856, "explain analyze"); } let context = OptimizerContext::new(handler_args.clone(), options.clone()); diff --git a/src/frontend/src/handler/extended_handle.rs b/src/frontend/src/handler/extended_handle.rs index d6f22984f404e..47fefc5700b3d 100644 --- a/src/frontend/src/handler/extended_handle.rs +++ b/src/frontend/src/handler/extended_handle.rs @@ -18,7 +18,8 @@ use std::sync::Arc; use bytes::Bytes; use pgwire::types::Format; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::bail_not_implemented; +use risingwave_common::error::Result; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{CreateSink, Query, Statement}; @@ -97,11 +98,7 @@ pub fn handle_parse( } Statement::CreateView { query, .. } => { if have_parameter_in_query(query) { - return Err(ErrorCode::NotImplemented( - "CREATE VIEW with parameters".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("CREATE VIEW with parameters"); } Ok(PrepareStatement::PureStatement(statement)) } @@ -109,11 +106,7 @@ pub fn handle_parse( if let Some(query) = query && have_parameter_in_query(query) { - Err(ErrorCode::NotImplemented( - "CREATE TABLE AS SELECT with parameters".to_string(), - None.into(), - ) - .into()) + bail_not_implemented!("CREATE TABLE AS SELECT with parameters"); } else { Ok(PrepareStatement::PureStatement(statement)) } @@ -122,11 +115,7 @@ pub fn handle_parse( if let CreateSink::AsQuery(query) = &stmt.sink_from && have_parameter_in_query(query) { - Err(ErrorCode::NotImplemented( - "CREATE SINK AS SELECT with parameters".to_string(), - None.into(), - ) - .into()) + bail_not_implemented!("CREATE SINK AS SELECT with parameters"); } else { Ok(PrepareStatement::PureStatement(statement)) } diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index c682fa6d9133c..24cfb7c177e16 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -22,6 +22,7 @@ use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, S use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult}; use pgwire::pg_server::BoxedError; use pgwire::types::{Format, Row}; +use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, Result}; use risingwave_sqlparser::ast::*; @@ -230,18 +231,10 @@ pub async fn handle( cdc_table_info, } => { if or_replace { - return Err(ErrorCode::NotImplemented( - "CREATE OR REPLACE TABLE".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("CREATE OR REPLACE TABLE"); } if temporary { - return Err(ErrorCode::NotImplemented( - "CREATE TEMPORARY TABLE".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("CREATE TEMPORARY TABLE"); } if let Some(query) = query { return create_table_as::handle_create_as( @@ -313,11 +306,7 @@ pub async fn handle( | ObjectType::Database | ObjectType::User | ObjectType::Connection => { - return Err(ErrorCode::NotImplemented( - "DROP CASCADE".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("DROP CASCADE"); } }; }; @@ -397,11 +386,7 @@ pub async fn handle( emit_mode, } => { if or_replace { - return Err(ErrorCode::NotImplemented( - "CREATE OR REPLACE VIEW".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("CREATE OR REPLACE VIEW"); } if materialized { create_mv::handle_create_mv( @@ -437,9 +422,7 @@ pub async fn handle( if_not_exists, } => { if unique { - return Err( - ErrorCode::NotImplemented("create unique index".into(), None.into()).into(), - ); + bail_not_implemented!("create unique index"); } create_index::handle_create_index( @@ -704,8 +687,6 @@ pub async fn handle( object_name, comment, } => comment::handle_comment(handler_args, object_type, object_name, comment).await, - _ => Err( - ErrorCode::NotImplemented(format!("Unhandled statement: {}", stmt), None.into()).into(), - ), + _ => bail_not_implemented!("Unhandled statement: {}", stmt), } } diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index 555162d42ed64..e87d6e54bdd6f 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -20,8 +20,9 @@ use pgwire::pg_protocol::truncated_fmt; use pgwire::pg_response::{PgResponse, StatementType}; use pgwire::pg_server::Session; use pgwire::types::Row; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ColumnCatalog, DEFAULT_SCHEMA_NAME}; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::error::Result; use risingwave_common::types::DataType; use risingwave_common::util::addr::HostAddr; use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION; @@ -88,11 +89,7 @@ pub async fn handle_show_object( let session = handler_args.session; if let Some(ShowStatementFilter::Where(..)) = filter { - return Err(ErrorCode::NotImplemented( - "WHERE clause in SHOW statement".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("WHERE clause in SHOW statement"); } let row_desc = infer_show_object(&command); @@ -374,11 +371,7 @@ pub fn handle_show_create_object( index.create_sql() } ShowCreateType::Function => { - return Err(ErrorCode::NotImplemented( - format!("show create on: {}", show_create_type), - None.into(), - ) - .into()); + bail_not_implemented!("show create on: {}", show_create_type); } }; let name = format!("{}.{}", schema_name, object_name); diff --git a/src/frontend/src/handler/transaction.rs b/src/frontend/src/handler/transaction.rs index d17b1f4183ce4..ba1f8dc7845c4 100644 --- a/src/frontend/src/handler/transaction.rs +++ b/src/frontend/src/handler/transaction.rs @@ -13,7 +13,8 @@ // limitations under the License. use pgwire::pg_response::StatementType; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::bail_not_implemented; +use risingwave_common::error::Result; use risingwave_sqlparser::ast::{TransactionAccessMode, TransactionMode, Value}; use super::{HandlerArgs, RwPgResponse}; @@ -21,7 +22,7 @@ use crate::session::transaction::AccessMode; macro_rules! not_impl { ($body:expr) => { - Err(ErrorCode::NotImplemented($body.into(), 10736.into())) + bail_not_implemented!(issue = 10376, "{}", $body) }; } @@ -40,7 +41,7 @@ pub async fn handle_begin( TransactionMode::AccessMode(mode) => { let _ = access_mode.replace(mode); } - TransactionMode::IsolationLevel(_) => not_impl!("ISOLATION LEVEL")?, + TransactionMode::IsolationLevel(_) => not_impl!("ISOLATION LEVEL"), } } @@ -74,7 +75,7 @@ pub async fn handle_commit( let HandlerArgs { session, .. } = handler_args; if chain { - not_impl!("COMMIT AND CHAIN")?; + not_impl!("COMMIT AND CHAIN"); } session.txn_commit_explicit(); @@ -91,7 +92,7 @@ pub async fn handle_rollback( let HandlerArgs { session, .. } = handler_args; if chain { - not_impl!("ROLLBACK AND CHAIN")?; + not_impl!("ROLLBACK AND CHAIN"); } session.txn_rollback_explicit(); diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index b0e04f0598778..c7acfdf05a523 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -17,6 +17,7 @@ use itertools::Itertools; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::{DataType, Datum, ScalarImpl}; use risingwave_common::util::sort_util::ColumnOrder; +use risingwave_common::{bail_not_implemented, not_implemented}; use risingwave_expr::aggregate::{agg_kinds, AggKind}; use super::generic::{self, Agg, GenericPlanRef, PlanAggCall, ProjectBuilder}; @@ -296,12 +297,7 @@ impl LogicalAggBuilder { set.into_iter() .map(|expr| input_proj_builder.add_expr(&expr)) .try_collect() - .map_err(|err| { - ErrorCode::NotImplemented( - format!("{err} inside GROUP BY"), - None.into(), - ) - }) + .map_err(|err| not_implemented!("{err} inside GROUP BY")) }) .try_collect()?; @@ -321,9 +317,7 @@ impl LogicalAggBuilder { .into_iter() .map(|expr| input_proj_builder.add_expr(&expr)) .try_collect() - .map_err(|err| { - ErrorCode::NotImplemented(format!("{err} inside GROUP BY"), None.into()) - })?; + .map_err(|err| not_implemented!("{err} inside GROUP BY"))?; (group_key, vec![]) } GroupBy::GroupingSets(grouping_sets) => gen_group_key_and_grouping_sets(grouping_sets)?, @@ -476,9 +470,7 @@ impl LogicalAggBuilder { Ok(InputRef::new(index, expr.return_type())) }) .try_collect() - .map_err(|err: &'static str| { - ErrorCode::NotImplemented(format!("{err} inside aggregation calls"), None.into()) - })?; + .map_err(|err: &'static str| not_implemented!("{err} inside aggregation calls"))?; let order_by: Vec<_> = order_by .sort_exprs @@ -489,10 +481,7 @@ impl LogicalAggBuilder { }) .try_collect() .map_err(|err: &'static str| { - ErrorCode::NotImplemented( - format!("{err} inside aggregation calls order by"), - None.into(), - ) + not_implemented!("{err} inside aggregation calls order by") })?; match agg_kind { @@ -785,10 +774,13 @@ impl ExprRewriter for LogicalAggBuilder { fn rewrite_subquery(&mut self, subquery: crate::expr::Subquery) -> ExprImpl { if subquery.is_correlated(0) { - self.error = Some(ErrorCode::NotImplemented( - "correlated subquery in HAVING or SELECT with agg".into(), - 2275.into(), - )); + self.error = Some( + not_implemented!( + issue = 2275, + "correlated subquery in HAVING or SELECT with agg", + ) + .into(), + ); } subquery.into() } @@ -1143,11 +1135,7 @@ impl ToStream for LogicalAgg { for agg_call in self.agg_calls() { if matches!(agg_call.agg_kind, agg_kinds::unimplemented_in_stream!()) { - return Err(ErrorCode::NotImplemented( - format!("{} aggregation in materialized view", agg_call.agg_kind), - None.into(), - ) - .into()); + bail_not_implemented!("{} aggregation in materialized view", agg_call.agg_kind); } } let eowc = ctx.emit_on_window_close(); diff --git a/src/frontend/src/optimizer/plan_node/logical_over_window.rs b/src/frontend/src/optimizer/plan_node/logical_over_window.rs index 665cee6f178a0..69089396eb50d 100644 --- a/src/frontend/src/optimizer/plan_node/logical_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_over_window.rs @@ -17,6 +17,7 @@ use itertools::Itertools; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::types::{DataType, Datum, ScalarImpl}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; +use risingwave_common::{bail_not_implemented, not_implemented}; use risingwave_expr::aggregate::AggKind; use risingwave_expr::window_function::{Frame, FrameBound, WindowFuncKind}; @@ -302,23 +303,23 @@ impl<'a> OverWindowProjectBuilder<'a> { FunctionCall::new(ExprType::Multiply, vec![input.clone(), input.clone()]).unwrap(), ); self.builder.add_expr(&squared_input_expr).map_err(|err| { - ErrorCode::NotImplemented(format!("{err} inside args"), None.into()) + not_implemented!("{err} inside args") })?; } for arg in &window_function.args { - self.builder.add_expr(arg).map_err(|err| { - ErrorCode::NotImplemented(format!("{err} inside args"), None.into()) - })?; + self.builder + .add_expr(arg) + .map_err(|err| not_implemented!("{err} inside args"))?; } for partition_by in &window_function.partition_by { - self.builder.add_expr(partition_by).map_err(|err| { - ErrorCode::NotImplemented(format!("{err} inside partition_by"), None.into()) - })?; + self.builder + .add_expr(partition_by) + .map_err(|err| not_implemented!("{err} inside partition_by"))?; } for order_by in window_function.order_by.sort_exprs.iter().map(|e| &e.expr) { - self.builder.add_expr(order_by).map_err(|err| { - ErrorCode::NotImplemented(format!("{err} inside order_by"), None.into()) - })?; + self.builder + .add_expr(order_by) + .map_err(|err| not_implemented!("{err} inside order_by"))?; } Ok(()) } @@ -358,9 +359,7 @@ impl LogicalOverWindow { for (idx, field) in input.schema().fields().iter().enumerate() { input_proj_builder .add_expr(&InputRef::new(idx, field.data_type()).into()) - .map_err(|err| { - ErrorCode::NotImplemented(format!("{err} inside input"), None.into()) - })?; + .map_err(|err| not_implemented!("{err} inside input"))?; } let mut build_input_proj_visitor = OverWindowProjectBuilder::new(&mut input_proj_builder); for expr in select_exprs { @@ -456,11 +455,9 @@ impl LogicalOverWindow { if const_offset.is_none() { // should already be checked in `WindowFunction::infer_return_type`, // but just in case - return Err(ErrorCode::NotImplemented( - "non-const `offset` of `lag`/`lead` is not supported yet".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!( + "non-const `offset` of `lag`/`lead` is not supported yet" + ); } const_offset.unwrap()?.map(|v| *v.as_int64()).unwrap_or(1) } else { @@ -723,11 +720,7 @@ impl ToBatch for LogicalOverWindow { .map(|e| e.index()) .collect_vec(); if partition_key_indices.is_empty() { - return Err(ErrorCode::NotImplemented( - "Window function with empty PARTITION BY is not supported yet".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("Window function with empty PARTITION BY is not supported yet"); } let input = self.input().to_batch()?; @@ -780,11 +773,9 @@ impl ToStream for LogicalOverWindow { .map(|e| e.index()) .collect_vec(); if partition_key_indices.is_empty() { - return Err(ErrorCode::NotImplemented( - "Window function with empty PARTITION BY is not supported yet".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!( + "Window function with empty PARTITION BY is not supported yet" + ); } let sort_input = @@ -813,11 +804,9 @@ impl ToStream for LogicalOverWindow { .map(|e| e.index()) .collect_vec(); if partition_key_indices.is_empty() { - return Err(ErrorCode::NotImplemented( - "Window function with empty PARTITION BY is not supported yet".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!( + "Window function with empty PARTITION BY is not supported yet" + ); } let new_input = diff --git a/src/frontend/src/optimizer/plan_node/logical_share.rs b/src/frontend/src/optimizer/plan_node/logical_share.rs index 3e7193342fb67..b426ca849005f 100644 --- a/src/frontend/src/optimizer/plan_node/logical_share.rs +++ b/src/frontend/src/optimizer/plan_node/logical_share.rs @@ -15,7 +15,7 @@ use std::cell::RefCell; use pretty_xmlish::{Pretty, XmlNode}; -use risingwave_common::error::ErrorCode::NotImplemented; +use risingwave_common::bail_not_implemented; use risingwave_common::error::Result; use super::utils::{childless_record, Distill}; @@ -128,11 +128,7 @@ impl PredicatePushdown for LogicalShare { impl ToBatch for LogicalShare { fn to_batch(&self) -> Result { - Err(NotImplemented( - "batch query doesn't support share operator for now".into(), - None.into(), - ) - .into()) + bail_not_implemented!("batch query doesn't support share operator for now"); } } diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 9dfec01ef6ee7..ce3bfc840374a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -20,10 +20,11 @@ use std::rc::Rc; use fixedbitset::FixedBitSet; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ ColumnCatalog, ColumnDesc, Field, Schema, KAFKA_TIMESTAMP_COLUMN_NAME, }; -use risingwave_common::error::{ErrorCode, Result, RwError, TrackingIssue}; +use risingwave_common::error::Result; use risingwave_connector::source::{ConnectorProperties, DataType}; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::GeneratedColumnDesc; @@ -536,10 +537,7 @@ impl ToBatch for LogicalSource { &self.core.catalog.as_ref().unwrap().properties, ) { - return Err(RwError::from(ErrorCode::NotImplemented( - "New S3 connector for batch".to_string(), - TrackingIssue::from(None), - ))); + bail_not_implemented!("New S3 connector for batch"); } let source = self.wrap_with_optional_generated_columns_batch_proj()?; Ok(source) diff --git a/src/frontend/src/optimizer/plan_node/logical_sys_scan.rs b/src/frontend/src/optimizer/plan_node/logical_sys_scan.rs index 56985d81a5c27..752e7829e0087 100644 --- a/src/frontend/src/optimizer/plan_node/logical_sys_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_sys_scan.rs @@ -16,8 +16,9 @@ use std::rc::Rc; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ColumnDesc, TableDesc}; -use risingwave_common::error::{ErrorCode, Result, RwError}; +use risingwave_common::error::Result; use super::generic::{GenericPlanNode, GenericPlanRef}; use super::utils::{childless_record, Distill}; @@ -360,19 +361,13 @@ impl ToBatch for LogicalSysScan { impl ToStream for LogicalSysScan { fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { - Err(RwError::from(ErrorCode::NotImplemented( - "streaming on system table is not allowed".to_string(), - None.into(), - ))) + bail_not_implemented!("streaming on system table"); } fn logical_rewrite_for_stream( &self, _ctx: &mut RewriteStreamContext, ) -> Result<(PlanRef, ColIndexMapping)> { - Err(RwError::from(ErrorCode::NotImplemented( - "streaming on system table is not allowed".to_string(), - None.into(), - ))) + bail_not_implemented!("streaming on system table"); } } diff --git a/src/frontend/src/optimizer/plan_node/logical_topn.rs b/src/frontend/src/optimizer/plan_node/logical_topn.rs index 940714d7d4abb..b1058b506008a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_topn.rs +++ b/src/frontend/src/optimizer/plan_node/logical_topn.rs @@ -14,6 +14,7 @@ use fixedbitset::FixedBitSet; use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::util::sort_util::ColumnOrder; @@ -70,11 +71,7 @@ impl LogicalTopN { group_key: Vec, ) -> Result { if with_ties && offset > 0 { - return Err(ErrorCode::NotImplemented( - "WITH TIES is not supported with OFFSET".to_string(), - None.into(), - ) - .into()); + bail_not_implemented!("WITH TIES is not supported with OFFSET"); } Ok(Self::new(input, limit, offset, with_ties, order, group_key).into()) } @@ -120,10 +117,7 @@ impl LogicalTopN { Distribution::Single | Distribution::SomeShard => { self.gen_single_stream_top_n_plan(stream_input) } - Distribution::Broadcast => Err(RwError::from(ErrorCode::NotImplemented( - "topN does not support Broadcast".to_string(), - None.into(), - ))), + Distribution::Broadcast => bail_not_implemented!("topN does not support Broadcast"), Distribution::HashShard(dists) | Distribution::UpstreamHashShard(dists, _) => { self.gen_vnode_two_phase_stream_top_n_plan(stream_input, &dists) } diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 7ff5806eceb08..20a682bf7fb69 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -15,6 +15,7 @@ use std::rc::Rc; use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::{DataType, Interval, ScalarImpl}; @@ -91,11 +92,7 @@ impl Planner { let join_type = join.join_type; let on_clause = join.cond; if on_clause.has_subquery() { - Err(ErrorCode::NotImplemented( - "Subquery in join on condition is unsupported".into(), - None.into(), - ) - .into()) + bail_not_implemented!("Subquery in join on condition"); } else { Ok(LogicalJoin::create(left, right, join_type, on_clause)) } @@ -105,11 +102,7 @@ impl Planner { let join_type = join.join_type; let on_clause = join.cond; if on_clause.has_subquery() { - return Err(ErrorCode::NotImplemented( - "Subquery in join on condition is unsupported".into(), - None.into(), - ) - .into()); + bail_not_implemented!("Subquery in join on condition"); } let correlated_id = self.ctx.next_correlated_id(); diff --git a/src/frontend/src/planner/select.rs b/src/frontend/src/planner/select.rs index a6b578dddb0dc..fa0e08d4f0217 100644 --- a/src/frontend/src/planner/select.rs +++ b/src/frontend/src/planner/select.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use itertools::Itertools; +use risingwave_common::bail_not_implemented; use risingwave_common::catalog::Schema; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; @@ -297,13 +298,7 @@ impl Planner { let right_expr = InputRef::new(input.schema().len(), output_column_type); FunctionCall::new(ExprType::Equal, vec![left_expr, right_expr.into()])?.into() } - kind => { - return Err(ErrorCode::NotImplemented( - format!("Not supported subquery kind: {:?}", kind), - 1343.into(), - ) - .into()) - } + kind => bail_not_implemented!(issue = 1343, "Not supported subquery kind: {:?}", kind), }; *input = Self::create_apply( correlated_id, @@ -378,13 +373,7 @@ impl Planner { SubqueryKind::Existential => { right = self.create_exists(right)?; } - _ => { - return Err(ErrorCode::NotImplemented( - format!("{:?}", subquery.kind), - 1343.into(), - ) - .into()) - } + _ => bail_not_implemented!(issue = 1343, "{:?}", subquery.kind), } root = Self::create_apply( diff --git a/src/stream/src/executor/error.rs b/src/stream/src/executor/error.rs index fc2f02f0aede0..b4efcac1702f4 100644 --- a/src/stream/src/executor/error.rs +++ b/src/stream/src/executor/error.rs @@ -15,7 +15,7 @@ use std::backtrace::Backtrace; use risingwave_common::array::ArrayError; -use risingwave_common::error::{BoxedError, Error, TrackingIssue}; +use risingwave_common::error::{BoxedError, Error, NotImplemented}; use risingwave_common::util::value_encoding::error::ValueEncodingError; use risingwave_connector::error::ConnectorError; use risingwave_connector::sink::SinkError; @@ -109,8 +109,8 @@ enum ErrorKind { BoxedError, ), - #[error("Feature is not yet implemented: {0}, {1}")] - NotImplemented(String, TrackingIssue), + #[error(transparent)] + NotImplemented(#[from] NotImplemented), #[error(transparent)] Internal( @@ -137,10 +137,6 @@ impl StreamExecutorError { ErrorKind::ConnectorError(error.into()).into() } - pub fn not_implemented(error: impl Into, issue: impl Into) -> Self { - ErrorKind::NotImplemented(error.into(), issue.into()).into() - } - pub fn dml_error(error: impl Error) -> Self { ErrorKind::DmlError(error.into()).into() }