diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 577bbe327dc3c..e9d76097b0a77 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -178,9 +178,6 @@ message UpdateNode { repeated expr.ExprNode new_exprs = 4; bool returning = 5; - // // The columns indices in the input schema, representing the columns need to send to streamDML exeuctor. - // repeated uint32 update_column_indices = 5; - // Session id is used to ensure that dml data from the same session should be sent to a fixed worker node and channel. uint32 session_id = 6; } diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index df3f091fb2a10..96b242b188297 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -57,7 +57,7 @@ pub use relation::{ pub use select::{BoundDistinct, BoundSelect}; pub use set_expr::*; pub use statement::BoundStatement; -pub use update::{BoundUpdate, BoundUpdateV2, UpdateProject}; +pub use update::{BoundUpdate, UpdateProject}; pub use values::BoundValues; use crate::catalog::catalog_service::CatalogReadGuard; diff --git a/src/frontend/src/binder/statement.rs b/src/frontend/src/binder/statement.rs index ad09b31f2c031..b73fab90aed9a 100644 --- a/src/frontend/src/binder/statement.rs +++ b/src/frontend/src/binder/statement.rs @@ -18,7 +18,7 @@ use risingwave_sqlparser::ast::Statement; use super::delete::BoundDelete; use super::fetch_cursor::BoundFetchCursor; -use super::update::BoundUpdateV2; +use super::update::BoundUpdate; use crate::binder::create_view::BoundCreateView; use crate::binder::{Binder, BoundInsert, BoundQuery}; use crate::error::Result; @@ -28,7 +28,7 @@ use crate::expr::ExprRewriter; pub enum BoundStatement { Insert(Box), Delete(Box), - Update(Box), + Update(Box), Query(Box), FetchCursor(Box), CreateView(Box), @@ -86,7 +86,7 @@ impl Binder { selection, returning, } => Ok(BoundStatement::Update( - self.bind_update_v2(table_name, assignments, selection, returning)? + self.bind_update(table_name, assignments, selection, returning)? .into(), )), diff --git a/src/frontend/src/binder/update.rs b/src/frontend/src/binder/update.rs index 22693381fc241..9d2dda1a7eea4 100644 --- a/src/frontend/src/binder/update.rs +++ b/src/frontend/src/binder/update.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap}; use fixedbitset::FixedBitSet; @@ -26,60 +25,10 @@ use super::statement::RewriteExprsRecursive; use super::{Binder, BoundBaseTable}; use crate::catalog::TableId; use crate::error::{bail_bind_error, ErrorCode, Result, RwError}; -use crate::expr::{Expr as _, ExprImpl, InputRef, SubqueryKind}; +use crate::expr::{Expr as _, ExprImpl, SubqueryKind}; use crate::user::UserId; use crate::TableCatalog; -#[derive(Debug, Clone)] -pub struct BoundUpdate { - /// Id of the table to perform updating. - pub table_id: TableId, - - /// Version id of the table. - pub table_version_id: TableVersionId, - - /// Name of the table to perform updating. - pub table_name: String, - - /// Owner of the table to perform updating. - pub owner: UserId, - - /// Used for scanning the records to update with the `selection`. - pub table: BoundBaseTable, - - pub selection: Option, - - /// Expression used to project to the updated row. The assigned columns will use the new - /// expression, and the other columns will be simply `InputRef`. - pub exprs: Vec, - - // used for the 'RETURNING" keyword to indicate the returning items and schema - // if the list is empty and the schema is None, the output schema will be a INT64 as the - // affected row cnt - pub returning_list: Vec, - - pub returning_schema: Option, -} - -impl RewriteExprsRecursive for BoundUpdate { - fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) { - self.selection = - std::mem::take(&mut self.selection).map(|expr| rewriter.rewrite_expr(expr)); - - let new_exprs = std::mem::take(&mut self.exprs) - .into_iter() - .map(|expr| rewriter.rewrite_expr(expr)) - .collect::>(); - self.exprs = new_exprs; - - let new_returning_list = std::mem::take(&mut self.returning_list) - .into_iter() - .map(|expr| rewriter.rewrite_expr(expr)) - .collect::>(); - self.returning_list = new_returning_list; - } -} - #[derive(Debug, Clone, Copy)] pub enum UpdateProject { Expr(usize), @@ -87,7 +36,7 @@ pub enum UpdateProject { } #[derive(Debug, Clone)] -pub struct BoundUpdateV2 { +pub struct BoundUpdate { /// Id of the table to perform updating. pub table_id: TableId, @@ -119,7 +68,7 @@ pub struct BoundUpdateV2 { pub returning_schema: Option, } -impl RewriteExprsRecursive for BoundUpdateV2 { +impl RewriteExprsRecursive for BoundUpdate { fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) { self.selection = std::mem::take(&mut self.selection).map(|expr| rewriter.rewrite_expr(expr)); @@ -156,161 +105,13 @@ fn get_col_referenced_by_generated_pk(table_catalog: &TableCatalog) -> Result, - // selection: Option, - // returning_items: Vec, - // ) -> Result { - // let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?; - - // let table_catalog = self.resolve_dml_table(schema_name.as_deref(), &table_name, false)?; - // let default_columns_from_catalog = - // table_catalog.default_columns().collect::>(); - // if !returning_items.is_empty() && table_catalog.has_generated_column() { - // return Err(RwError::from(ErrorCode::BindError( - // "`RETURNING` clause is not supported for tables with generated columns".to_string(), - // ))); - // } - - // let table_id = table_catalog.id; - // let owner = table_catalog.owner; - // let table_version_id = table_catalog.version_id().expect("table must be versioned"); - // let cols_refed_by_generated_pk = get_col_referenced_by_generated_pk(table_catalog)?; - - // let table = self.bind_table(schema_name.as_deref(), &table_name, None)?; - - // let selection = selection.map(|expr| self.bind_expr(expr)).transpose()?; - - // let mut assignment_exprs = HashMap::new(); - // for Assignment { id, value } in assignments { - // // FIXME: Parsing of `id` is not strict. It will even treat `a.b` as `(a, b)`. - // let assignments = match (id.as_slice(), value) { - // // _ = (subquery) - // (_ids, AssignmentValue::Expr(Expr::Subquery(_))) => { - // return Err(ErrorCode::BindError( - // "subquery on the right side of assignment is unsupported".to_owned(), - // ) - // .into()) - // } - // // col = expr - // ([id], value) => { - // vec![(id.clone(), value)] - // } - // // (col1, col2) = (expr1, expr2) - // // TODO: support `DEFAULT` in multiple assignments - // (ids, AssignmentValue::Expr(Expr::Row(values))) if ids.len() == values.len() => id - // .into_iter() - // .zip_eq_fast(values.into_iter().map(AssignmentValue::Expr)) - // .collect(), - // // (col1, col2) = - // _ => { - // return Err(ErrorCode::BindError( - // "number of columns does not match number of values".to_owned(), - // ) - // .into()) - // } - // }; - - // for (id, value) in assignments { - // let id_expr = self.bind_expr(Expr::Identifier(id.clone()))?; - // let id_index = if let Some(id_input_ref) = id_expr.clone().as_input_ref() { - // let id_index = id_input_ref.index; - // if table - // .table_catalog - // .pk() - // .iter() - // .any(|k| k.column_index == id_index) - // { - // return Err(ErrorCode::BindError( - // "update modifying the PK column is unsupported".to_owned(), - // ) - // .into()); - // } - // if table - // .table_catalog - // .generated_col_idxes() - // .contains(&id_index) - // { - // return Err(ErrorCode::BindError( - // "update modifying the generated column is unsupported".to_owned(), - // ) - // .into()); - // } - // if cols_refed_by_generated_pk.contains(id_index) { - // return Err(ErrorCode::BindError( - // "update modifying the column referenced by generated columns that are part of the primary key is not allowed".to_owned(), - // ) - // .into()); - // } - // id_index - // } else { - // unreachable!() - // }; - - // let value_expr = match value { - // AssignmentValue::Expr(expr) => { - // self.bind_expr(expr)?.cast_assign(id_expr.return_type())? - // } - // AssignmentValue::Default => default_columns_from_catalog - // .get(&id_index) - // .cloned() - // .unwrap_or_else(|| ExprImpl::literal_null(id_expr.return_type())), - // }; - - // match assignment_exprs.entry(id_expr) { - // Entry::Occupied(_) => { - // return Err(ErrorCode::BindError( - // "multiple assignments to same column".to_owned(), - // ) - // .into()) - // } - // Entry::Vacant(v) => { - // v.insert(value_expr); - // } - // } - // } - // } - - // let exprs = table - // .table_catalog - // .columns() - // .iter() - // .enumerate() - // .filter_map(|(i, c)| { - // (!c.is_generated()).then_some(InputRef::new(i, c.data_type().clone()).into()) - // }) - // .map(|c| assignment_exprs.remove(&c).unwrap_or(c)) - // .collect_vec(); - - // let (returning_list, fields) = self.bind_returning_list(returning_items)?; - // let returning = !returning_list.is_empty(); - - // Ok(BoundUpdate { - // table_id, - // table_version_id, - // table_name, - // owner, - // table, - // selection, - // exprs, - // returning_list, - // returning_schema: if returning { - // Some(Schema { fields }) - // } else { - // None - // }, - // }) - // } - - pub(super) fn bind_update_v2( + pub(super) fn bind_update( &mut self, name: ObjectName, assignments: Vec, selection: Option, returning_items: Vec, - ) -> Result { + ) -> Result { let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?; let table_catalog = self.resolve_dml_table(schema_name.as_deref(), &table_name, false)?; @@ -441,7 +242,7 @@ impl Binder { let (returning_list, fields) = self.bind_returning_list(returning_items)?; let returning = !returning_list.is_empty(); - Ok(BoundUpdateV2 { + Ok(BoundUpdate { table_id, table_version_id, table_name, diff --git a/src/frontend/src/optimizer/plan_node/batch_update.rs b/src/frontend/src/optimizer/plan_node/batch_update.rs index 9a5891d27d835..28dfa79916cc9 100644 --- a/src/frontend/src/optimizer/plan_node/batch_update.rs +++ b/src/frontend/src/optimizer/plan_node/batch_update.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use itertools::Itertools; use risingwave_common::catalog::Schema; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::UpdateNode; diff --git a/src/frontend/src/optimizer/plan_node/logical_update.rs b/src/frontend/src/optimizer/plan_node/logical_update.rs index f5d0bdc089644..a5590501715b9 100644 --- a/src/frontend/src/optimizer/plan_node/logical_update.rs +++ b/src/frontend/src/optimizer/plan_node/logical_update.rs @@ -12,17 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::catalog::TableVersionId; - use super::generic::GenericPlanRef; use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, BatchUpdate, ColPrunable, ExprRewritable, Logical, LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, }; -use crate::catalog::TableId; use crate::error::Result; -use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor}; +use crate::expr::{ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, diff --git a/src/frontend/src/planner/statement.rs b/src/frontend/src/planner/statement.rs index dc957da66c016..91c1b9edfc619 100644 --- a/src/frontend/src/planner/statement.rs +++ b/src/frontend/src/planner/statement.rs @@ -22,7 +22,7 @@ impl Planner { match stmt { BoundStatement::Insert(i) => self.plan_insert(*i), BoundStatement::Delete(d) => self.plan_delete(*d), - BoundStatement::Update(u) => self.plan_update_v2(*u), + BoundStatement::Update(u) => self.plan_update(*u), BoundStatement::Query(q) => self.plan_query(*q), BoundStatement::FetchCursor(_) => unimplemented!(), BoundStatement::CreateView(c) => self.plan_query(*c.query), diff --git a/src/frontend/src/planner/update.rs b/src/frontend/src/planner/update.rs index 8ad7d9e036314..1e827c72764c9 100644 --- a/src/frontend/src/planner/update.rs +++ b/src/frontend/src/planner/update.rs @@ -17,7 +17,7 @@ use risingwave_common::types::{DataType, Scalar}; use risingwave_pb::expr::expr_node::Type; use super::Planner; -use crate::binder::{BoundUpdateV2, UpdateProject}; +use crate::binder::{BoundUpdate, UpdateProject}; use crate::error::Result; use crate::expr::{ExprImpl, FunctionCall, InputRef, Literal}; use crate::optimizer::plan_node::generic::GenericPlanRef; @@ -26,7 +26,7 @@ use crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::{PlanRef, PlanRoot}; impl Planner { - pub(super) fn plan_update_v2(&mut self, update: BoundUpdateV2) -> Result { + pub(super) fn plan_update(&mut self, update: BoundUpdate) -> Result { let scan = self.plan_base_table(&update.table)?; let input = if let Some(expr) = update.selection { self.plan_where(scan, expr)? @@ -35,14 +35,6 @@ impl Planner { }; let returning = !update.returning_list.is_empty(); - // let update_column_indices = update - // .table - // .table_catalog - // .columns() - // .iter() - // .enumerate() - // .filter_map(|(i, c)| (!c.is_generated()).then_some(i)) - // .collect_vec(); let schema_len = input.schema().len();