From 4ce37c05553e7302b0e28ba35d0f2184a17dcebc Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 15 Nov 2024 16:35:49 +0800 Subject: [PATCH 01/11] refactor(frontend): rework `UPDATE` Signed-off-by: Bugen Zhao --- proto/batch_plan.proto | 12 +- src/batch/src/executor/update.rs | 64 +-- src/frontend/src/binder/expr/subquery.rs | 2 +- src/frontend/src/binder/mod.rs | 2 +- src/frontend/src/binder/statement.rs | 6 +- src/frontend/src/binder/update.rs | 403 +++++++++++++----- src/frontend/src/error.rs | 4 +- src/frontend/src/expr/subquery.rs | 2 + .../src/optimizer/plan_node/batch_update.rs | 21 +- .../src/optimizer/plan_node/generic/update.rs | 45 +- .../src/optimizer/plan_node/logical_update.rs | 27 +- src/frontend/src/planner/select.rs | 4 +- src/frontend/src/planner/statement.rs | 2 +- src/frontend/src/planner/update.rs | 90 +++- 14 files changed, 478 insertions(+), 206 deletions(-) diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index b46230b2438d6..577bbe327dc3c 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -173,11 +173,13 @@ message UpdateNode { // Id of the table to perform updating. uint32 table_id = 1; // Version of the table. - uint64 table_version_id = 4; - repeated expr.ExprNode exprs = 2; - bool returning = 3; - // The columns indices in the input schema, representing the columns need to send to streamDML exeuctor. - repeated uint32 update_column_indices = 5; + uint64 table_version_id = 2; + repeated expr.ExprNode old_exprs = 3; + repeated expr.ExprNode new_exprs = 4; + bool returning = 5; + + // // The columns indices in the input schema, representing the columns need to send to streamDML exeuctor. + // repeated uint32 update_column_indices = 5; // Session id is used to ensure that dml data from the same session should be sent to a fixed worker node and channel. 
uint32 session_id = 6; diff --git a/src/batch/src/executor/update.rs b/src/batch/src/executor/update.rs index a753aef840f52..e9afc4748cd4a 100644 --- a/src/batch/src/executor/update.rs +++ b/src/batch/src/executor/update.rs @@ -42,13 +42,13 @@ pub struct UpdateExecutor { table_version_id: TableVersionId, dml_manager: DmlManagerRef, child: BoxedExecutor, - exprs: Vec, + old_exprs: Vec, + new_exprs: Vec, chunk_size: usize, schema: Schema, identity: String, returning: bool, txn_id: TxnId, - update_column_indices: Vec, session_id: u32, } @@ -59,11 +59,11 @@ impl UpdateExecutor { table_version_id: TableVersionId, dml_manager: DmlManagerRef, child: BoxedExecutor, - exprs: Vec, + old_exprs: Vec, + new_exprs: Vec, chunk_size: usize, identity: String, returning: bool, - update_column_indices: Vec, session_id: u32, ) -> Self { let chunk_size = chunk_size.next_multiple_of(2); @@ -75,7 +75,8 @@ impl UpdateExecutor { table_version_id, dml_manager, child, - exprs, + old_exprs, + new_exprs, chunk_size, schema: if returning { table_schema @@ -87,7 +88,6 @@ impl UpdateExecutor { identity, returning, txn_id, - update_column_indices, session_id, } } @@ -122,15 +122,12 @@ impl UpdateExecutor { assert_eq!( data_types, - self.exprs.iter().map(|e| e.return_type()).collect_vec(), + self.new_exprs.iter().map(|e| e.return_type()).collect_vec(), "bad update schema" ); assert_eq!( data_types, - self.update_column_indices - .iter() - .map(|i: &usize| self.child.schema()[*i].data_type.clone()) - .collect_vec(), + self.old_exprs.iter().map(|e| e.return_type()).collect_vec(), "bad update schema" ); @@ -159,27 +156,35 @@ impl UpdateExecutor { let mut rows_updated = 0; #[for_await] - for data_chunk in self.child.execute() { - let data_chunk = data_chunk?; + for input in self.child.execute() { + let input = input?; + + let old_data_chunk = { + let mut columns = Vec::with_capacity(self.old_exprs.len()); + for expr in &mut self.old_exprs { + let column = expr.eval(&input).await?; + columns.push(column); + } + + DataChunk::new(columns, input.visibility().clone()) + }; let updated_data_chunk = { - let mut columns = Vec::with_capacity(self.exprs.len()); - for expr in &mut self.exprs { - let column = expr.eval(&data_chunk).await?; + let mut columns = Vec::with_capacity(self.new_exprs.len()); + for expr in &mut self.new_exprs { + let column = expr.eval(&input).await?; columns.push(column); } - DataChunk::new(columns, data_chunk.visibility().clone()) + DataChunk::new(columns, input.visibility().clone()) }; if self.returning { yield updated_data_chunk.clone(); } - for (row_delete, row_insert) in data_chunk - .project(&self.update_column_indices) - .rows() - .zip_eq_debug(updated_data_chunk.rows()) + for (row_delete, row_insert) in + (old_data_chunk.rows()).zip_eq_debug(updated_data_chunk.rows()) { rows_updated += 1; // If row_delete == row_insert, we don't need to do a actual update @@ -227,34 +232,35 @@ impl BoxedExecutorBuilder for UpdateExecutor { let table_id = TableId::new(update_node.table_id); - let exprs: Vec<_> = update_node - .get_exprs() + let old_exprs: Vec<_> = update_node + .get_old_exprs() .iter() .map(build_from_prost) .try_collect()?; - let update_column_indices = update_node - .update_column_indices + let new_exprs: Vec<_> = update_node + .get_new_exprs() .iter() - .map(|x| *x as usize) - .collect_vec(); + .map(build_from_prost) + .try_collect()?; Ok(Box::new(Self::new( table_id, update_node.table_version_id, source.context().dml_manager(), child, - exprs, + old_exprs, + new_exprs, 
source.context.get_config().developer.chunk_size, source.plan_node().get_identity().clone(), update_node.returning, - update_column_indices, update_node.session_id, ))) } } #[cfg(test)] +#[cfg(any())] mod tests { use std::sync::Arc; diff --git a/src/frontend/src/binder/expr/subquery.rs b/src/frontend/src/binder/expr/subquery.rs index 51819116771f1..2924f344700ae 100644 --- a/src/frontend/src/binder/expr/subquery.rs +++ b/src/frontend/src/binder/expr/subquery.rs @@ -19,7 +19,7 @@ use crate::error::{ErrorCode, Result}; use crate::expr::{ExprImpl, Subquery, SubqueryKind}; impl Binder { - pub(super) fn bind_subquery_expr( + pub fn bind_subquery_expr( &mut self, query: Query, kind: SubqueryKind, diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index adb7a1b9d0f2f..df3f091fb2a10 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -57,7 +57,7 @@ pub use relation::{ pub use select::{BoundDistinct, BoundSelect}; pub use set_expr::*; pub use statement::BoundStatement; -pub use update::BoundUpdate; +pub use update::{BoundUpdate, BoundUpdateV2, UpdateProject}; pub use values::BoundValues; use crate::catalog::catalog_service::CatalogReadGuard; diff --git a/src/frontend/src/binder/statement.rs b/src/frontend/src/binder/statement.rs index b73fab90aed9a..ad09b31f2c031 100644 --- a/src/frontend/src/binder/statement.rs +++ b/src/frontend/src/binder/statement.rs @@ -18,7 +18,7 @@ use risingwave_sqlparser::ast::Statement; use super::delete::BoundDelete; use super::fetch_cursor::BoundFetchCursor; -use super::update::BoundUpdate; +use super::update::BoundUpdateV2; use crate::binder::create_view::BoundCreateView; use crate::binder::{Binder, BoundInsert, BoundQuery}; use crate::error::Result; @@ -28,7 +28,7 @@ use crate::expr::ExprRewriter; pub enum BoundStatement { Insert(Box), Delete(Box), - Update(Box), + Update(Box), Query(Box), FetchCursor(Box), CreateView(Box), @@ -86,7 +86,7 @@ impl Binder { selection, returning, } => Ok(BoundStatement::Update( - self.bind_update(table_name, assignments, selection, returning)? + self.bind_update_v2(table_name, assignments, selection, returning)? .into(), )), diff --git a/src/frontend/src/binder/update.rs b/src/frontend/src/binder/update.rs index 9cc80dbde4471..22693381fc241 100644 --- a/src/frontend/src/binder/update.rs +++ b/src/frontend/src/binder/update.rs @@ -18,14 +18,15 @@ use std::collections::{BTreeMap, HashMap}; use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::catalog::{Schema, TableVersionId}; +use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_sqlparser::ast::{Assignment, AssignmentValue, Expr, ObjectName, SelectItem}; use super::statement::RewriteExprsRecursive; use super::{Binder, BoundBaseTable}; use crate::catalog::TableId; -use crate::error::{ErrorCode, Result, RwError}; -use crate::expr::{Expr as _, ExprImpl, InputRef}; +use crate::error::{bail_bind_error, ErrorCode, Result, RwError}; +use crate::expr::{Expr as _, ExprImpl, InputRef, SubqueryKind}; use crate::user::UserId; use crate::TableCatalog; @@ -79,6 +80,64 @@ impl RewriteExprsRecursive for BoundUpdate { } } +#[derive(Debug, Clone, Copy)] +pub enum UpdateProject { + Expr(usize), + Composite(usize, usize), +} + +#[derive(Debug, Clone)] +pub struct BoundUpdateV2 { + /// Id of the table to perform updating. + pub table_id: TableId, + + /// Version id of the table. + pub table_version_id: TableVersionId, + + /// Name of the table to perform updating. 
+ pub table_name: String, + + /// Owner of the table to perform updating. + pub owner: UserId, + + /// Used for scanning the records to update with the `selection`. + pub table: BoundBaseTable, + + pub selection: Option, + + pub projects: HashMap, + + /// Expression used to project to the updated row. The assigned columns will use the new + /// expression, and the other columns will be simply `InputRef`. + pub exprs: Vec, + + // used for the 'RETURNING" keyword to indicate the returning items and schema + // if the list is empty and the schema is None, the output schema will be a INT64 as the + // affected row cnt + pub returning_list: Vec, + + pub returning_schema: Option, +} + +impl RewriteExprsRecursive for BoundUpdateV2 { + fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) { + self.selection = + std::mem::take(&mut self.selection).map(|expr| rewriter.rewrite_expr(expr)); + + let new_exprs = std::mem::take(&mut self.exprs) + .into_iter() + .map(|expr| rewriter.rewrite_expr(expr)) + .collect::>(); + self.exprs = new_exprs; + + let new_returning_list = std::mem::take(&mut self.returning_list) + .into_iter() + .map(|expr| rewriter.rewrite_expr(expr)) + .collect::>(); + self.returning_list = new_returning_list; + } +} + fn get_col_referenced_by_generated_pk(table_catalog: &TableCatalog) -> Result { let column_num = table_catalog.columns().len(); let pk_col_id = table_catalog.pk_column_ids(); @@ -97,13 +156,161 @@ fn get_col_referenced_by_generated_pk(table_catalog: &TableCatalog) -> Result, + // selection: Option, + // returning_items: Vec, + // ) -> Result { + // let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?; + + // let table_catalog = self.resolve_dml_table(schema_name.as_deref(), &table_name, false)?; + // let default_columns_from_catalog = + // table_catalog.default_columns().collect::>(); + // if !returning_items.is_empty() && table_catalog.has_generated_column() { + // return Err(RwError::from(ErrorCode::BindError( + // "`RETURNING` clause is not supported for tables with generated columns".to_string(), + // ))); + // } + + // let table_id = table_catalog.id; + // let owner = table_catalog.owner; + // let table_version_id = table_catalog.version_id().expect("table must be versioned"); + // let cols_refed_by_generated_pk = get_col_referenced_by_generated_pk(table_catalog)?; + + // let table = self.bind_table(schema_name.as_deref(), &table_name, None)?; + + // let selection = selection.map(|expr| self.bind_expr(expr)).transpose()?; + + // let mut assignment_exprs = HashMap::new(); + // for Assignment { id, value } in assignments { + // // FIXME: Parsing of `id` is not strict. It will even treat `a.b` as `(a, b)`. 
+ // let assignments = match (id.as_slice(), value) { + // // _ = (subquery) + // (_ids, AssignmentValue::Expr(Expr::Subquery(_))) => { + // return Err(ErrorCode::BindError( + // "subquery on the right side of assignment is unsupported".to_owned(), + // ) + // .into()) + // } + // // col = expr + // ([id], value) => { + // vec![(id.clone(), value)] + // } + // // (col1, col2) = (expr1, expr2) + // // TODO: support `DEFAULT` in multiple assignments + // (ids, AssignmentValue::Expr(Expr::Row(values))) if ids.len() == values.len() => id + // .into_iter() + // .zip_eq_fast(values.into_iter().map(AssignmentValue::Expr)) + // .collect(), + // // (col1, col2) = + // _ => { + // return Err(ErrorCode::BindError( + // "number of columns does not match number of values".to_owned(), + // ) + // .into()) + // } + // }; + + // for (id, value) in assignments { + // let id_expr = self.bind_expr(Expr::Identifier(id.clone()))?; + // let id_index = if let Some(id_input_ref) = id_expr.clone().as_input_ref() { + // let id_index = id_input_ref.index; + // if table + // .table_catalog + // .pk() + // .iter() + // .any(|k| k.column_index == id_index) + // { + // return Err(ErrorCode::BindError( + // "update modifying the PK column is unsupported".to_owned(), + // ) + // .into()); + // } + // if table + // .table_catalog + // .generated_col_idxes() + // .contains(&id_index) + // { + // return Err(ErrorCode::BindError( + // "update modifying the generated column is unsupported".to_owned(), + // ) + // .into()); + // } + // if cols_refed_by_generated_pk.contains(id_index) { + // return Err(ErrorCode::BindError( + // "update modifying the column referenced by generated columns that are part of the primary key is not allowed".to_owned(), + // ) + // .into()); + // } + // id_index + // } else { + // unreachable!() + // }; + + // let value_expr = match value { + // AssignmentValue::Expr(expr) => { + // self.bind_expr(expr)?.cast_assign(id_expr.return_type())? 
+ // } + // AssignmentValue::Default => default_columns_from_catalog + // .get(&id_index) + // .cloned() + // .unwrap_or_else(|| ExprImpl::literal_null(id_expr.return_type())), + // }; + + // match assignment_exprs.entry(id_expr) { + // Entry::Occupied(_) => { + // return Err(ErrorCode::BindError( + // "multiple assignments to same column".to_owned(), + // ) + // .into()) + // } + // Entry::Vacant(v) => { + // v.insert(value_expr); + // } + // } + // } + // } + + // let exprs = table + // .table_catalog + // .columns() + // .iter() + // .enumerate() + // .filter_map(|(i, c)| { + // (!c.is_generated()).then_some(InputRef::new(i, c.data_type().clone()).into()) + // }) + // .map(|c| assignment_exprs.remove(&c).unwrap_or(c)) + // .collect_vec(); + + // let (returning_list, fields) = self.bind_returning_list(returning_items)?; + // let returning = !returning_list.is_empty(); + + // Ok(BoundUpdate { + // table_id, + // table_version_id, + // table_name, + // owner, + // table, + // selection, + // exprs, + // returning_list, + // returning_schema: if returning { + // Some(Schema { fields }) + // } else { + // None + // }, + // }) + // } + + pub(super) fn bind_update_v2( &mut self, name: ObjectName, assignments: Vec, selection: Option, returning_items: Vec, - ) -> Result { + ) -> Result { let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?; let table_catalog = self.resolve_dml_table(schema_name.as_deref(), &table_name, false)?; @@ -124,118 +331,124 @@ impl Binder { let selection = selection.map(|expr| self.bind_expr(expr)).transpose()?; - let mut assignment_exprs = HashMap::new(); + let mut exprs = Vec::new(); + let mut projects = HashMap::new(); + for Assignment { id, value } in assignments { - // FIXME: Parsing of `id` is not strict. It will even treat `a.b` as `(a, b)`. - let assignments = match (id.as_slice(), value) { - // _ = (subquery) - (_ids, AssignmentValue::Expr(Expr::Subquery(_))) => { - return Err(ErrorCode::BindError( - "subquery on the right side of assignment is unsupported".to_owned(), - ) - .into()) - } - // col = expr + let ids: Vec<_> = id + .into_iter() + .map(|id| self.bind_expr(Expr::Identifier(id))) + .try_collect()?; + + match (ids.as_slice(), value) { ([id], value) => { - vec![(id.clone(), value)] - } - // (col1, col2) = (expr1, expr2) - // TODO: support `DEFAULT` in multiple assignments - (ids, AssignmentValue::Expr(Expr::Row(values))) if ids.len() == values.len() => id - .into_iter() - .zip_eq_fast(values.into_iter().map(AssignmentValue::Expr)) - .collect(), - // (col1, col2) = - _ => { - return Err(ErrorCode::BindError( - "number of columns does not match number of values".to_owned(), - ) - .into()) + let id_index = id.as_input_ref().unwrap().index; + + let expr = match value { + AssignmentValue::Expr(expr) => { + self.bind_expr(expr)?.cast_assign(id.return_type())? 
+ } + AssignmentValue::Default => default_columns_from_catalog + .get(&id_index) + .cloned() + .unwrap_or_else(|| ExprImpl::literal_null(id.return_type())), + }; + + exprs.push(expr); + projects + .try_insert(id_index, UpdateProject::Expr(exprs.len() - 1)) + .expect("multiple assignments"); } - }; - - for (id, value) in assignments { - let id_expr = self.bind_expr(Expr::Identifier(id.clone()))?; - let id_index = if let Some(id_input_ref) = id_expr.clone().as_input_ref() { - let id_index = id_input_ref.index; - if table - .table_catalog - .pk() - .iter() - .any(|k| k.column_index == id_index) - { - return Err(ErrorCode::BindError( - "update modifying the PK column is unsupported".to_owned(), - ) - .into()); - } - if table - .table_catalog - .generated_col_idxes() - .contains(&id_index) - { - return Err(ErrorCode::BindError( - "update modifying the generated column is unsupported".to_owned(), - ) - .into()); - } - if cols_refed_by_generated_pk.contains(id_index) { - return Err(ErrorCode::BindError( - "update modifying the column referenced by generated columns that are part of the primary key is not allowed".to_owned(), - ) - .into()); + (ids, AssignmentValue::Default) => { + for id in ids { + let id_index = id.as_input_ref().unwrap().index; + + let expr = default_columns_from_catalog + .get(&id_index) + .cloned() + .unwrap_or_else(|| ExprImpl::literal_null(id.return_type())); + + exprs.push(expr); + projects + .try_insert(id_index, UpdateProject::Expr(exprs.len() - 1)) + .expect("multiple assignments"); } - id_index - } else { - unreachable!() - }; - - let value_expr = match value { - AssignmentValue::Expr(expr) => { - self.bind_expr(expr)?.cast_assign(id_expr.return_type())? + } + (ids, AssignmentValue::Expr(Expr::Row(values))) => { + if ids.len() != values.len() { + bail_bind_error!("number of columns does not match number of values"); } - AssignmentValue::Default => default_columns_from_catalog - .get(&id_index) - .cloned() - .unwrap_or_else(|| ExprImpl::literal_null(id_expr.return_type())), - }; - - match assignment_exprs.entry(id_expr) { - Entry::Occupied(_) => { - return Err(ErrorCode::BindError( - "multiple assignments to same column".to_owned(), - ) - .into()) + + for (id, value) in ids.iter().zip_eq_fast(values) { + let id_index = id.as_input_ref().unwrap().index; + + let expr = self.bind_expr(value)?.cast_assign(id.return_type())?; + + exprs.push(expr); + projects + .try_insert(id_index, UpdateProject::Expr(exprs.len() - 1)) + .expect("multiple assignments"); } - Entry::Vacant(v) => { - v.insert(value_expr); + } + (ids, AssignmentValue::Expr(Expr::Subquery(subquery))) => { + let target_type = DataType::new_unnamed_struct( + ids.iter().map(|id| id.return_type()).collect(), + ); + + let expr = self + .bind_subquery_expr(*subquery, SubqueryKind::UpdateSet)? 
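+                        // the subquery binds to a single struct-typed value; cast
+                        // it to the target columns' types, field by field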
+ .cast_assign(target_type)?; + + exprs.push(expr); + + for (i, id) in ids.iter().enumerate() { + let id_index = id.as_input_ref().unwrap().index; + projects + .try_insert(id_index, UpdateProject::Composite(exprs.len() - 1, i)) + .expect("multiple assignments"); } } + (_ids, _expr) => { + bail_bind_error!("unsupported assignment"); + } } } - let exprs = table - .table_catalog - .columns() - .iter() - .enumerate() - .filter_map(|(i, c)| { - c.can_dml() - .then_some(InputRef::new(i, c.data_type().clone()).into()) - }) - .map(|c| assignment_exprs.remove(&c).unwrap_or(c)) - .collect_vec(); + for &id_index in projects.keys() { + if (table.table_catalog.pk()) + .iter() + .any(|k| k.column_index == id_index) + { + return Err(ErrorCode::BindError( + "update modifying the PK column is unsupported".to_owned(), + ) + .into()); + } + if (table.table_catalog.generated_col_idxes()).contains(&id_index) { + return Err(ErrorCode::BindError( + "update modifying the generated column is unsupported".to_owned(), + ) + .into()); + } + if cols_refed_by_generated_pk.contains(id_index) { + return Err(ErrorCode::BindError( + "update modifying the column referenced by generated columns that are part of the primary key is not allowed".to_owned(), + ) + .into()); + } + } let (returning_list, fields) = self.bind_returning_list(returning_items)?; let returning = !returning_list.is_empty(); - Ok(BoundUpdate { + Ok(BoundUpdateV2 { table_id, table_version_id, table_name, owner, table, selection, + projects, exprs, returning_list, returning_schema: if returning { diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 3092c9bee91a9..8adc1573e4a37 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -33,7 +33,7 @@ use tokio::task::JoinError; // - Some variants are never constructed. // - Some variants store a type-erased `BoxedError` to resolve the reverse dependency. // It's not necessary anymore as the error type is now defined at the top-level. -#[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Box)] +#[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Box, thiserror_ext::Macro)] #[thiserror_ext(newtype(name = RwError, backtrace))] pub enum ErrorCode { #[error("internal error: {0}")] @@ -105,7 +105,7 @@ pub enum ErrorCode { // TODO: use a new type for bind error // TODO(error-handling): should prefer use error types than strings. #[error("Bind error: {0}")] - BindError(String), + BindError(#[message] String), // TODO: only keep this one #[error("Failed to bind expression: {expr}: {error}")] BindErrorRoot { diff --git a/src/frontend/src/expr/subquery.rs b/src/frontend/src/expr/subquery.rs index 62f59c934dd6d..8ad682d8948ba 100644 --- a/src/frontend/src/expr/subquery.rs +++ b/src/frontend/src/expr/subquery.rs @@ -24,6 +24,7 @@ use crate::expr::{CorrelatedId, Depth}; pub enum SubqueryKind { /// Returns a scalar value (single column single row). Scalar, + UpdateSet, /// `EXISTS` | `NOT EXISTS` subquery (semi/anti-semi join). Returns a boolean. Existential, /// `IN` subquery. 
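
The `UpdateSet` kind introduced above is what backs assigning a parenthesized column
list from a single sub-`SELECT`. A sketch of the statement shape this enables (table
and column names are illustrative, not taken from this patch):

    UPDATE t SET (v1, v2) = (SELECT s.x, s.y FROM s WHERE s.id = t.id);

The subquery is bound as one struct-typed expression; each assigned column then
extracts its own field from that struct, via the binder's `UpdateProject::Composite`
entries and a `Field` call in the planner.
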
@@ -88,6 +89,7 @@ impl Expr for Subquery { assert_eq!(types.len(), 1, "Subquery with more than one column"); types[0].clone() } + SubqueryKind::UpdateSet => DataType::new_unnamed_struct(self.query.data_types()), SubqueryKind::Array => { let types = self.query.data_types(); assert_eq!(types.len(), 1, "Subquery with more than one column"); diff --git a/src/frontend/src/optimizer/plan_node/batch_update.rs b/src/frontend/src/optimizer/plan_node/batch_update.rs index d0351e6fdec2e..9a5891d27d835 100644 --- a/src/frontend/src/optimizer/plan_node/batch_update.rs +++ b/src/frontend/src/optimizer/plan_node/batch_update.rs @@ -84,20 +84,21 @@ impl ToDistributedBatch for BatchUpdate { impl ToBatchPb for BatchUpdate { fn to_batch_prost_body(&self) -> NodeBody { - let exprs = self.core.exprs.iter().map(|x| x.to_expr_proto()).collect(); - - let update_column_indices = self - .core - .update_column_indices + let old_exprs = (self.core.old_exprs) + .iter() + .map(|x| x.to_expr_proto()) + .collect(); + let new_exprs = (self.core.new_exprs) .iter() - .map(|i| *i as _) - .collect_vec(); + .map(|x| x.to_expr_proto()) + .collect(); + NodeBody::Update(UpdateNode { - exprs, table_id: self.core.table_id.table_id(), table_version_id: self.core.table_version_id, returning: self.core.returning, - update_column_indices, + old_exprs, + new_exprs, session_id: self.base.ctx().session_ctx().session_id().0 as u32, }) } @@ -125,6 +126,6 @@ impl ExprRewritable for BatchUpdate { impl ExprVisitable for BatchUpdate { fn visit_exprs(&self, v: &mut dyn ExprVisitor) { - self.core.exprs.iter().for_each(|e| v.visit_expr(e)); + self.core.visit_exprs(v); } } diff --git a/src/frontend/src/optimizer/plan_node/generic/update.rs b/src/frontend/src/optimizer/plan_node/generic/update.rs index 61d044f53c998..d68af1a01ae3f 100644 --- a/src/frontend/src/optimizer/plan_node/generic/update.rs +++ b/src/frontend/src/optimizer/plan_node/generic/update.rs @@ -21,7 +21,7 @@ use risingwave_common::types::DataType; use super::{DistillUnit, GenericPlanNode, GenericPlanRef}; use crate::catalog::TableId; -use crate::expr::{ExprImpl, ExprRewriter}; +use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::utils::childless_record; use crate::optimizer::property::FunctionalDependencySet; use crate::OptimizerContextRef; @@ -35,15 +35,15 @@ pub struct Update { pub table_id: TableId, pub table_version_id: TableVersionId, pub input: PlanRef, - pub exprs: Vec, + pub old_exprs: Vec, + pub new_exprs: Vec, pub returning: bool, - pub update_column_indices: Vec, } impl Update { pub fn output_len(&self) -> usize { if self.returning { - self.input.schema().len() + self.new_exprs.len() } else { 1 } @@ -56,18 +56,19 @@ impl GenericPlanNode for Update { fn schema(&self) -> Schema { if self.returning { - self.input.schema().clone() + Schema::new( + self.new_exprs + .iter() + .map(|e| Field::unnamed(e.return_type())) + .collect(), + ) } else { Schema::new(vec![Field::unnamed(DataType::Int64)]) } } fn stream_key(&self) -> Option> { - if self.returning { - Some(self.input.stream_key()?.to_vec()) - } else { - Some(vec![]) - } + None } fn ctx(&self) -> OptimizerContextRef { @@ -81,27 +82,31 @@ impl Update { table_name: String, table_id: TableId, table_version_id: TableVersionId, - exprs: Vec, + old_exprs: Vec, + new_exprs: Vec, returning: bool, - update_column_indices: Vec, ) -> Self { Self { table_name, table_id, table_version_id, input, - exprs, + old_exprs, + new_exprs, returning, - update_column_indices, } } pub(crate) fn 
rewrite_exprs(&mut self, r: &mut dyn ExprRewriter) { - self.exprs = self - .exprs - .iter() - .map(|e| r.rewrite_expr(e.clone())) - .collect(); + for exprs in [&mut self.old_exprs, &mut self.new_exprs] { + *exprs = exprs.iter().map(|e| r.rewrite_expr(e.clone())).collect(); + } + } + + pub(crate) fn visit_exprs(&self, v: &mut dyn ExprVisitor) { + for exprs in [&self.old_exprs, &self.new_exprs] { + exprs.iter().for_each(|e| v.visit_expr(e)); + } } } @@ -109,7 +114,7 @@ impl DistillUnit for Update { fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a> { let mut vec = Vec::with_capacity(if self.returning { 3 } else { 2 }); vec.push(("table", Pretty::from(self.table_name.clone()))); - vec.push(("exprs", Pretty::debug(&self.exprs))); + vec.push(("exprs", Pretty::debug(&self.new_exprs))); if self.returning { vec.push(("returning", Pretty::display(&true))); } diff --git a/src/frontend/src/optimizer/plan_node/logical_update.rs b/src/frontend/src/optimizer/plan_node/logical_update.rs index 127b6ed8b317b..f5d0bdc089644 100644 --- a/src/frontend/src/optimizer/plan_node/logical_update.rs +++ b/src/frontend/src/optimizer/plan_node/logical_update.rs @@ -46,25 +46,6 @@ impl From> for LogicalUpdate { } } -impl LogicalUpdate { - #[must_use] - pub fn table_id(&self) -> TableId { - self.core.table_id - } - - pub fn exprs(&self) -> &[ExprImpl] { - self.core.exprs.as_ref() - } - - pub fn has_returning(&self) -> bool { - self.core.returning - } - - pub fn table_version_id(&self) -> TableVersionId { - self.core.table_version_id - } -} - impl PlanTreeNodeUnary for LogicalUpdate { fn input(&self) -> PlanRef { self.core.input.clone() @@ -86,15 +67,15 @@ impl ExprRewritable for LogicalUpdate { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut new = self.core.clone(); - new.exprs = new.exprs.into_iter().map(|e| r.rewrite_expr(e)).collect(); - Self::from(new).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::from(core).into() } } impl ExprVisitable for LogicalUpdate { fn visit_exprs(&self, v: &mut dyn ExprVisitor) { - self.core.exprs.iter().for_each(|e| v.visit_expr(e)); + self.core.visit_exprs(v); } } diff --git a/src/frontend/src/planner/select.rs b/src/frontend/src/planner/select.rs index a9e7dd3526ed1..a1ae723520819 100644 --- a/src/frontend/src/planner/select.rs +++ b/src/frontend/src/planner/select.rs @@ -320,7 +320,7 @@ impl Planner { /// /// The [`InputRef`]s' indexes start from `root.schema().len()`, /// which means they are additional columns beyond the original `root`. - fn substitute_subqueries( + pub(super) fn substitute_subqueries( &mut self, mut root: PlanRef, mut exprs: Vec, @@ -369,7 +369,7 @@ impl Planner { let subroot = self.plan_query(subquery.query)?; let right = match subquery.kind { - SubqueryKind::Scalar => subroot.into_unordered_subplan(), + SubqueryKind::Scalar | SubqueryKind::UpdateSet => subroot.into_unordered_subplan(), SubqueryKind::Existential => { self.create_exists(subroot.into_unordered_subplan())? 
} diff --git a/src/frontend/src/planner/statement.rs b/src/frontend/src/planner/statement.rs index 91c1b9edfc619..dc957da66c016 100644 --- a/src/frontend/src/planner/statement.rs +++ b/src/frontend/src/planner/statement.rs @@ -22,7 +22,7 @@ impl Planner { match stmt { BoundStatement::Insert(i) => self.plan_insert(*i), BoundStatement::Delete(d) => self.plan_delete(*d), - BoundStatement::Update(u) => self.plan_update(*u), + BoundStatement::Update(u) => self.plan_update_v2(*u), BoundStatement::Query(q) => self.plan_query(*q), BoundStatement::FetchCursor(_) => unimplemented!(), BoundStatement::CreateView(c) => self.plan_query(*c.query), diff --git a/src/frontend/src/planner/update.rs b/src/frontend/src/planner/update.rs index ddf9ab0bdf9ae..7a381991d6370 100644 --- a/src/frontend/src/planner/update.rs +++ b/src/frontend/src/planner/update.rs @@ -13,41 +13,103 @@ // limitations under the License. use fixedbitset::FixedBitSet; -use itertools::Itertools; +use risingwave_common::types::{DataType, Scalar}; +use risingwave_pb::expr::expr_node::Type; use super::Planner; -use crate::binder::BoundUpdate; +use crate::binder::{BoundUpdateV2, UpdateProject}; use crate::error::Result; +use crate::expr::{ExprImpl, FunctionCall, InputRef, Literal}; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{generic, LogicalProject, LogicalUpdate}; use crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::{PlanRef, PlanRoot}; impl Planner { - pub(super) fn plan_update(&mut self, update: BoundUpdate) -> Result { + pub(super) fn plan_update_v2(&mut self, update: BoundUpdateV2) -> Result { let scan = self.plan_base_table(&update.table)?; let input = if let Some(expr) = update.selection { self.plan_where(scan, expr)? } else { scan }; + let returning = !update.returning_list.is_empty(); - let update_column_indices = update - .table - .table_catalog - .columns() - .iter() - .enumerate() - .filter_map(|(i, c)| c.can_dml().then_some(i)) - .collect_vec(); + // let update_column_indices = update + // .table + // .table_catalog + // .columns() + // .iter() + // .enumerate() + // .filter_map(|(i, c)| (!c.is_generated()).then_some(i)) + // .collect_vec(); + + let schema_len = input.schema().len(); + + let with_new: PlanRef = { + let mut plan = input; + + let mut exprs: Vec = plan + .schema() + .data_types() + .into_iter() + .enumerate() + .map(|(index, data_type)| InputRef::new(index, data_type).into()) + .collect(); + + exprs.extend(update.exprs); + + if exprs.iter().any(|e| e.has_subquery()) { + (plan, exprs) = self.substitute_subqueries(plan, exprs)?; + } + + LogicalProject::new(plan, exprs).into() + }; + + let mut olds = Vec::new(); + let mut news = Vec::new(); + + for (i, col) in update.table.table_catalog.columns().iter().enumerate() { + if col.is_generated() { + continue; + } + let data_type = col.data_type(); + + let old: ExprImpl = InputRef::new(i, data_type.clone()).into(); + + let new: ExprImpl = match update.projects.get(&i).copied() { + Some(UpdateProject::Expr(index)) => { + InputRef::new(index + schema_len, data_type.clone()).into() + } + Some(UpdateProject::Composite(index, sub)) => FunctionCall::new_unchecked( + Type::Field, + vec![ + InputRef::new( + index + schema_len, + with_new.schema().data_types()[i + schema_len].clone(), + ) + .into(), + Literal::new(Some((sub as i32).to_scalar_value()), DataType::Int32).into(), + ], + data_type.clone(), + ) + .into(), + + None => old.clone(), + }; + + olds.push(old); + news.push(new); + } let mut plan: PlanRef = 
LogicalUpdate::from(generic::Update::new( - input, + with_new, update.table_name.clone(), update.table_id, update.table_version_id, - update.exprs, + olds, + news, returning, - update_column_indices, )) .into(); From 12f807a45cdf02d2363e21399406274d31a2c2b8 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 15 Nov 2024 17:42:01 +0800 Subject: [PATCH 02/11] fix subquery update set return type Signed-off-by: Bugen Zhao --- src/frontend/src/binder/expr/subquery.rs | 16 ++++++---------- src/frontend/src/error.rs | 2 +- src/frontend/src/planner/select.rs | 18 +++++++++++++++++- src/frontend/src/planner/update.rs | 2 +- 4 files changed, 25 insertions(+), 13 deletions(-) diff --git a/src/frontend/src/binder/expr/subquery.rs b/src/frontend/src/binder/expr/subquery.rs index 2924f344700ae..c31a5d653aeb5 100644 --- a/src/frontend/src/binder/expr/subquery.rs +++ b/src/frontend/src/binder/expr/subquery.rs @@ -15,20 +15,16 @@ use risingwave_sqlparser::ast::Query; use crate::binder::Binder; -use crate::error::{ErrorCode, Result}; +use crate::error::{bail_bind_error, Result}; use crate::expr::{ExprImpl, Subquery, SubqueryKind}; impl Binder { - pub fn bind_subquery_expr( - &mut self, - query: Query, - kind: SubqueryKind, - ) -> Result { + pub fn bind_subquery_expr(&mut self, query: Query, kind: SubqueryKind) -> Result { let query = self.bind_query(query)?; - if !matches!(kind, SubqueryKind::Existential) && query.data_types().len() != 1 { - return Err( - ErrorCode::BindError("Subquery must return only one column".to_string()).into(), - ); + if !matches!(kind, SubqueryKind::Existential | SubqueryKind::UpdateSet) + && query.data_types().len() != 1 + { + bail_bind_error!("Subquery must return only one column"); } Ok(Subquery::new(query, kind).into()) } diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 8adc1573e4a37..f0cf35e859664 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -34,7 +34,7 @@ use tokio::task::JoinError; // - Some variants store a type-erased `BoxedError` to resolve the reverse dependency. // It's not necessary anymore as the error type is now defined at the top-level. #[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Box, thiserror_ext::Macro)] -#[thiserror_ext(newtype(name = RwError, backtrace))] +#[thiserror_ext(newtype(name = RwError, backtrace), macro(path = "crate::error"))] pub enum ErrorCode { #[error("internal error: {0}")] InternalError(String), diff --git a/src/frontend/src/planner/select.rs b/src/frontend/src/planner/select.rs index a1ae723520819..2c5d7169d1b84 100644 --- a/src/frontend/src/planner/select.rs +++ b/src/frontend/src/planner/select.rs @@ -366,10 +366,26 @@ impl Planner { .zip_eq_fast(rewriter.correlated_indices_collection) .zip_eq_fast(rewriter.correlated_ids) { + let return_type = subquery.return_type(); let subroot = self.plan_query(subquery.query)?; let right = match subquery.kind { - SubqueryKind::Scalar | SubqueryKind::UpdateSet => subroot.into_unordered_subplan(), + SubqueryKind::Scalar => subroot.into_unordered_subplan(), + SubqueryKind::UpdateSet => { + let plan = subroot.into_unordered_subplan(); + + let all_input_refs = plan + .schema() + .data_types() + .into_iter() + .enumerate() + .map(|(i, data_type)| InputRef::new(i, data_type).into()) + .collect::>(); + + let call = + FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type); + LogicalProject::create(plan, vec![call.into()]) + } SubqueryKind::Existential => { self.create_exists(subroot.into_unordered_subplan())? 
} diff --git a/src/frontend/src/planner/update.rs b/src/frontend/src/planner/update.rs index 7a381991d6370..8ad7d9e036314 100644 --- a/src/frontend/src/planner/update.rs +++ b/src/frontend/src/planner/update.rs @@ -86,7 +86,7 @@ impl Planner { vec![ InputRef::new( index + schema_len, - with_new.schema().data_types()[i + schema_len].clone(), + with_new.schema().data_types()[index + schema_len].clone(), ) .into(), Literal::new(Some((sub as i32).to_scalar_value()), DataType::Int32).into(), From d387ca72cb4a96425210f1c86b3819a087e4a0e5 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 15 Nov 2024 17:52:08 +0800 Subject: [PATCH 03/11] code cleanups Signed-off-by: Bugen Zhao --- proto/batch_plan.proto | 3 - src/frontend/src/binder/mod.rs | 2 +- src/frontend/src/binder/statement.rs | 6 +- src/frontend/src/binder/update.rs | 211 +----------------- .../src/optimizer/plan_node/batch_update.rs | 1 - .../src/optimizer/plan_node/logical_update.rs | 5 +- src/frontend/src/planner/statement.rs | 2 +- src/frontend/src/planner/update.rs | 12 +- 8 files changed, 14 insertions(+), 228 deletions(-) diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 577bbe327dc3c..e9d76097b0a77 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -178,9 +178,6 @@ message UpdateNode { repeated expr.ExprNode new_exprs = 4; bool returning = 5; - // // The columns indices in the input schema, representing the columns need to send to streamDML exeuctor. - // repeated uint32 update_column_indices = 5; - // Session id is used to ensure that dml data from the same session should be sent to a fixed worker node and channel. uint32 session_id = 6; } diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index df3f091fb2a10..96b242b188297 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -57,7 +57,7 @@ pub use relation::{ pub use select::{BoundDistinct, BoundSelect}; pub use set_expr::*; pub use statement::BoundStatement; -pub use update::{BoundUpdate, BoundUpdateV2, UpdateProject}; +pub use update::{BoundUpdate, UpdateProject}; pub use values::BoundValues; use crate::catalog::catalog_service::CatalogReadGuard; diff --git a/src/frontend/src/binder/statement.rs b/src/frontend/src/binder/statement.rs index ad09b31f2c031..b73fab90aed9a 100644 --- a/src/frontend/src/binder/statement.rs +++ b/src/frontend/src/binder/statement.rs @@ -18,7 +18,7 @@ use risingwave_sqlparser::ast::Statement; use super::delete::BoundDelete; use super::fetch_cursor::BoundFetchCursor; -use super::update::BoundUpdateV2; +use super::update::BoundUpdate; use crate::binder::create_view::BoundCreateView; use crate::binder::{Binder, BoundInsert, BoundQuery}; use crate::error::Result; @@ -28,7 +28,7 @@ use crate::expr::ExprRewriter; pub enum BoundStatement { Insert(Box), Delete(Box), - Update(Box), + Update(Box), Query(Box), FetchCursor(Box), CreateView(Box), @@ -86,7 +86,7 @@ impl Binder { selection, returning, } => Ok(BoundStatement::Update( - self.bind_update_v2(table_name, assignments, selection, returning)? + self.bind_update(table_name, assignments, selection, returning)? .into(), )), diff --git a/src/frontend/src/binder/update.rs b/src/frontend/src/binder/update.rs index 22693381fc241..9d2dda1a7eea4 100644 --- a/src/frontend/src/binder/update.rs +++ b/src/frontend/src/binder/update.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap}; use fixedbitset::FixedBitSet; @@ -26,60 +25,10 @@ use super::statement::RewriteExprsRecursive; use super::{Binder, BoundBaseTable}; use crate::catalog::TableId; use crate::error::{bail_bind_error, ErrorCode, Result, RwError}; -use crate::expr::{Expr as _, ExprImpl, InputRef, SubqueryKind}; +use crate::expr::{Expr as _, ExprImpl, SubqueryKind}; use crate::user::UserId; use crate::TableCatalog; -#[derive(Debug, Clone)] -pub struct BoundUpdate { - /// Id of the table to perform updating. - pub table_id: TableId, - - /// Version id of the table. - pub table_version_id: TableVersionId, - - /// Name of the table to perform updating. - pub table_name: String, - - /// Owner of the table to perform updating. - pub owner: UserId, - - /// Used for scanning the records to update with the `selection`. - pub table: BoundBaseTable, - - pub selection: Option, - - /// Expression used to project to the updated row. The assigned columns will use the new - /// expression, and the other columns will be simply `InputRef`. - pub exprs: Vec, - - // used for the 'RETURNING" keyword to indicate the returning items and schema - // if the list is empty and the schema is None, the output schema will be a INT64 as the - // affected row cnt - pub returning_list: Vec, - - pub returning_schema: Option, -} - -impl RewriteExprsRecursive for BoundUpdate { - fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) { - self.selection = - std::mem::take(&mut self.selection).map(|expr| rewriter.rewrite_expr(expr)); - - let new_exprs = std::mem::take(&mut self.exprs) - .into_iter() - .map(|expr| rewriter.rewrite_expr(expr)) - .collect::>(); - self.exprs = new_exprs; - - let new_returning_list = std::mem::take(&mut self.returning_list) - .into_iter() - .map(|expr| rewriter.rewrite_expr(expr)) - .collect::>(); - self.returning_list = new_returning_list; - } -} - #[derive(Debug, Clone, Copy)] pub enum UpdateProject { Expr(usize), @@ -87,7 +36,7 @@ pub enum UpdateProject { } #[derive(Debug, Clone)] -pub struct BoundUpdateV2 { +pub struct BoundUpdate { /// Id of the table to perform updating. 
pub table_id: TableId, @@ -119,7 +68,7 @@ pub struct BoundUpdateV2 { pub returning_schema: Option, } -impl RewriteExprsRecursive for BoundUpdateV2 { +impl RewriteExprsRecursive for BoundUpdate { fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) { self.selection = std::mem::take(&mut self.selection).map(|expr| rewriter.rewrite_expr(expr)); @@ -156,161 +105,13 @@ fn get_col_referenced_by_generated_pk(table_catalog: &TableCatalog) -> Result, - // selection: Option, - // returning_items: Vec, - // ) -> Result { - // let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?; - - // let table_catalog = self.resolve_dml_table(schema_name.as_deref(), &table_name, false)?; - // let default_columns_from_catalog = - // table_catalog.default_columns().collect::>(); - // if !returning_items.is_empty() && table_catalog.has_generated_column() { - // return Err(RwError::from(ErrorCode::BindError( - // "`RETURNING` clause is not supported for tables with generated columns".to_string(), - // ))); - // } - - // let table_id = table_catalog.id; - // let owner = table_catalog.owner; - // let table_version_id = table_catalog.version_id().expect("table must be versioned"); - // let cols_refed_by_generated_pk = get_col_referenced_by_generated_pk(table_catalog)?; - - // let table = self.bind_table(schema_name.as_deref(), &table_name, None)?; - - // let selection = selection.map(|expr| self.bind_expr(expr)).transpose()?; - - // let mut assignment_exprs = HashMap::new(); - // for Assignment { id, value } in assignments { - // // FIXME: Parsing of `id` is not strict. It will even treat `a.b` as `(a, b)`. - // let assignments = match (id.as_slice(), value) { - // // _ = (subquery) - // (_ids, AssignmentValue::Expr(Expr::Subquery(_))) => { - // return Err(ErrorCode::BindError( - // "subquery on the right side of assignment is unsupported".to_owned(), - // ) - // .into()) - // } - // // col = expr - // ([id], value) => { - // vec![(id.clone(), value)] - // } - // // (col1, col2) = (expr1, expr2) - // // TODO: support `DEFAULT` in multiple assignments - // (ids, AssignmentValue::Expr(Expr::Row(values))) if ids.len() == values.len() => id - // .into_iter() - // .zip_eq_fast(values.into_iter().map(AssignmentValue::Expr)) - // .collect(), - // // (col1, col2) = - // _ => { - // return Err(ErrorCode::BindError( - // "number of columns does not match number of values".to_owned(), - // ) - // .into()) - // } - // }; - - // for (id, value) in assignments { - // let id_expr = self.bind_expr(Expr::Identifier(id.clone()))?; - // let id_index = if let Some(id_input_ref) = id_expr.clone().as_input_ref() { - // let id_index = id_input_ref.index; - // if table - // .table_catalog - // .pk() - // .iter() - // .any(|k| k.column_index == id_index) - // { - // return Err(ErrorCode::BindError( - // "update modifying the PK column is unsupported".to_owned(), - // ) - // .into()); - // } - // if table - // .table_catalog - // .generated_col_idxes() - // .contains(&id_index) - // { - // return Err(ErrorCode::BindError( - // "update modifying the generated column is unsupported".to_owned(), - // ) - // .into()); - // } - // if cols_refed_by_generated_pk.contains(id_index) { - // return Err(ErrorCode::BindError( - // "update modifying the column referenced by generated columns that are part of the primary key is not allowed".to_owned(), - // ) - // .into()); - // } - // id_index - // } else { - // unreachable!() - // }; - - // let value_expr = match value { - // 
AssignmentValue::Expr(expr) => { - // self.bind_expr(expr)?.cast_assign(id_expr.return_type())? - // } - // AssignmentValue::Default => default_columns_from_catalog - // .get(&id_index) - // .cloned() - // .unwrap_or_else(|| ExprImpl::literal_null(id_expr.return_type())), - // }; - - // match assignment_exprs.entry(id_expr) { - // Entry::Occupied(_) => { - // return Err(ErrorCode::BindError( - // "multiple assignments to same column".to_owned(), - // ) - // .into()) - // } - // Entry::Vacant(v) => { - // v.insert(value_expr); - // } - // } - // } - // } - - // let exprs = table - // .table_catalog - // .columns() - // .iter() - // .enumerate() - // .filter_map(|(i, c)| { - // (!c.is_generated()).then_some(InputRef::new(i, c.data_type().clone()).into()) - // }) - // .map(|c| assignment_exprs.remove(&c).unwrap_or(c)) - // .collect_vec(); - - // let (returning_list, fields) = self.bind_returning_list(returning_items)?; - // let returning = !returning_list.is_empty(); - - // Ok(BoundUpdate { - // table_id, - // table_version_id, - // table_name, - // owner, - // table, - // selection, - // exprs, - // returning_list, - // returning_schema: if returning { - // Some(Schema { fields }) - // } else { - // None - // }, - // }) - // } - - pub(super) fn bind_update_v2( + pub(super) fn bind_update( &mut self, name: ObjectName, assignments: Vec, selection: Option, returning_items: Vec, - ) -> Result { + ) -> Result { let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?; let table_catalog = self.resolve_dml_table(schema_name.as_deref(), &table_name, false)?; @@ -441,7 +242,7 @@ impl Binder { let (returning_list, fields) = self.bind_returning_list(returning_items)?; let returning = !returning_list.is_empty(); - Ok(BoundUpdateV2 { + Ok(BoundUpdate { table_id, table_version_id, table_name, diff --git a/src/frontend/src/optimizer/plan_node/batch_update.rs b/src/frontend/src/optimizer/plan_node/batch_update.rs index 9a5891d27d835..28dfa79916cc9 100644 --- a/src/frontend/src/optimizer/plan_node/batch_update.rs +++ b/src/frontend/src/optimizer/plan_node/batch_update.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use itertools::Itertools; use risingwave_common::catalog::Schema; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::UpdateNode; diff --git a/src/frontend/src/optimizer/plan_node/logical_update.rs b/src/frontend/src/optimizer/plan_node/logical_update.rs index f5d0bdc089644..a5590501715b9 100644 --- a/src/frontend/src/optimizer/plan_node/logical_update.rs +++ b/src/frontend/src/optimizer/plan_node/logical_update.rs @@ -12,17 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use risingwave_common::catalog::TableVersionId; - use super::generic::GenericPlanRef; use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, BatchUpdate, ColPrunable, ExprRewritable, Logical, LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, }; -use crate::catalog::TableId; use crate::error::Result; -use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor}; +use crate::expr::{ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, diff --git a/src/frontend/src/planner/statement.rs b/src/frontend/src/planner/statement.rs index dc957da66c016..91c1b9edfc619 100644 --- a/src/frontend/src/planner/statement.rs +++ b/src/frontend/src/planner/statement.rs @@ -22,7 +22,7 @@ impl Planner { match stmt { BoundStatement::Insert(i) => self.plan_insert(*i), BoundStatement::Delete(d) => self.plan_delete(*d), - BoundStatement::Update(u) => self.plan_update_v2(*u), + BoundStatement::Update(u) => self.plan_update(*u), BoundStatement::Query(q) => self.plan_query(*q), BoundStatement::FetchCursor(_) => unimplemented!(), BoundStatement::CreateView(c) => self.plan_query(*c.query), diff --git a/src/frontend/src/planner/update.rs b/src/frontend/src/planner/update.rs index 8ad7d9e036314..1e827c72764c9 100644 --- a/src/frontend/src/planner/update.rs +++ b/src/frontend/src/planner/update.rs @@ -17,7 +17,7 @@ use risingwave_common::types::{DataType, Scalar}; use risingwave_pb::expr::expr_node::Type; use super::Planner; -use crate::binder::{BoundUpdateV2, UpdateProject}; +use crate::binder::{BoundUpdate, UpdateProject}; use crate::error::Result; use crate::expr::{ExprImpl, FunctionCall, InputRef, Literal}; use crate::optimizer::plan_node::generic::GenericPlanRef; @@ -26,7 +26,7 @@ use crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::{PlanRef, PlanRoot}; impl Planner { - pub(super) fn plan_update_v2(&mut self, update: BoundUpdateV2) -> Result { + pub(super) fn plan_update(&mut self, update: BoundUpdate) -> Result { let scan = self.plan_base_table(&update.table)?; let input = if let Some(expr) = update.selection { self.plan_where(scan, expr)? @@ -35,14 +35,6 @@ impl Planner { }; let returning = !update.returning_list.is_empty(); - // let update_column_indices = update - // .table - // .table_catalog - // .columns() - // .iter() - // .enumerate() - // .filter_map(|(i, c)| (!c.is_generated()).then_some(i)) - // .collect_vec(); let schema_len = input.schema().len(); From 104e11d23a2ab402997821f0bcca062faab7a443 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 18 Nov 2024 14:43:17 +0800 Subject: [PATCH 04/11] refine docs Signed-off-by: Bugen Zhao --- proto/batch_plan.proto | 2 + src/batch/src/executor/update.rs | 6 +- src/frontend/src/binder/update.rs | 98 ++++++++++++++++-------------- src/frontend/src/expr/subquery.rs | 2 + src/frontend/src/planner/select.rs | 3 +- src/frontend/src/planner/update.rs | 41 ++++++------- 6 files changed, 81 insertions(+), 71 deletions(-) diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index e9d76097b0a77..f10092d952ac7 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -174,7 +174,9 @@ message UpdateNode { uint32 table_id = 1; // Version of the table. uint64 table_version_id = 2; + // Expressions to generate `U-` records. repeated expr.ExprNode old_exprs = 3; + // Expressions to generate `U+` records. 
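+  // For example (illustrative), for `UPDATE t SET v = v + 1`, the old expressions
+  // are plain references to the scanned columns, while the new expressions evaluate
+  // the assigned values; the executor evaluates both against the same input chunk
+  // to emit a matching delete/insert pair for every updated row.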
repeated expr.ExprNode new_exprs = 4; bool returning = 5; diff --git a/src/batch/src/executor/update.rs b/src/batch/src/executor/update.rs index e9afc4748cd4a..95f1963cf582e 100644 --- a/src/batch/src/executor/update.rs +++ b/src/batch/src/executor/update.rs @@ -109,7 +109,7 @@ impl Executor for UpdateExecutor { impl UpdateExecutor { #[try_stream(boxed, ok = DataChunk, error = BatchError)] - async fn do_execute(mut self: Box) { + async fn do_execute(self: Box) { let table_dml_handle = self .dml_manager .table_dml_handle(self.table_id, self.table_version_id)?; @@ -161,7 +161,7 @@ impl UpdateExecutor { let old_data_chunk = { let mut columns = Vec::with_capacity(self.old_exprs.len()); - for expr in &mut self.old_exprs { + for expr in &self.old_exprs { let column = expr.eval(&input).await?; columns.push(column); } @@ -171,7 +171,7 @@ impl UpdateExecutor { let updated_data_chunk = { let mut columns = Vec::with_capacity(self.new_exprs.len()); - for expr in &mut self.new_exprs { + for expr in &self.new_exprs { let column = expr.eval(&input).await?; columns.push(column); } diff --git a/src/frontend/src/binder/update.rs b/src/frontend/src/binder/update.rs index 9d2dda1a7eea4..24982b1c76f0b 100644 --- a/src/frontend/src/binder/update.rs +++ b/src/frontend/src/binder/update.rs @@ -24,17 +24,30 @@ use risingwave_sqlparser::ast::{Assignment, AssignmentValue, Expr, ObjectName, S use super::statement::RewriteExprsRecursive; use super::{Binder, BoundBaseTable}; use crate::catalog::TableId; -use crate::error::{bail_bind_error, ErrorCode, Result, RwError}; +use crate::error::{bail_bind_error, bind_error, ErrorCode, Result, RwError}; use crate::expr::{Expr as _, ExprImpl, SubqueryKind}; use crate::user::UserId; use crate::TableCatalog; +/// Project into `exprs` in `BoundUpdate` to get the new values for updating. #[derive(Debug, Clone, Copy)] pub enum UpdateProject { - Expr(usize), + /// Use the expression at the given index in `exprs`. + Simple(usize), + /// Use the `i`-th field of the expression (returning a struct) at the given index in `exprs`. Composite(usize, usize), } +impl UpdateProject { + /// Offset the index by `i`. + pub fn offset(self, i: usize) -> Self { + match self { + UpdateProject::Simple(index) => UpdateProject::Simple(index + i), + UpdateProject::Composite(index, j) => UpdateProject::Composite(index, j), + } + } +} + #[derive(Debug, Clone)] pub struct BoundUpdate { /// Id of the table to perform updating. @@ -54,12 +67,14 @@ pub struct BoundUpdate { pub selection: Option, - pub projects: HashMap, - - /// Expression used to project to the updated row. The assigned columns will use the new - /// expression, and the other columns will be simply `InputRef`. + /// Expression used to evaluate the new values for the columns. pub exprs: Vec, + /// Mapping from the index of the column to be updated, to the index of the expression in `exprs`. + /// + /// By constructing two `Project` nodes with `exprs` and `projects`, we can get the new values. + pub projects: HashMap, + // used for the 'RETURNING" keyword to indicate the returning items and schema // if the list is empty and the schema is None, the output schema will be a INT64 as the // affected row cnt @@ -135,6 +150,15 @@ impl Binder { let mut exprs = Vec::new(); let mut projects = HashMap::new(); + macro_rules! 
record { + ($id:expr, $project:expr) => { + let id_index = $id.as_input_ref().unwrap().index; + projects + .try_insert(id_index, $project) + .map_err(|_e| bind_error!("multiple assignments to the same column"))?; + }; + } + for Assignment { id, value } in assignments { let ids: Vec<_> = id .into_iter() @@ -142,79 +166,65 @@ impl Binder { .try_collect()?; match (ids.as_slice(), value) { - ([id], value) => { - let id_index = id.as_input_ref().unwrap().index; - - let expr = match value { - AssignmentValue::Expr(expr) => { - self.bind_expr(expr)?.cast_assign(id.return_type())? - } - AssignmentValue::Default => default_columns_from_catalog - .get(&id_index) - .cloned() - .unwrap_or_else(|| ExprImpl::literal_null(id.return_type())), - }; - - exprs.push(expr); - projects - .try_insert(id_index, UpdateProject::Expr(exprs.len() - 1)) - .expect("multiple assignments"); - } + // `SET col1 = DEFAULT`, `SET (col1, col2, ...) = DEFAULT` (ids, AssignmentValue::Default) => { for id in ids { let id_index = id.as_input_ref().unwrap().index; - let expr = default_columns_from_catalog .get(&id_index) .cloned() .unwrap_or_else(|| ExprImpl::literal_null(id.return_type())); exprs.push(expr); - projects - .try_insert(id_index, UpdateProject::Expr(exprs.len() - 1)) - .expect("multiple assignments"); + record!(id, UpdateProject::Simple(exprs.len() - 1)); } } + + // `SET col1 = expr` + ([id], AssignmentValue::Expr(expr)) => { + let expr = self.bind_expr(expr)?.cast_assign(id.return_type())?; + exprs.push(expr); + record!(id, UpdateProject::Simple(exprs.len() - 1)); + } + // `SET (col1, col2, ...) = (val1, val2, ...)` (ids, AssignmentValue::Expr(Expr::Row(values))) => { if ids.len() != values.len() { bail_bind_error!("number of columns does not match number of values"); } for (id, value) in ids.iter().zip_eq_fast(values) { - let id_index = id.as_input_ref().unwrap().index; - let expr = self.bind_expr(value)?.cast_assign(id.return_type())?; - exprs.push(expr); - projects - .try_insert(id_index, UpdateProject::Expr(exprs.len() - 1)) - .expect("multiple assignments"); + record!(id, UpdateProject::Simple(exprs.len() - 1)); } } + // `SET (col1, col2, ...) = (SELECT ...)` (ids, AssignmentValue::Expr(Expr::Subquery(subquery))) => { + let expr = self.bind_subquery_expr(*subquery, SubqueryKind::UpdateSet)?; + + if expr.return_type().as_struct().len() != ids.len() { + bail_bind_error!("number of columns does not match number of values"); + } + let target_type = DataType::new_unnamed_struct( ids.iter().map(|id| id.return_type()).collect(), ); - - let expr = self - .bind_subquery_expr(*subquery, SubqueryKind::UpdateSet)? - .cast_assign(target_type)?; + let expr = expr.cast_assign(target_type)?; exprs.push(expr); for (i, id) in ids.iter().enumerate() { - let id_index = id.as_input_ref().unwrap().index; - projects - .try_insert(id_index, UpdateProject::Composite(exprs.len() - 1, i)) - .expect("multiple assignments"); + record!(id, UpdateProject::Composite(exprs.len() - 1, i)); } } - (_ids, _expr) => { - bail_bind_error!("unsupported assignment"); + + (_ids, AssignmentValue::Expr(_expr)) => { + bail_bind_error!("source for a multiple-column UPDATE item must be a sub-SELECT or ROW() expression"); } } } + // Check whether updating these columns is allowed. 
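+        // Roughly: an update is executed as a delete/insert pair sharing the same
+        // key, so the PK itself must stay unchanged; generated columns (and any
+        // column that a generated PK column depends on) hold derived values that
+        // cannot be assigned directly.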
         for &id_index in projects.keys() {
             if (table.table_catalog.pk())
                 .iter()
diff --git a/src/frontend/src/expr/subquery.rs b/src/frontend/src/expr/subquery.rs
index 8ad682d8948ba..8460f73d5fbba 100644
--- a/src/frontend/src/expr/subquery.rs
+++ b/src/frontend/src/expr/subquery.rs
@@ -24,6 +24,8 @@ use crate::expr::{CorrelatedId, Depth};
 pub enum SubqueryKind {
     /// Returns a scalar value (single column single row).
     Scalar,
+    /// Returns a scalar struct value composed of multiple columns.
+    /// Used in `UPDATE SET (col1, col2) = (SELECT ...)`.
     UpdateSet,
     /// `EXISTS` | `NOT EXISTS` subquery (semi/anti-semi join). Returns a boolean.
     Existential,
diff --git a/src/frontend/src/planner/select.rs b/src/frontend/src/planner/select.rs
index 2c5d7169d1b84..ebed01351f7d1 100644
--- a/src/frontend/src/planner/select.rs
+++ b/src/frontend/src/planner/select.rs
@@ -374,6 +374,7 @@ impl Planner {
             SubqueryKind::UpdateSet => {
                 let plan = subroot.into_unordered_subplan();
 
+                // Compose all input columns into a struct with the `ROW` function.
                 let all_input_refs = plan
                     .schema()
                     .data_types()
                     .enumerate()
                     .map(|(i, data_type)| InputRef::new(i, data_type).into())
                     .collect::<Vec<_>>();
-
                 let call = FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type);
+
                 LogicalProject::create(plan, vec![call.into()])
             }
             SubqueryKind::Existential => {
diff --git a/src/frontend/src/planner/update.rs b/src/frontend/src/planner/update.rs
index 1e827c72764c9..fb07dfa9a11e7 100644
--- a/src/frontend/src/planner/update.rs
+++ b/src/frontend/src/planner/update.rs
@@ -36,8 +36,7 @@
 
         let returning = !update.returning_list.is_empty();
 
-        let schema_len = input.schema().len();
-
+        // Extend table scan with updated columns.
         let with_new: PlanRef = {
             let mut plan = input;
 
@@ -62,33 +61,29 @@
         let mut news = Vec::new();
 
         for (i, col) in update.table.table_catalog.columns().iter().enumerate() {
-            if col.is_generated() {
+            if !col.can_dml() {
                 continue;
             }
             let data_type = col.data_type();
 
             let old: ExprImpl = InputRef::new(i, data_type.clone()).into();
 
-            let new: ExprImpl = match update.projects.get(&i).copied() {
-                Some(UpdateProject::Expr(index)) => {
-                    InputRef::new(index + schema_len, data_type.clone()).into()
-                }
-                Some(UpdateProject::Composite(index, sub)) => FunctionCall::new_unchecked(
-                    Type::Field,
-                    vec![
-                        InputRef::new(
-                            index + schema_len,
-                            with_new.schema().data_types()[index + schema_len].clone(),
-                        )
-                        .into(),
-                        Literal::new(Some((sub as i32).to_scalar_value()), DataType::Int32).into(),
-                    ],
-                    data_type.clone(),
-                )
-                .into(),
-
-                None => old.clone(),
-            };
+            let new: ExprImpl =
+                match (update.projects.get(&i)).map(|p| p.offset(with_new.schema().len())) {
+                    Some(UpdateProject::Simple(j)) => InputRef::new(j, data_type.clone()).into(),
+                    Some(UpdateProject::Composite(j, field)) => FunctionCall::new_unchecked(
+                        Type::Field,
+                        vec![
+                            InputRef::new(j, with_new.schema().data_types()[j].clone()).into(),
+                            Literal::new(Some((field as i32).to_scalar_value()), DataType::Int32)
+                                .into(),
+                        ],
+                        data_type.clone(),
+                    )
+                    .into(),
+
+                    None => old.clone(),
+                };
 
             olds.push(old);
             news.push(new);

From 8c6b7aa0ca7aeddbf9df62a730174d0b126c529c Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Mon, 18 Nov 2024 15:11:54 +0800
Subject: [PATCH 05/11] fix index bug

Signed-off-by: Bugen Zhao
---
 src/frontend/src/binder/update.rs  |  2 +-
 src/frontend/src/planner/update.rs | 38 ++++++++++++++++--------------
 2 files changed, 21 insertions(+), 19 deletions(-)

diff --git a/src/frontend/src/binder/update.rs b/src/frontend/src/binder/update.rs
index 24982b1c76f0b..9fce6bf44bd98 100644
--- a/src/frontend/src/binder/update.rs
+++ b/src/frontend/src/binder/update.rs
@@ -43,7 +43,7 @@ impl UpdateProject {
     pub fn offset(self, i: usize) -> Self {
         match self {
             UpdateProject::Simple(index) => UpdateProject::Simple(index + i),
-            UpdateProject::Composite(index, j) => UpdateProject::Composite(index, j),
+            UpdateProject::Composite(index, j) => UpdateProject::Composite(index + i, j),
         }
     }
 }
diff --git a/src/frontend/src/planner/update.rs b/src/frontend/src/planner/update.rs
index fb07dfa9a11e7..2db18ac0e2924 100644
--- a/src/frontend/src/planner/update.rs
+++ b/src/frontend/src/planner/update.rs
@@ -27,14 +27,15 @@ use crate::optimizer::{PlanRef, PlanRoot};
 
 impl Planner {
     pub(super) fn plan_update(&mut self, update: BoundUpdate) -> Result<PlanRoot> {
+        let returning = !update.returning_list.is_empty();
+
         let scan = self.plan_base_table(&update.table)?;
         let input = if let Some(expr) = update.selection {
             self.plan_where(scan, expr)?
         } else {
             scan
         };
-
-        let returning = !update.returning_list.is_empty();
+        let old_schema_len = input.schema().len();
 
         // Extend table scan with updated columns.
         let with_new: PlanRef = {
@@ -50,6 +51,7 @@
 
             exprs.extend(update.exprs);
 
+            // Substitute subqueries into `LogicalApply`s.
            if exprs.iter().any(|e| e.has_subquery()) {
                 (plan, exprs) = self.substitute_subqueries(plan, exprs)?;
             }
@@ -61,6 +63,7 @@
         let mut news = Vec::new();
 
         for (i, col) in update.table.table_catalog.columns().iter().enumerate() {
+            // Skip generated columns and system columns.
             if !col.can_dml() {
                 continue;
             }
@@ -68,22 +71,21 @@
             let data_type = col.data_type();
 
             let old: ExprImpl = InputRef::new(i, data_type.clone()).into();
 
-            let new: ExprImpl =
-                match (update.projects.get(&i)).map(|p| p.offset(with_new.schema().len())) {
-                    Some(UpdateProject::Simple(j)) => InputRef::new(j, data_type.clone()).into(),
-                    Some(UpdateProject::Composite(j, field)) => FunctionCall::new_unchecked(
-                        Type::Field,
-                        vec![
-                            InputRef::new(j, with_new.schema().data_types()[j].clone()).into(),
-                            Literal::new(Some((field as i32).to_scalar_value()), DataType::Int32)
-                                .into(),
-                        ],
-                        data_type.clone(),
-                    )
-                    .into(),
-
-                    None => old.clone(),
-                };
+            let new: ExprImpl = match (update.projects.get(&i)).map(|p| p.offset(old_schema_len)) {
+                Some(UpdateProject::Simple(j)) => InputRef::new(j, data_type.clone()).into(),
+                Some(UpdateProject::Composite(j, field)) => FunctionCall::new_unchecked(
+                    Type::Field,
+                    vec![
+                        InputRef::new(j, with_new.schema().data_types()[j].clone()).into(), // struct
+                        Literal::new(Some((field as i32).to_scalar_value()), DataType::Int32)
+                            .into(),
+                    ],
+                    data_type.clone(),
+                )
+                .into(),
+
+                None => old.clone(),
+            };
 
             olds.push(old);
             news.push(new);

From 86c5a667ffb2dac6106c235185b9a3096ed2a2bd Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Mon, 18 Nov 2024 15:53:02 +0800
Subject: [PATCH 06/11] add tests

Signed-off-by: Bugen Zhao
---
 e2e_test/batch/basic/dml_update.slt.part | 108 +++++++++++++++++++++++
 1 file changed, 108 insertions(+)
 create mode 100644 e2e_test/batch/basic/dml_update.slt.part

diff --git a/e2e_test/batch/basic/dml_update.slt.part b/e2e_test/batch/basic/dml_update.slt.part
new file mode 100644
index 0000000000000..cff0f23955b7d
--- /dev/null
+++ b/e2e_test/batch/basic/dml_update.slt.part
@@ -0,0 +1,108 @@
+# Extension to `dml.slt.part` for testing advanced `UPDATE` statements.
+
+statement ok
+SET RW_IMPLICIT_FLUSH TO true;
+
+statement ok
+create table t (v1 int default 1919, v2 int default 810);
+
+statement ok
+insert into t values (114, 514);
+
+
+# Single assignment, to subquery.
+statement ok
+update t set v1 = (select 666);
+
+query II
+select * from t;
+----
+666 514
+
+# Single assignment, to runtime-cardinality subquery returning 1 row.
+statement ok
+update t set v1 = (select generate_series(888, 888));
+
+query II
+select * from t;
+----
+888 514
+
+# Single assignment, to runtime-cardinality subquery returning 0 rows (set to NULL).
+statement ok
+update t set v1 = (select generate_series(1, 0));
+
+query II
+select * from t;
+----
+NULL 514
+
+# Single assignment, to runtime-cardinality subquery returning multiple rows.
+statement error Scalar subquery produced more than one row
+update t set v1 = (select generate_series(1, 2));
+
+# Single assignment, to correlated subquery.
+statement ok
+update t set v1 = (select count(*) from t as source where source.v2 = t.v2);
+
+query II
+select * from t;
+----
+1 514
+
+# Single assignment, to subquery with mismatched column count.
+statement error must return only one column
+update t set v1 = (select 666, 888);
+
+
+# Multiple assignment clauses.
+statement ok
+update t set v1 = 1919, v2 = 810;
+
+query II
+select * from t;
+----
+1919 810
+
+# Multiple assignments to the same column.
+statement error multiple assignments to the same column
+update t set v1 = 1, v1 = 2;
+
+statement error multiple assignments to the same column
+update t set (v1, v1) = (1, 2);
+
+statement error multiple assignments to the same column
+update t set (v1, v2) = (1, 2), v2 = 2;
+
+# Multiple assignments, to subquery.
+statement ok
+update t set (v1, v2) = (select 666, 888);
+
+query II
+select * from t;
+----
+666 888
+
+# Multiple assignments, to subquery with cast.
+statement ok
+update t set (v1, v2) = (select 888.88, 999);
+
+query II
+select * from t;
+----
+889 999
+
+# Multiple assignments, to subquery with cast failure.
+# TODO: this currently shows `cannot cast type "record" to "record"` because we wrap the subquery result
+# into a struct, which is not quite clear.
+statement error cannot cast type
+update t set (v1, v2) = (select '888.88', 999);
+
+# Multiple assignments, to subquery with mismatched column count.
+statement error number of columns does not match number of values
+update t set (v1, v2) = (select 666);
+
+
+statement ok
+drop table t;

From 668413cbccb5495e393fe569b303b0591b258478 Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Mon, 18 Nov 2024 15:59:49 +0800
Subject: [PATCH 07/11] refine tests

Signed-off-by: Bugen Zhao
---
 e2e_test/batch/basic/{dml.slt.part => dml_basic.slt.part} | 0
 e2e_test/batch/basic/dml_update.slt.part                  | 6 +++++-
 2 files changed, 5 insertions(+), 1 deletion(-)
 rename e2e_test/batch/basic/{dml.slt.part => dml_basic.slt.part} (100%)

diff --git a/e2e_test/batch/basic/dml.slt.part b/e2e_test/batch/basic/dml_basic.slt.part
similarity index 100%
rename from e2e_test/batch/basic/dml.slt.part
rename to e2e_test/batch/basic/dml_basic.slt.part
diff --git a/e2e_test/batch/basic/dml_update.slt.part b/e2e_test/batch/basic/dml_update.slt.part
index cff0f23955b7d..580a74ba44368 100644
--- a/e2e_test/batch/basic/dml_update.slt.part
+++ b/e2e_test/batch/basic/dml_update.slt.part
@@ -1,4 +1,4 @@
-# Extension to `dml.slt.part` for testing advanced `UPDATE` statements.
+# Extension to `dml_basic.slt.part` for testing advanced `UPDATE` statements.
 
 statement ok
 SET RW_IMPLICIT_FLUSH TO true;
@@ -103,6 +103,10 @@ update t set (v1, v2) = (select '888.88', 999);
 statement error number of columns does not match number of values
 update t set (v1, v2) = (select 666);
 
+# Multiple assignments, to scalar expression.
+statement error source for a multiple-column UPDATE item must be a sub-SELECT or ROW\(\) expression
+update t set (v1, v2) = v1 + 1;
+
 
 statement ok
 drop table t;

From 1e5387149a189e5c99140736f7e03328d2fef23f Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Mon, 18 Nov 2024 16:07:58 +0800
Subject: [PATCH 08/11] disallow updating system col

Signed-off-by: Bugen Zhao
---
 e2e_test/batch/basic/dml_update.slt.part | 6 ++++++
 src/frontend/src/binder/update.rs        | 5 +++++
 2 files changed, 11 insertions(+)

diff --git a/e2e_test/batch/basic/dml_update.slt.part b/e2e_test/batch/basic/dml_update.slt.part
index 580a74ba44368..c5a699f160c6c 100644
--- a/e2e_test/batch/basic/dml_update.slt.part
+++ b/e2e_test/batch/basic/dml_update.slt.part
@@ -108,5 +108,11 @@ statement error source for a multiple-column UPDATE item must be a sub-SELECT or
 update t set (v1, v2) = v1 + 1;
 
+# Assignment to system columns.
+statement error update modifying column `_rw_timestamp` is unsupported
+update t set _rw_timestamp = _rw_timestamp + interval '1 second';
+
+
 statement ok
 drop table t;
diff --git a/src/frontend/src/binder/update.rs b/src/frontend/src/binder/update.rs
index 9fce6bf44bd98..f57ad1d197982 100644
--- a/src/frontend/src/binder/update.rs
+++ b/src/frontend/src/binder/update.rs
@@ -247,6 +247,11 @@ impl Binder {
                 )
                 .into());
             }
+
+            let col = &table.table_catalog.columns()[id_index];
+            if !col.can_dml() {
+                bail_bind_error!("update modifying column `{}` is unsupported", col.name());
+            }
         }
 
         let (returning_list, fields) = self.bind_returning_list(returning_items)?;

From e298d38fbb8088f39476867b1ab875b1177bd522 Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Mon, 18 Nov 2024 16:51:19 +0800
Subject: [PATCH 09/11] update planner test

Signed-off-by: Bugen Zhao
---
 .../testdata/output/index_selection.yaml      |  18 +--
 .../tests/testdata/output/update.yaml         | 110 ++++++++++--------
 2 files changed, 73 insertions(+), 55 deletions(-)

diff --git a/src/frontend/planner_test/tests/testdata/output/index_selection.yaml b/src/frontend/planner_test/tests/testdata/output/index_selection.yaml
index a6240c69f395f..349c5f7d89012 100644
--- a/src/frontend/planner_test/tests/testdata/output/index_selection.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/index_selection.yaml
@@ -213,16 +213,18 @@
     update t1 set c = 3 where a = 1 and b = 2;
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchUpdate { table: t1, exprs: [$0, $1, 3:Int64, $3] }
+    └─BatchUpdate { table: t1, exprs: [$0, $1, $5, $3] }
       └─BatchExchange { order: [], dist: Single }
-        └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id, t1._rw_timestamp], lookup table: t1 }
-          └─BatchExchange { order: [], dist: UpstreamHashShard(idx2.t1._row_id) }
-            └─BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2)) AND idx2.a = Int32(1)], distribution: SomeShard }
+        └─BatchProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, t1._rw_timestamp, 3:Int64] }
+          └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id, t1._rw_timestamp], lookup table: t1 }
+            └─BatchExchange { order: [], dist: UpstreamHashShard(idx2.t1._row_id) }
+              └─BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2)) AND idx2.a = Int32(1)], distribution: SomeShard }
   batch_local_plan: |-
     BatchUpdate { table: t1, exprs: [$0, $1, $5, $3] }
     └─BatchProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, t1._rw_timestamp, 3:Int64] }
       └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id, t1._rw_timestamp], lookup table: t1 }
        └─BatchExchange { order: [], dist: Single }
          └─BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2)) AND idx2.a = Int32(1)], distribution: SomeShard }
 - sql: |
     create table t1 (a int, b numeric, c bigint, p int);
     create materialized view v as select count(*) as cnt, p from t1 group by p;
diff --git a/src/frontend/planner_test/tests/testdata/output/update.yaml b/src/frontend/planner_test/tests/testdata/output/update.yaml
index 19d6673d77f9a..eee33aef821fa 100644
--- a/src/frontend/planner_test/tests/testdata/output/update.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/update.yaml
@@ -4,9 +4,10 @@
     update t set v1 = 0;
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchUpdate { table: t, exprs: [0:Int32, $1, $2] }
+    └─BatchUpdate { table: t, exprs: [$4, $1, $2] }
       └─BatchExchange { order: [], dist: Single }
-        └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+        └─BatchProject { exprs: [t.v1, t.v2, t._row_id, t._rw_timestamp, 0:Int32] }
+          └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
 - sql: |
     create table t (v1 int, v2 int);
     update t set v1 = true;
@@ -16,72 +17,81 @@
     update t set v1 = v2 + 1;
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchUpdate { table: t, exprs: [($1 + 1:Int32), $1, $2] }
+    └─BatchUpdate { table: t, exprs: [$4, $1, $2] }
       └─BatchExchange { order: [], dist: Single }
-        └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+        └─BatchProject { exprs: [t.v1, t.v2, t._row_id, t._rw_timestamp, (t.v2 + 1:Int32) as $expr1] }
+          └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
 - sql: |
     create table t (v1 int, v2 real);
     update t set v1 = v2;
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchUpdate { table: t, exprs: [$1::Int32, $1, $2] }
+    └─BatchUpdate { table: t, exprs: [$4, $1, $2] }
       └─BatchExchange { order: [], dist: Single }
-        └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+        └─BatchProject { exprs: [t.v1, t.v2, t._row_id, t._rw_timestamp, t.v2::Int32 as $expr1] }
+          └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
 - sql: |
     create table t (v1 int, v2 real);
     update t set v1 = DEFAULT;
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchUpdate { table: t, exprs: [null:Int32, $1, $2] }
+    └─BatchUpdate { table: t, exprs: [$4, $1, $2] }
       └─BatchExchange { order: [], dist: Single }
+        └─BatchProject { exprs: [t.v1, t.v2, t._row_id, t._rw_timestamp, null:Int32] }
+          └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
 - sql: |
     create table t (v1 int, v2 int);
     update t set v1 = v2 + 1 where v2 > 0;
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchUpdate { table: t, exprs: [($1 + 1:Int32), $1, $2] }
+    └─BatchUpdate { table: t, exprs: [$4, $1, $2] }
       └─BatchExchange { order: [], dist: Single }
-        └─BatchFilter { predicate: (t.v2 > 0:Int32) }
-          └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+        └─BatchProject { exprs: [t.v1, t.v2, t._row_id, t._rw_timestamp, (t.v2 + 1:Int32) as $expr1] }
+          └─BatchFilter { predicate: (t.v2 > 0:Int32) }
+            └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
 - sql: |
     create table t (v1 int, v2 int);
     update t set (v1, v2) = (v2 + 1, v1 - 1) where v1 != v2;
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchUpdate { table: t, exprs: [($1 + 1:Int32), ($0 - 1:Int32), $2] }
+    └─BatchUpdate { table: t, exprs: [$4, $5, $2] }
       └─BatchExchange { order: [], dist: Single }
-        └─BatchFilter { predicate: (t.v1 <> t.v2) }
-          └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+        └─BatchProject { exprs: [t.v1, t.v2, t._row_id, t._rw_timestamp, (t.v2 + 1:Int32) as $expr1, (t.v1 - 1:Int32) as $expr2] }
+          └─BatchFilter { predicate: (t.v1 <> t.v2) }
+            └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
 - sql: |
     create table t (v1 int, v2 int);
     update t set (v1, v2) = (v2 + 1, v1 - 1) where v1 != v2 returning *, v2+1, v1-1;
   logical_plan: |-
-    LogicalProject { exprs: [t.v1, t.v2, (t.v2 + 1:Int32) as $expr1, (t.v1 - 1:Int32) as $expr2] }
-    └─LogicalUpdate { table: t, exprs: [($1 + 1:Int32), ($0 - 1:Int32), $2], returning: true }
-      └─LogicalFilter { predicate: (t.v1 <> t.v2) }
-        └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp] }
+    LogicalProject { exprs: [, , ( + 1:Int32) as $expr3, ( - 1:Int32) as $expr4] }
+    └─LogicalUpdate { table: t, exprs: [$4, $5, $2], returning: true }
+      └─LogicalProject { exprs: [t.v1, t.v2, t._row_id, t._rw_timestamp, (t.v2 + 1:Int32) as $expr1, (t.v1 - 1:Int32) as $expr2] }
+        └─LogicalFilter { predicate: (t.v1 <> t.v2) }
+          └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp] }
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchProject { exprs: [t.v1, t.v2, (t.v2 + 1:Int32) as $expr1, (t.v1 - 1:Int32) as $expr2] }
-      └─BatchUpdate { table: t, exprs: [($1 + 1:Int32), ($0 - 1:Int32), $2], returning: true }
+    └─BatchProject { exprs: [, , ( + 1:Int32) as $expr3, ( - 1:Int32) as $expr4] }
+      └─BatchUpdate { table: t, exprs: [$4, $5, $2], returning: true }
         └─BatchExchange { order: [], dist: Single }
-          └─BatchFilter { predicate: (t.v1 <> t.v2) }
-            └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+          └─BatchProject { exprs: [t.v1, t.v2, t._row_id, t._rw_timestamp, (t.v2 + 1:Int32) as $expr1, (t.v1 - 1:Int32) as $expr2] }
+            └─BatchFilter { predicate: (t.v1 <> t.v2) }
+              └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
 - name: update with returning statement, should keep `Update`
   sql: |
     create table t (v int);
     update t set v = 114 returning 514;
   logical_plan: |-
     LogicalProject { exprs: [514:Int32] }
-    └─LogicalUpdate { table: t, exprs: [114:Int32, $1], returning: true }
-      └─LogicalScan { table: t, columns: [t.v, t._row_id, t._rw_timestamp] }
+    └─LogicalUpdate { table: t, exprs: [$3, $1], returning: true }
+      └─LogicalProject { exprs: [t.v, t._row_id, t._rw_timestamp, 114:Int32] }
+        └─LogicalScan { table: t, columns: [t.v, t._row_id, t._rw_timestamp] }
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
     └─BatchProject { exprs: [514:Int32] }
-      └─BatchUpdate { table: t, exprs: [114:Int32, $1], returning: true }
+      └─BatchUpdate { table: t, exprs: [$3, $1], returning: true }
         └─BatchExchange { order: [], dist: Single }
-          └─BatchScan { table: t, columns: [t.v, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+          └─BatchProject { exprs: [t.v, t._row_id, t._rw_timestamp, 114:Int32] }
+            └─BatchScan { table: t, columns: [t.v, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
 - sql: |
     create table t (v1 int primary key, v2 int);
     update t set (v2, v1) = (v1, v2);
@@ -90,22 +100,25 @@
     create table t (v1 int default 1+1, v2 int);
     update t set v1 = default;
   logical_plan: |-
-    LogicalUpdate { table: t, exprs: [(1:Int32 + 1:Int32), $1, $2] }
-    └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp] }
+    LogicalUpdate { table: t, exprs: [$4, $1, $2] }
+    └─LogicalProject { exprs: [t.v1, t.v2, t._row_id, t._rw_timestamp, (1:Int32 + 1:Int32) as $expr1] }
+      └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp] }
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchUpdate { table: t, exprs: [2:Int32, $1, $2] }
+    └─BatchUpdate { table: t, exprs: [$4, $1, $2] }
       └─BatchExchange { order: [], dist: Single }
-        └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+        └─BatchProject { exprs: [t.v1, t.v2, t._row_id, t._rw_timestamp, 2:Int32] }
+          └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
 - name: update table with generated columns
   sql: |
     create table t(v1 int as v2-1, v2 int, v3 int as v2+1);
     update t set v2 = 3;
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchUpdate { table: t, exprs: [3:Int32, $3] }
+    └─BatchUpdate { table: t, exprs: [$5, $3] }
      └─BatchExchange { order: [], dist: Single }
+        └─BatchProject { exprs: [t.v1, t.v2, t.v3, t._row_id, t._rw_timestamp, 3:Int32] }
          └─BatchScan { table: t, columns: [t.v1, t.v2, t.v3, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
 - name: update generated column
   sql: |
     create table t(v1 int as v2-1, v2 int, v3 int as v2+1);
@@ -121,20 +134,22 @@
     create table t (a int, b int);
     update t set a = 777 where b not in (select a from t);
   logical_plan: |-
-    LogicalUpdate { table: t, exprs: [777:Int32, $1, $2] }
-    └─LogicalApply { type: LeftAnti, on: (t.b = t.a), correlated_id: 1 }
-      ├─LogicalScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp] }
-      └─LogicalProject { exprs: [t.a] }
-        └─LogicalScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp] }
+    LogicalUpdate { table: t, exprs: [$4, $1, $2] }
+    └─LogicalProject { exprs: [t.a, t.b, t._row_id, t._rw_timestamp, 777:Int32] }
+      └─LogicalApply { type: LeftAnti, on: (t.b = t.a), correlated_id: 1 }
+        ├─LogicalScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp] }
+        └─LogicalProject { exprs: [t.a] }
+          └─LogicalScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp] }
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchUpdate { table: t, exprs: [777:Int32, $1, $2] }
+    └─BatchUpdate { table: t, exprs: [$4, $1, $2] }
       └─BatchExchange { order: [], dist: Single }
-        └─BatchHashJoin { type: LeftAnti, predicate: t.b = t.a, output: all }
-          ├─BatchExchange { order: [], dist: HashShard(t.b) }
-          │ └─BatchScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
-          └─BatchExchange { order: [], dist: HashShard(t.a) }
-            └─BatchScan { table: t, columns: [t.a], distribution: SomeShard }
+        └─BatchProject { exprs: [t.a, t.b, t._row_id, t._rw_timestamp, 777:Int32] }
+          └─BatchHashJoin { type: LeftAnti, predicate: t.b = t.a, output: all }
+            ├─BatchExchange { order: [], dist: HashShard(t.b) }
+            │ └─BatchScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+            └─BatchExchange { order: [], dist: HashShard(t.a) }
+              └─BatchScan { table: t, columns: [t.a], distribution: SomeShard }
 - name: delete subquery
   sql: |
     create table t (a int, b int);
@@ -163,12 +178,13 @@
   batch_distributed_plan: |-
     BatchSimpleAgg { aggs: [sum()] }
     └─BatchExchange { order: [], dist: Single }
-      └─BatchUpdate { table: t, exprs: [($0 + 1:Int32), $1, $2] }
-        └─BatchExchange { order: [], dist: HashShard(t.a, t.b, t._row_id, t._rw_timestamp) }
-          └─BatchScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+      └─BatchUpdate { table: t, exprs: [$4, $1, $2] }
+        └─BatchExchange { order: [], dist: HashShard(t.a, t.b, t._row_id, t._rw_timestamp, $expr1) }
+          └─BatchProject { exprs: [t.a, t.b, t._row_id, t._rw_timestamp, (t.a + 1:Int32) as $expr1] }
+            └─BatchScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
 - name: update table with subquery in the set clause
   sql: |
     create table t1 (v1 int primary key, v2 int);
     create table t2 (v1 int primary key, v2 int);
     update t1 set v1 = (select v1 from t2 where t1.v2 = t2.v2);
-  binder_error: 'Bind error: subquery on the right side of assignment is unsupported'
+  binder_error: 'Bind error: update modifying the PK column is unsupported'

From ae69031e85310645bc4ef4c460b92edb3596051b Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Mon, 18 Nov 2024 16:59:58 +0800
Subject: [PATCH 10/11] add more planner tests

Signed-off-by: Bugen Zhao
---
 .../tests/testdata/input/update.yaml          | 29 +++++++--
 .../tests/testdata/output/update.yaml         | 64 +++++++++++++++++--
 2 files changed, 81 insertions(+), 12 deletions(-)

diff --git a/src/frontend/planner_test/tests/testdata/input/update.yaml b/src/frontend/planner_test/tests/testdata/input/update.yaml
index 65c0f47eb4cd4..744735af843de 100644
--- a/src/frontend/planner_test/tests/testdata/input/update.yaml
+++ b/src/frontend/planner_test/tests/testdata/input/update.yaml
@@ -76,7 +76,7 @@
     update t set v2 = 3;
   expected_outputs:
   - binder_error
-- name: update subquery
+- name: update subquery selection
   sql: |
     create table t (a int, b int);
    update t set a = 777 where b not in (select a from t);
@@ -98,10 +98,27 @@
     update t set a = a + 1;
   expected_outputs:
   - batch_distributed_plan
-- name: update table with subquery in the set clause
+- name: update table to subquery
   sql: |
-    create table t1 (v1 int primary key, v2 int);
-    create table t2 (v1 int primary key, v2 int);
-    update t1 set v1 = (select v1 from t2 where t1.v2 = t2.v2);
+    create table t (v1 int, v2 int);
+    update t set v1 = (select 666);
   expected_outputs:
-  - binder_error
+  - batch_plan
+- name: update table to subquery with runtime cardinality
+  sql: |
+    create table t (v1 int, v2 int);
+    update t set v1 = (select generate_series(888, 888));
+  expected_outputs:
+  - batch_plan
+- name: update table to correlated subquery
+  sql: |
+    create table t (v1 int, v2 int);
+    update t set v1 = (select count(*) from t as source where source.v2 = t.v2);
+  expected_outputs:
+  - batch_plan
+- name: update table to subquery with multiple assignments
+  sql: |
+    create table t (v1 int, v2 int);
+    update t set (v1, v2) = (select 666.66, 777);
+  expected_outputs:
+  - batch_plan
diff --git a/src/frontend/planner_test/tests/testdata/output/update.yaml b/src/frontend/planner_test/tests/testdata/output/update.yaml
index eee33aef821fa..4a12b492660ad 100644
--- a/src/frontend/planner_test/tests/testdata/output/update.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/update.yaml
@@ -129,7 +129,7 @@
     create table t(v1 int as v2-1, v2 int, v3 int as v2+1, primary key (v3));
     update t set v2 = 3;
   binder_error: 'Bind error: update modifying the column referenced by generated columns that are part of the primary key is not allowed'
-- name: update subquery
+- name: update subquery selection
   sql: |
     create table t (a int, b int);
     update t set a = 777 where b not in (select a from t);
@@ -182,9 +182,61 @@
     └─BatchExchange { order: [], dist: HashShard(t.a, t.b, t._row_id, t._rw_timestamp, $expr1) }
       └─BatchProject { exprs: [t.a, t.b, t._row_id, t._rw_timestamp, (t.a + 1:Int32) as $expr1] }
        └─BatchScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
-- name: update table with subquery in the set clause
+- name: update table to subquery
   sql: |
-    create table t1 (v1 int primary key, v2 int);
-    create table t2 (v1 int primary key, v2 int);
-    update t1 set v1 = (select v1 from t2 where t1.v2 = t2.v2);
-  binder_error: 'Bind error: update modifying the PK column is unsupported'
+    create table t (v1 int, v2 int);
+    update t set v1 = (select 666);
+  batch_plan: |-
+    BatchExchange { order: [], dist: Single }
+    └─BatchUpdate { table: t, exprs: [$4, $1, $2] }
+      └─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
+        ├─BatchExchange { order: [], dist: Single }
+        │ └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+        └─BatchValues { rows: [[666:Int32]] }
+- name: update table to subquery with runtime cardinality
+  sql: |
+    create table t (v1 int, v2 int);
+    update t set v1 = (select generate_series(888, 888));
+  batch_plan: |-
+    BatchExchange { order: [], dist: Single }
+    └─BatchUpdate { table: t, exprs: [$4, $1, $2] }
+      └─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
+        ├─BatchExchange { order: [], dist: Single }
+        │ └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+        └─BatchMaxOneRow
+          └─BatchProject { exprs: [GenerateSeries(888:Int32, 888:Int32)] }
+            └─BatchProjectSet { select_list: [GenerateSeries(888:Int32, 888:Int32)] }
+              └─BatchValues { rows: [[]] }
+- name: update table to correlated subquery
+  sql: |
+    create table t (v1 int, v2 int);
+    update t set v1 = (select count(*) from t as source where source.v2 = t.v2);
+  batch_plan: |-
+    BatchExchange { order: [], dist: Single }
+    └─BatchUpdate { table: t, exprs: [$4, $1, $2] }
+      └─BatchExchange { order: [], dist: Single }
+        └─BatchProject { exprs: [t.v1, t.v2, t._row_id, t._rw_timestamp, count(1:Int32)::Int32 as $expr1] }
+          └─BatchHashJoin { type: LeftOuter, predicate: t.v2 IS NOT DISTINCT FROM t.v2, output: [t.v1, t.v2, t._row_id, t._rw_timestamp, count(1:Int32)] }
+            ├─BatchExchange { order: [], dist: HashShard(t.v2) }
+            │ └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+            └─BatchHashAgg { group_key: [t.v2], aggs: [count(1:Int32)] }
+              └─BatchHashJoin { type: LeftOuter, predicate: t.v2 IS NOT DISTINCT FROM t.v2, output: [t.v2, 1:Int32] }
+                ├─BatchHashAgg { group_key: [t.v2], aggs: [] }
+                │ └─BatchExchange { order: [], dist: HashShard(t.v2) }
+                │   └─BatchScan { table: t, columns: [t.v2], distribution: SomeShard }
+                └─BatchExchange { order: [], dist: HashShard(t.v2) }
+                  └─BatchProject { exprs: [t.v2, 1:Int32] }
+                    └─BatchFilter { predicate: IsNotNull(t.v2) }
+                      └─BatchScan { table: t, columns: [t.v2], distribution: SomeShard }
+- name: update table to subquery with multiple assignments
+  sql: |
+    create table t (v1 int, v2 int);
+    update t set (v1, v2) = (select 666.66, 777);
+  batch_plan: |-
+    BatchExchange { order: [], dist: Single }
+    └─BatchUpdate { table: t, exprs: [Field($4, 0:Int32), Field($4, 1:Int32), $2] }
+      └─BatchProject { exprs: [t.v1, t.v2, t._row_id, t._rw_timestamp, $expr10011::Struct(StructType { field_names: [], field_types: [Int32, Int32] }) as $expr1] }
+        └─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
+          ├─BatchExchange { order: [], dist: Single }
+          │ └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) }
+          └─BatchValues { rows: [['(666.66,777)':Struct(StructType { field_names: [], field_types: [Decimal, Int32] })]] }

From d9a0bb8a66266f2c6ee0d67b58a9dff92236db09 Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Wed, 20 Nov 2024 15:44:55 +0800
Subject: [PATCH 11/11] add corner case to test case

Signed-off-by: Bugen Zhao
---
 e2e_test/batch/basic/dml_update.slt.part | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/e2e_test/batch/basic/dml_update.slt.part b/e2e_test/batch/basic/dml_update.slt.part
index c5a699f160c6c..fcc3bbdfce9a2 100644
--- a/e2e_test/batch/basic/dml_update.slt.part
+++ b/e2e_test/batch/basic/dml_update.slt.part
@@ -55,7 +55,6 @@ statement error must return only one column
 update t set v1 = (select 666, 888);
 
-
 # Multiple assignment clauses.
 statement ok
 update t set v1 = 1919, v2 = 810;
@@ -113,6 +112,21 @@ update t set _rw_timestamp = _rw_timestamp + interval '1 second';
 
 
+# https://github.com/risingwavelabs/risingwave/pull/19402#pullrequestreview-2444427475
+# https://github.com/risingwavelabs/risingwave/pull/19452
+statement ok
+create table y (v1 int, v2 int);
+
+statement ok
+insert into y values (11, 11), (22, 22);
+
+statement error Scalar subquery produced more than one row
+update t set (v1, v2) = (select y.v1, y.v2 from y);
+
+statement ok
+drop table y;
+
+# Cleanup.
 statement ok
 drop table t;
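
A minimal standalone sketch of the index-rebasing rule that PATCH 05 fixes, using toy stand-ins for the frontend types (nothing below is the actual crate code): after the planner appends the binder's new-value expressions behind the scanned columns, every recorded index must be offset by the old schema length — including the struct-valued expression inside `Composite`, where only the field position within the struct stays put.

// toy_offset.rs — illustrative only; this `UpdateProject` is a local stand-in,
// not the one in `src/frontend/src/binder/update.rs`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum UpdateProject {
    /// New value comes directly from the expression at this index.
    Simple(usize),
    /// New value is field `j` of the struct-valued expression at this index.
    Composite(usize, usize),
}

impl UpdateProject {
    fn offset(self, i: usize) -> Self {
        match self {
            UpdateProject::Simple(index) => UpdateProject::Simple(index + i),
            // The PATCH 05 fix: the struct expression also lives in the appended
            // section, so its index is offset too; only the field position `j`
            // inside the struct is left unchanged.
            UpdateProject::Composite(index, j) => UpdateProject::Composite(index + i, j),
        }
    }
}

fn main() {
    // Suppose the scan has 4 columns (v1, v2, _row_id, _rw_timestamp) and the
    // binder numbered its `exprs` list from 0.
    let old_schema_len = 4;

    // `SET v1 = expr` -> the appended expression becomes column 4 after the Project.
    assert_eq!(
        UpdateProject::Simple(0).offset(old_schema_len),
        UpdateProject::Simple(4)
    );

    // `SET (v1, v2) = (SELECT ...)` -> the struct is appended as column 4;
    // v2 is extracted later as field 1 via a `Field` call.
    assert_eq!(
        UpdateProject::Composite(0, 1).offset(old_schema_len),
        UpdateProject::Composite(4, 1)
    );
}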