refine docs
Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao committed Nov 18, 2024
1 parent d387ca7 commit 104e11d
Showing 6 changed files with 81 additions and 71 deletions.
2 changes: 2 additions & 0 deletions proto/batch_plan.proto
@@ -174,7 +174,9 @@ message UpdateNode {
uint32 table_id = 1;
// Version of the table.
uint64 table_version_id = 2;

Check failure on line 176 in proto/batch_plan.proto (GitHub Actions / Check breaking changes in Protobuf files):
• Field "2" with name "table_version_id" on message "UpdateNode" changed option "json_name" from "exprs" to "tableVersionId".
• Field "2" with name "table_version_id" on message "UpdateNode" changed cardinality from "repeated" to "optional with implicit presence".
• Field "2" with name "table_version_id" on message "UpdateNode" changed type from "message" to "uint64". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
• Field "2" on message "UpdateNode" changed name from "exprs" to "table_version_id".
+ // Expressions to generate `U-` records.
repeated expr.ExprNode old_exprs = 3;

Check failure on line 178 in proto/batch_plan.proto (GitHub Actions / Check breaking changes in Protobuf files):
• Field "3" with name "old_exprs" on message "UpdateNode" changed option "json_name" from "returning" to "oldExprs".
• Field "3" with name "old_exprs" on message "UpdateNode" changed cardinality from "optional with implicit presence" to "repeated".
• Field "3" with name "old_exprs" on message "UpdateNode" changed type from "bool" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
• Field "3" on message "UpdateNode" changed name from "returning" to "old_exprs".
+ // Expressions to generate `U+` records.
repeated expr.ExprNode new_exprs = 4;

Check failure on line 180 in proto/batch_plan.proto (GitHub Actions / Check breaking changes in Protobuf files):
• Field "4" with name "new_exprs" on message "UpdateNode" changed option "json_name" from "tableVersionId" to "newExprs".
• Field "4" with name "new_exprs" on message "UpdateNode" changed cardinality from "optional with implicit presence" to "repeated".
bool returning = 5;

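Note on the two annotated fields: in RisingWave's changelog model an UPDATE is emitted as a pair of records, where `old_exprs` produce the `U-` (update-delete) row and `new_exprs` produce the `U+` (update-insert) row. A minimal illustrative sketch in Rust (not from the repository; plain tuples stand in for real rows and expressions):

fn main() {
    // For `UPDATE t SET b = b + 1` on a row (1, 10):
    let row = (1, 10);

    // `old_exprs` evaluate to the original values -> the `U-` record.
    let u_minus = row;
    // `new_exprs` evaluate to the updated values -> the `U+` record.
    let u_plus = (row.0, row.1 + 1);

    println!("U- {:?} / U+ {:?}", u_minus, u_plus);
}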
6 changes: 3 additions & 3 deletions src/batch/src/executor/update.rs
@@ -109,7 +109,7 @@ impl Executor for UpdateExecutor {

impl UpdateExecutor {
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
- async fn do_execute(mut self: Box<Self>) {
+ async fn do_execute(self: Box<Self>) {
let table_dml_handle = self
.dml_manager
.table_dml_handle(self.table_id, self.table_version_id)?;
@@ -161,7 +161,7 @@ impl UpdateExecutor {

let old_data_chunk = {
let mut columns = Vec::with_capacity(self.old_exprs.len());
- for expr in &mut self.old_exprs {
+ for expr in &self.old_exprs {
let column = expr.eval(&input).await?;
columns.push(column);
}
@@ -171,7 +171,7 @@

let updated_data_chunk = {
let mut columns = Vec::with_capacity(self.new_exprs.len());
- for expr in &mut self.new_exprs {
+ for expr in &self.new_exprs {
let column = expr.eval(&input).await?;
columns.push(column);
}
98 changes: 54 additions & 44 deletions src/frontend/src/binder/update.rs
@@ -24,17 +24,30 @@ use risingwave_sqlparser::ast::{Assignment, AssignmentValue, Expr, ObjectName, S
use super::statement::RewriteExprsRecursive;
use super::{Binder, BoundBaseTable};
use crate::catalog::TableId;
- use crate::error::{bail_bind_error, ErrorCode, Result, RwError};
+ use crate::error::{bail_bind_error, bind_error, ErrorCode, Result, RwError};
use crate::expr::{Expr as _, ExprImpl, SubqueryKind};
use crate::user::UserId;
use crate::TableCatalog;

/// Project into `exprs` in `BoundUpdate` to get the new values for updating.
#[derive(Debug, Clone, Copy)]
pub enum UpdateProject {
- Expr(usize),
+ /// Use the expression at the given index in `exprs`.
+ Simple(usize),
+ /// Use the `i`-th field of the expression (returning a struct) at the given index in `exprs`.
+ Composite(usize, usize),
}

+ impl UpdateProject {
+ /// Offset the index by `i`.
+ pub fn offset(self, i: usize) -> Self {
+ match self {
+ UpdateProject::Simple(index) => UpdateProject::Simple(index + i),
+ UpdateProject::Composite(index, j) => UpdateProject::Composite(index + i, j),
+ }
+ }
+ }

#[derive(Debug, Clone)]
pub struct BoundUpdate {
/// Id of the table to perform updating.
Expand All @@ -54,12 +67,14 @@ pub struct BoundUpdate {

pub selection: Option<ExprImpl>,

- pub projects: HashMap<usize, UpdateProject>,
-
- /// Expression used to project to the updated row. The assigned columns will use the new
- /// expression, and the other columns will be simply `InputRef`.
+ /// Expression used to evaluate the new values for the columns.
pub exprs: Vec<ExprImpl>,

+ /// Mapping from the index of the column to be updated, to the index of the expression in `exprs`.
+ ///
+ /// By constructing two `Project` nodes with `exprs` and `projects`, we can get the new values.
+ pub projects: HashMap<usize, UpdateProject>,

// used for the 'RETURNING" keyword to indicate the returning items and schema
// if the list is empty and the schema is None, the output schema will be a INT64 as the
// affected row cnt
@@ -135,86 +150,81 @@ impl Binder {
let mut exprs = Vec::new();
let mut projects = HashMap::new();

+ macro_rules! record {
+ ($id:expr, $project:expr) => {
+ let id_index = $id.as_input_ref().unwrap().index;
+ projects
+ .try_insert(id_index, $project)
+ .map_err(|_e| bind_error!("multiple assignments to the same column"))?;
+ };
+ }

for Assignment { id, value } in assignments {
let ids: Vec<_> = id
.into_iter()
.map(|id| self.bind_expr(Expr::Identifier(id)))
.try_collect()?;

match (ids.as_slice(), value) {
- ([id], value) => {
- let id_index = id.as_input_ref().unwrap().index;
-
- let expr = match value {
- AssignmentValue::Expr(expr) => {
- self.bind_expr(expr)?.cast_assign(id.return_type())?
- }
- AssignmentValue::Default => default_columns_from_catalog
- .get(&id_index)
- .cloned()
- .unwrap_or_else(|| ExprImpl::literal_null(id.return_type())),
- };
-
- exprs.push(expr);
- projects
- .try_insert(id_index, UpdateProject::Expr(exprs.len() - 1))
- .expect("multiple assignments");
- }
+ // `SET col1 = DEFAULT`, `SET (col1, col2, ...) = DEFAULT`
(ids, AssignmentValue::Default) => {
for id in ids {
let id_index = id.as_input_ref().unwrap().index;

let expr = default_columns_from_catalog
.get(&id_index)
.cloned()
.unwrap_or_else(|| ExprImpl::literal_null(id.return_type()));

exprs.push(expr);
- projects
- .try_insert(id_index, UpdateProject::Expr(exprs.len() - 1))
- .expect("multiple assignments");
+ record!(id, UpdateProject::Simple(exprs.len() - 1));
}
}

+ // `SET col1 = expr`
+ ([id], AssignmentValue::Expr(expr)) => {
+ let expr = self.bind_expr(expr)?.cast_assign(id.return_type())?;
+ exprs.push(expr);
+ record!(id, UpdateProject::Simple(exprs.len() - 1));
+ }
+ // `SET (col1, col2, ...) = (val1, val2, ...)`
(ids, AssignmentValue::Expr(Expr::Row(values))) => {
if ids.len() != values.len() {
bail_bind_error!("number of columns does not match number of values");
}

for (id, value) in ids.iter().zip_eq_fast(values) {
- let id_index = id.as_input_ref().unwrap().index;
-
let expr = self.bind_expr(value)?.cast_assign(id.return_type())?;

exprs.push(expr);
- projects
- .try_insert(id_index, UpdateProject::Expr(exprs.len() - 1))
- .expect("multiple assignments");
+ record!(id, UpdateProject::Simple(exprs.len() - 1));
}
}
+ // `SET (col1, col2, ...) = (SELECT ...)`
(ids, AssignmentValue::Expr(Expr::Subquery(subquery))) => {
+ let expr = self.bind_subquery_expr(*subquery, SubqueryKind::UpdateSet)?;

+ if expr.return_type().as_struct().len() != ids.len() {
+ bail_bind_error!("number of columns does not match number of values");
+ }

let target_type = DataType::new_unnamed_struct(
ids.iter().map(|id| id.return_type()).collect(),
);

- let expr = self
- .bind_subquery_expr(*subquery, SubqueryKind::UpdateSet)?
- .cast_assign(target_type)?;
+ let expr = expr.cast_assign(target_type)?;

exprs.push(expr);

for (i, id) in ids.iter().enumerate() {
- let id_index = id.as_input_ref().unwrap().index;
- projects
- .try_insert(id_index, UpdateProject::Composite(exprs.len() - 1, i))
- .expect("multiple assignments");
+ record!(id, UpdateProject::Composite(exprs.len() - 1, i));
}
}
- (_ids, _expr) => {
- bail_bind_error!("unsupported assignment");
+
+ (_ids, AssignmentValue::Expr(_expr)) => {
+ bail_bind_error!("source for a multiple-column UPDATE item must be a sub-SELECT or ROW() expression");
}
}
}

+ // Check whether updating these columns is allowed.
for &id_index in projects.keys() {
if (table.table_catalog.pk())
.iter()
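To make the binder's `exprs`/`projects` pair concrete, here is a small self-contained sketch (not part of the diff; the table `t(a, b, c)`, the column indices, and the string placeholders for bound expressions are all made up for illustration) of what binding `UPDATE t SET a = a + 1, (b, c) = (SELECT 2, 3)` would produce:

use std::collections::HashMap;

// Mirrors the `UpdateProject` enum introduced in the diff.
#[derive(Debug, Clone, Copy)]
enum UpdateProject {
    Simple(usize),
    Composite(usize, usize),
}

fn main() {
    // Columns: a = 0, b = 1, c = 2. Strings stand in for bound `ExprImpl`s.
    let exprs = vec!["a + 1", "(SELECT 2, 3)"]; // the subquery yields one struct value

    let mut projects: HashMap<usize, UpdateProject> = HashMap::new();
    projects.insert(0, UpdateProject::Simple(0)); // a <- exprs[0]
    projects.insert(1, UpdateProject::Composite(1, 0)); // b <- field 0 of exprs[1]
    projects.insert(2, UpdateProject::Composite(1, 1)); // c <- field 1 of exprs[1]

    // A second assignment to the same column would fail `try_insert`, which the
    // binder reports as "multiple assignments to the same column".
    println!("{:?}", exprs);
    println!("{:?}", projects);
}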
2 changes: 2 additions & 0 deletions src/frontend/src/expr/subquery.rs
@@ -24,6 +24,8 @@ use crate::expr::{CorrelatedId, Depth};
pub enum SubqueryKind {
/// Returns a scalar value (single column single row).
Scalar,
+ /// Returns a scalar struct value composed of multiple columns.
+ /// Used in `UPDATE SET (col1, col2) = (SELECT ...)`.
UpdateSet,
/// `EXISTS` | `NOT EXISTS` subquery (semi/anti-semi join). Returns a boolean.
Existential,
3 changes: 2 additions & 1 deletion src/frontend/src/planner/select.rs
@@ -374,16 +374,17 @@ impl Planner {
SubqueryKind::UpdateSet => {
let plan = subroot.into_unordered_subplan();

+ // Compose all input columns into a struct with `ROW` function.
let all_input_refs = plan
.schema()
.data_types()
.into_iter()
.enumerate()
.map(|(i, data_type)| InputRef::new(i, data_type).into())
.collect::<Vec<_>>();

let call =
FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type);

LogicalProject::create(plan, vec![call.into()])
}
SubqueryKind::Existential => {
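The idea behind the `SubqueryKind::UpdateSet` arm, sketched with stand-in types (a hypothetical `Expr` enum rather than the real `ExprImpl`/`FunctionCall` API): every output column of the subplan becomes an `InputRef`, and a single `ROW` call packs them into one struct-typed column, so the outer UPDATE consumes exactly one value.

#[derive(Debug)]
enum Expr {
    InputRef(usize), // `$i`: the i-th output column of the subplan
    Row(Vec<Expr>),  // `ROW($0, $1, ...)`: packs columns into one struct
}

// Mirrors the loop above: one `InputRef` per output column, wrapped in `ROW`.
fn compose_row(subplan_width: usize) -> Expr {
    Expr::Row((0..subplan_width).map(Expr::InputRef).collect())
}

fn main() {
    // A subquery `SELECT 2, 3` has two output columns; the planner projects
    // them into the single struct-typed column `Row(InputRef(0), InputRef(1))`.
    println!("{:?}", compose_row(2));
}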
41 changes: 18 additions & 23 deletions src/frontend/src/planner/update.rs
@@ -36,8 +36,7 @@ impl Planner {

let returning = !update.returning_list.is_empty();

- let schema_len = input.schema().len();
-
+ // Extend table scan with updated columns.
let with_new: PlanRef = {
let mut plan = input;

@@ -62,33 +61,29 @@
let mut news = Vec::new();

for (i, col) in update.table.table_catalog.columns().iter().enumerate() {
- if col.is_generated() {
+ if !col.can_dml() {
continue;
}
let data_type = col.data_type();

let old: ExprImpl = InputRef::new(i, data_type.clone()).into();

- let new: ExprImpl = match update.projects.get(&i).copied() {
- Some(UpdateProject::Expr(index)) => {
- InputRef::new(index + schema_len, data_type.clone()).into()
- }
- Some(UpdateProject::Composite(index, sub)) => FunctionCall::new_unchecked(
- Type::Field,
- vec![
- InputRef::new(
- index + schema_len,
- with_new.schema().data_types()[index + schema_len].clone(),
- )
- .into(),
- Literal::new(Some((sub as i32).to_scalar_value()), DataType::Int32).into(),
- ],
- data_type.clone(),
- )
- .into(),
-
- None => old.clone(),
- };
+ let new: ExprImpl =
+ match (update.projects.get(&i)).map(|p| p.offset(with_new.schema().len())) {
+ Some(UpdateProject::Simple(j)) => InputRef::new(j, data_type.clone()).into(),
+ Some(UpdateProject::Composite(j, field)) => FunctionCall::new_unchecked(
+ Type::Field,
+ vec![
+ InputRef::new(j, with_new.schema().data_types()[j].clone()).into(),
+ Literal::new(Some((field as i32).to_scalar_value()), DataType::Int32)
+ .into(),
+ ],
+ data_type.clone(),
+ )
+ .into(),
+
+ None => old.clone(),
+ };

olds.push(old);
news.push(new);
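To summarize the planner loop, a runnable sketch with stand-in types (a hypothetical `Expr` model and a made-up three-column table; the real code works on `ExprImpl` and shifts expression indices via `UpdateProject::offset`, represented here by adding the scan width so that they land after the scan columns of `with_new`):

use std::collections::HashMap;

#[derive(Debug, Clone, Copy)]
enum UpdateProject {
    Simple(usize),
    Composite(usize, usize),
}

#[derive(Debug, Clone)]
enum Expr {
    InputRef(usize),       // `$j` into the extended (`with_new`) schema
    Field(Box<Expr>, i32), // `Field(struct_expr, k)`: extract the k-th field
}

fn main() {
    // Table t(a, b, c): scan columns occupy indices 0..3 of `with_new`;
    // the bound `exprs` [a + 1, (SELECT 2, 3)] are appended at indices 3 and 4.
    let scan_width = 3;
    let projects: HashMap<usize, UpdateProject> = HashMap::from([
        (0, UpdateProject::Simple(0)),
        (1, UpdateProject::Composite(1, 0)),
        (2, UpdateProject::Composite(1, 1)),
    ]);

    for i in 0..scan_width {
        let old = Expr::InputRef(i);
        let new = match projects.get(&i).copied() {
            Some(UpdateProject::Simple(j)) => Expr::InputRef(j + scan_width),
            Some(UpdateProject::Composite(j, k)) => {
                Expr::Field(Box::new(Expr::InputRef(j + scan_width)), k as i32)
            }
            None => old.clone(),
        };
        println!("col {i}: old = {old:?}, new = {new:?}");
    }
}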
