Skip to content

Commit

Permalink
support insert in local mode
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Feb 28, 2023
1 parent 3c4591b commit a9b9424
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 36 deletions.
31 changes: 29 additions & 2 deletions src/frontend/src/handler/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use postgres_types::FromSql;
use risingwave_common::catalog::Schema;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::session_config::QueryMode;
use risingwave_sqlparser::ast::Statement;
use risingwave_sqlparser::ast::{SetExpr, Statement};

use super::{PgResponseStream, RwPgResponse};
use crate::binder::{Binder, BoundSetExpr, BoundStatement};
Expand All @@ -50,6 +50,34 @@ pub fn gen_batch_query_plan(
) -> Result<(PlanRef, QueryMode, Schema)> {
let stmt_type = to_statement_type(&stmt)?;

// `must_dist` is set for UPDATE / DELETE (with or without RETURNING) and for an INSERT whose
// source contains a SELECT. An INSERT fed purely by VALUES does not set it.
let must_dist = {
    /// Returns `true` when `stmt` is an `INSERT` whose source query contains a `SELECT`
    /// somewhere in its body.
    fn is_insert_using_select(stmt: &Statement) -> bool {
        /// Recursively checks whether `set_expr` contains a `SELECT`: nested queries and
        /// both sides of a set operation are searched, while `VALUES` never counts.
        fn has_select_query(set_expr: &SetExpr) -> bool {
            match set_expr {
                SetExpr::Select(_) => true,
                SetExpr::Query(query) => has_select_query(&query.body),
                SetExpr::SetOperation { left, right, .. } => {
                    has_select_query(left) || has_select_query(right)
                }
                SetExpr::Values(_) => false,
            }
        }

        matches!(
            stmt,
            Statement::Insert { source, .. } if has_select_query(&source.body)
        )
    }

    // Use the short-circuiting `||`: the AST walk is only needed when the statement type
    // alone has not already decided the answer.
    matches!(
        stmt_type,
        StatementType::UPDATE
            | StatementType::DELETE
            | StatementType::UPDATE_RETURNING
            | StatementType::DELETE_RETURNING
    ) || is_insert_using_select(&stmt)
};

let bound = {
let mut binder = Binder::new(session);
binder.bind(stmt)?
Expand All @@ -68,7 +96,6 @@ pub fn gen_batch_query_plan(
must_local = true;
}
}
let must_dist = stmt_type.is_dml();

let query_mode = match (must_dist, must_local) {
(true, true) => {
Expand Down
107 changes: 73 additions & 34 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ use self::plan_node::{
#[cfg(debug_assertions)]
use self::plan_visitor::InputRefValidator;
use self::plan_visitor::{
has_batch_delete, has_batch_exchange, has_batch_insert, has_batch_update, has_logical_apply,
has_logical_over_agg, HasMaxOneRowApply,
has_batch_exchange, has_logical_apply, has_logical_over_agg, HasMaxOneRowApply,
};
use self::property::RequiredDist;
use self::rule::*;
Expand Down Expand Up @@ -531,32 +530,6 @@ impl PlanRoot {
Ok(plan)
}

/// The root stage is always run locally, so a singleton table scan must not become part of it.
///
/// Returns `true` when an additional exchange has to be inserted on top of `plan` to guarantee
/// that.
///
/// # Panics
///
/// Panics if the distribution of `plan` is not `Distribution::Single`.
fn require_additional_exchange_on_root(plan: PlanRef) -> bool {
    /// A node qualifies when it is a sequential scan over a non-system table, or a batch
    /// source.
    fn is_candidate_table_scan(node: &PlanRef) -> bool {
        let scans_user_table = node
            .as_batch_seq_scan()
            .map_or(false, |scan| !scan.logical().is_sys_table());
        scans_user_table || node.node_type() == PlanNodeType::BatchSource
    }

    /// Searches `node` and its inputs for a candidate scan, never looking at or below a
    /// `BatchExchange`.
    fn no_exchange_before_table_scan(node: PlanRef) -> bool {
        node.node_type() != PlanNodeType::BatchExchange
            && (is_candidate_table_scan(&node)
                || node.inputs().into_iter().any(no_exchange_before_table_scan))
    }

    assert_eq!(plan.distribution(), &Distribution::Single);
    no_exchange_before_table_scan(plan)

    // TODO: join between a normal table and a system table is not supported yet
}

/// Optimize and generate a batch query plan for distributed execution.
pub fn gen_batch_distributed_plan(&mut self) -> Result<PlanRef> {
self.set_required_dist(RequiredDist::single());
Expand All @@ -576,11 +549,7 @@ impl PlanRoot {
ctx.trace("To Batch Distributed Plan:");
ctx.trace(plan.explain_to_string().unwrap());
}
if has_batch_insert(plan.clone())
|| has_batch_delete(plan.clone())
|| has_batch_update(plan.clone())
|| Self::require_additional_exchange_on_root(plan.clone())
{
if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
plan =
BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
}
Expand All @@ -598,7 +567,7 @@ impl PlanRoot {
// We remark that since the `to_local_with_order_required` does not enforce single
// distribution, we enforce at the root if needed.
let insert_exchange = match plan.distribution() {
Distribution::Single => Self::require_additional_exchange_on_root(plan.clone()),
Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
_ => true,
};
if insert_exchange {
Expand Down Expand Up @@ -828,6 +797,76 @@ fn inline_session_timezone_in_exprs(ctx: OptimizerContextRef, plan: PlanRef) ->
Ok(plan)
}

/// Reports whether some node accepted by `is_candidate` can be reached from `plan` (inclusive)
/// without passing through a `BatchExchange`.
///
/// A `BatchExchange` ends the search of its subtree: neither the exchange node itself nor
/// anything beneath it is tested against `is_candidate`.
fn exist_and_no_exchange_before(plan: &PlanRef, is_candidate: fn(&PlanRef) -> bool) -> bool {
    let blocked_by_exchange = plan.node_type() == PlanNodeType::BatchExchange;
    !blocked_by_exchange
        && (is_candidate(plan)
            || plan
                .inputs()
                .iter()
                .any(|child| exist_and_no_exchange_before(child, is_candidate)))
}

/// As we always run the root stage locally, any plan node in the root stage that needs to
/// execute on a compute node must have an additional exchange inserted before it, so that it
/// is not included in the root stage.
///
/// In distributed mode the nodes treated this way are: sequential scans over non-system
/// tables, batch sources, and batch insert / update / delete nodes.
///
/// Returns `true` if we must insert an additional exchange to ensure this.
///
/// # Panics
///
/// Panics if the distribution of `plan` is not `Distribution::Single`.
fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> bool {
    /// Single predicate covering every node kind that must not stay in the root stage, so
    /// the plan tree is walked once instead of once per node kind.
    fn must_leave_root_stage(plan: &PlanRef) -> bool {
        let is_user_table = plan
            .as_batch_seq_scan()
            .map_or(false, |scan| !scan.logical().is_sys_table());
        is_user_table
            || [
                PlanNodeType::BatchSource,
                PlanNodeType::BatchInsert,
                PlanNodeType::BatchUpdate,
                PlanNodeType::BatchDelete,
            ]
            .contains(&plan.node_type())
    }

    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, must_leave_root_stage)
}

/// Serves the same purpose as `require_additional_exchange_on_root_in_distributed_mode`. The
/// two are kept separate because the set of plan nodes that needs this treatment differs
/// between execution modes.
///
/// In local mode the nodes treated this way are: sequential scans over non-system tables,
/// batch sources, and batch insert nodes.
///
/// Returns `true` if we must insert an additional exchange on top of `plan`.
///
/// # Panics
///
/// Panics if the distribution of `plan` is not `Distribution::Single`.
fn require_additional_exchange_on_root_in_local_mode(plan: PlanRef) -> bool {
    /// Single predicate covering every node kind that must not stay in the root stage, so
    /// the plan tree is walked once instead of once per node kind.
    fn must_leave_root_stage(plan: &PlanRef) -> bool {
        let is_user_table = plan
            .as_batch_seq_scan()
            .map_or(false, |scan| !scan.logical().is_sys_table());
        is_user_table
            || [PlanNodeType::BatchSource, PlanNodeType::BatchInsert].contains(&plan.node_type())
    }

    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, must_leave_root_stage)
}

#[cfg(test)]
mod tests {
use risingwave_common::catalog::Field;
Expand Down

0 comments on commit a9b9424

Please sign in to comment.