feat(batch): distributed dml (#14630)
chenzl25 authored Jan 18, 2024
1 parent 67eae55 commit 0f79291
Showing 14 changed files with 214 additions and 19 deletions.
5 changes: 5 additions & 0 deletions src/common/src/session_config/mod.rs
@@ -94,6 +94,11 @@ pub struct ConfigMap {
#[parameter(default = true, rename = "rw_batch_enable_sort_agg")]
batch_enable_sort_agg: bool,

/// Enable distributed DML so that INSERT, DELETE, and UPDATE statements can be executed in a distributed way, e.g. running on multiple compute nodes.
/// This mode provides no atomicity guarantee. Its goal is to achieve the best ingestion performance for initial batch ingestion, where users can always drop the table and retry if a failure occurs.
#[parameter(default = false, rename = "batch_enable_distributed_dml")]
batch_enable_distributed_dml: bool,

/// The max gap allowed to transform small range scan into multi point lookup.
#[parameter(default = 8)]
max_split_range_gap: i32,
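Before the per-node diffs below, here is a condensed, self-contained sketch of what this flag changes in the distributed batch plan for INSERT/DELETE/UPDATE. The types are toy stand-ins, not RisingWave's plan nodes: with the flag off, the DML input is gathered to a single node; with it on, the input keeps a hash distribution, each partition runs its own DML task, and (unless RETURNING is used) the per-task affected-row counts are summed at the end.

// Toy model of the plan shapes produced by this commit; names are illustrative only.
#[derive(Debug)]
enum Plan {
    Scan,
    Exchange { dist: &'static str, input: Box<Plan> },
    Dml { input: Box<Plan> },
    SumAffectedRows { input: Box<Plan> },
}

fn plan_dml(distributed_dml: bool, returning: bool, input: Plan) -> Plan {
    if distributed_dml {
        // Keep the input hash-distributed over all of its columns so that every
        // partition runs its own DML task in parallel.
        let dml = Plan::Dml {
            input: Box::new(Plan::Exchange {
                dist: "HashShard(all input columns)",
                input: Box::new(input),
            }),
        };
        if returning {
            // RETURNING produces per-row output, so no final aggregation is added.
            dml
        } else {
            // Gather per-task affected-row counts to a single node and sum them.
            Plan::SumAffectedRows {
                input: Box::new(Plan::Exchange { dist: "Single", input: Box::new(dml) }),
            }
        }
    } else {
        // Default behaviour: gather everything to one node and run a single DML task.
        Plan::Dml {
            input: Box::new(Plan::Exchange { dist: "Single", input: Box::new(input) }),
        }
    }
}

fn main() {
    println!("{:?}", plan_dml(true, false, Plan::Scan));
}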
35 changes: 35 additions & 0 deletions src/frontend/planner_test/src/lib.rs
@@ -67,6 +67,8 @@ pub enum TestType {
BatchPlanProto,
/// Batch plan for local execution `.gen_batch_local_plan()`
BatchLocalPlan,
/// Batch plan for distributed execution `.gen_batch_distributed_plan()`
BatchDistributedPlan,

/// Create MV plan `.gen_create_mv_plan()`
StreamPlan,
@@ -201,6 +203,9 @@ pub struct TestCaseResult {
/// Batch plan for local execution `.gen_batch_local_plan()`
pub batch_local_plan: Option<String>,

/// Batch plan for distributed execution `.gen_batch_distributed_plan()`
pub batch_distributed_plan: Option<String>,

/// Generate sink plan
pub sink_plan: Option<String>,

@@ -709,6 +714,36 @@ impl TestCase {
}
}

'distributed_batch: {
if self
.expected_outputs
.contains(&TestType::BatchDistributedPlan)
|| self.expected_outputs.contains(&TestType::BatchError)
{
let batch_plan = match logical_plan.gen_batch_plan() {
Ok(batch_plan) => match logical_plan.gen_batch_distributed_plan(batch_plan) {
Ok(batch_plan) => batch_plan,
Err(err) => {
ret.batch_error = Some(err.to_report_string_pretty());
break 'distributed_batch;
}
},
Err(err) => {
ret.batch_error = Some(err.to_report_string_pretty());
break 'distributed_batch;
}
};

// Only generate the distributed batch plan if it is specified in the test case
if self
.expected_outputs
.contains(&TestType::BatchDistributedPlan)
{
ret.batch_distributed_plan = Some(explain_plan(&batch_plan));
}
}
}

{
// stream
for (
7 changes: 7 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/delete.yaml
@@ -18,3 +18,10 @@
delete from t returning sum(a);
expected_outputs:
- binder_error
- name: distributed delete
sql: |
set batch_enable_distributed_dml = true;
create table t (a int, b int);
delete from t;
expected_outputs:
- batch_distributed_plan
7 changes: 7 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/insert.yaml
@@ -257,3 +257,10 @@
expected_outputs:
- batch_plan
- logical_plan
- name: distributed insert
sql: |
set batch_enable_distributed_dml = true;
create table t (a int, b int);
insert into t select * from t;
expected_outputs:
- batch_distributed_plan
7 changes: 7 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/update.yaml
@@ -91,3 +91,10 @@
expected_outputs:
- logical_plan
- batch_plan
- name: distributed update
sql: |
set batch_enable_distributed_dml = true;
create table t (a int, b int);
update t set a = a + 1;
expected_outputs:
- batch_distributed_plan
11 changes: 11 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/delete.yaml
@@ -32,3 +32,14 @@
create table t (a int, b int);
delete from t returning sum(a);
binder_error: 'Bind error: should not have agg/window in the `RETURNING` list'
- name: distributed delete
sql: |
set batch_enable_distributed_dml = true;
create table t (a int, b int);
delete from t;
batch_distributed_plan: |-
BatchSimpleAgg { aggs: [sum()] }
└─BatchExchange { order: [], dist: Single }
└─BatchDelete { table: t }
└─BatchExchange { order: [], dist: HashShard(t.a, t.b, t._row_id) }
└─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) }
11 changes: 11 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/insert.yaml
@@ -332,3 +332,14 @@
BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t, mapping: [0:0], default: [1<-(2:Int32 + 3:Int32)] }
└─BatchValues { rows: [[1:Int32]] }
- name: distributed insert
sql: |
set batch_enable_distributed_dml = true;
create table t (a int, b int);
insert into t select * from t;
batch_distributed_plan: |-
BatchSimpleAgg { aggs: [sum()] }
└─BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t, mapping: [0:0, 1:1] }
└─BatchExchange { order: [], dist: HashShard(t.a, t.b) }
└─BatchScan { table: t, columns: [t.a, t.b], distribution: SomeShard }
11 changes: 11 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/update.yaml
@@ -154,3 +154,14 @@
│ └─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) }
└─BatchExchange { order: [], dist: HashShard(t.b) }
└─BatchScan { table: t, columns: [t.b], distribution: SomeShard }
- name: distributed update
sql: |
set batch_enable_distributed_dml = true;
create table t (a int, b int);
update t set a = a + 1;
batch_distributed_plan: |-
BatchSimpleAgg { aggs: [sum()] }
└─BatchExchange { order: [], dist: Single }
└─BatchUpdate { table: t, exprs: [($0 + 1:Int32), $1, $2] }
└─BatchExchange { order: [], dist: HashShard(t.a, t.b, t._row_id) }
└─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) }
29 changes: 24 additions & 5 deletions src/frontend/src/optimizer/plan_node/batch_delete.rs
@@ -22,8 +22,8 @@ use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::PhysicalPlanRef;
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::plan_node::generic::{GenericPlanNode, PhysicalPlanRef};
use crate::optimizer::plan_node::{utils, ToLocalBatch};
use crate::optimizer::property::{Distribution, Order, RequiredDist};

/// `BatchDelete` implements [`super::LogicalDelete`]
@@ -35,7 +35,6 @@ pub struct BatchDelete {

impl BatchDelete {
pub fn new(core: generic::Delete<PlanRef>) -> Self {
assert_eq!(core.input.distribution(), &Distribution::Single);
let base =
PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any());
Self { base, core }
@@ -59,9 +58,29 @@ impl_distill_by_unit!(BatchDelete, core, "BatchDelete");

impl ToDistributedBatch for BatchDelete {
fn to_distributed(&self) -> Result<PlanRef> {
let new_input = RequiredDist::single()
if self
.core
.ctx()
.session_ctx()
.config()
.batch_enable_distributed_dml()
{
// Add a hash shuffle between the delete and its input.
let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
(0..self.input().schema().len()).collect(),
))
.enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
Ok(self.clone_with_input(new_input).into())
let new_delete: PlanRef = self.clone_with_input(new_input).into();
if self.core.returning {
Ok(new_delete)
} else {
utils::sum_affected_row(new_delete)
}
} else {
let new_input = RequiredDist::single()
.enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
Ok(self.clone_with_input(new_input).into())
}
}
}

28 changes: 24 additions & 4 deletions src/frontend/src/optimizer/plan_node/batch_insert.rs
@@ -24,7 +24,8 @@ use super::utils::{childless_record, Distill};
use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch};
use crate::expr::Expr;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::{PlanBase, ToLocalBatch};
use crate::optimizer::plan_node::generic::GenericPlanNode;
use crate::optimizer::plan_node::{utils, PlanBase, ToLocalBatch};
use crate::optimizer::property::{Distribution, Order, RequiredDist};

/// `BatchInsert` implements [`super::LogicalInsert`]
@@ -36,7 +37,6 @@ pub struct BatchInsert {

impl BatchInsert {
pub fn new(core: generic::Insert<PlanRef>) -> Self {
assert_eq!(core.input.distribution(), &Distribution::Single);
let base: PlanBase<Batch> =
PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any());

@@ -69,9 +69,29 @@ impl_plan_tree_node_for_unary! { BatchInsert }

impl ToDistributedBatch for BatchInsert {
fn to_distributed(&self) -> Result<PlanRef> {
let new_input = RequiredDist::single()
if self
.core
.ctx()
.session_ctx()
.config()
.batch_enable_distributed_dml()
{
// Add a hash shuffle between the insert and its input.
let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
(0..self.input().schema().len()).collect(),
))
.enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
Ok(self.clone_with_input(new_input).into())
let new_insert: PlanRef = self.clone_with_input(new_input).into();
if self.core.returning {
Ok(new_insert)
} else {
utils::sum_affected_row(new_insert)
}
} else {
let new_input = RequiredDist::single()
.enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
Ok(self.clone_with_input(new_input).into())
}
}
}

31 changes: 26 additions & 5 deletions src/frontend/src/optimizer/plan_node/batch_update.rs
@@ -26,7 +26,8 @@ use super::{
};
use crate::expr::{Expr, ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::plan_node::generic::GenericPlanNode;
use crate::optimizer::plan_node::{utils, ToLocalBatch};
use crate::optimizer::property::{Distribution, Order, RequiredDist};

/// `BatchUpdate` implements [`super::LogicalUpdate`]
@@ -38,9 +39,9 @@ pub struct BatchUpdate {

impl BatchUpdate {
pub fn new(core: generic::Update<PlanRef>, schema: Schema) -> Self {
assert_eq!(core.input.distribution(), &Distribution::Single);
let ctx = core.input.ctx();
let base = PlanBase::new_batch(ctx, schema, Distribution::Single, Order::any());
let base =
PlanBase::new_batch(ctx, schema, core.input.distribution().clone(), Order::any());
Self { base, core }
}
}
@@ -62,9 +63,29 @@ impl_distill_by_unit!(BatchUpdate, core, "BatchUpdate");

impl ToDistributedBatch for BatchUpdate {
fn to_distributed(&self) -> Result<PlanRef> {
let new_input = RequiredDist::single()
if self
.core
.ctx()
.session_ctx()
.config()
.batch_enable_distributed_dml()
{
// Add a hash shuffle between the update and its input.
let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
(0..self.input().schema().len()).collect(),
))
.enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
Ok(self.clone_with_input(new_input).into())
let new_update: PlanRef = self.clone_with_input(new_input).into();
if self.core.returning {
Ok(new_update)
} else {
utils::sum_affected_row(new_update)
}
} else {
let new_input = RequiredDist::single()
.enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
Ok(self.clone_with_input(new_input).into())
}
}
}

28 changes: 26 additions & 2 deletions src/frontend/src/optimizer/plan_node/utils.rs
@@ -26,8 +26,8 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType};

use crate::catalog::table_catalog::{CreateType, TableType};
use crate::catalog::{ColumnId, TableCatalog, TableId};
use crate::optimizer::property::Cardinality;
use crate::utils::WithOptions;
use crate::optimizer::property::{Cardinality, Order, RequiredDist};
use crate::utils::{Condition, IndexSet, WithOptions};

#[derive(Default)]
pub struct TableCatalogBuilder {
@@ -288,6 +288,23 @@ impl<'a> IndicesDisplay<'a> {
}
}

pub(crate) fn sum_affected_row(dml: PlanRef) -> error::Result<PlanRef> {
let dml = RequiredDist::single().enforce_if_not_satisfies(dml, &Order::any())?;
// Accumulate the affected rows.
let sum_agg = PlanAggCall {
agg_kind: AggKind::Sum,
return_type: DataType::Int64,
inputs: vec![InputRef::new(0, DataType::Int64)],
distinct: false,
order_by: vec![],
filter: Condition::true_cond(),
direct_args: vec![],
};
let agg = Agg::new(vec![sum_agg], IndexSet::empty(), dml);
let batch_agg = BatchSimpleAgg::new(agg);
Ok(batch_agg.into())
}

/// Call `debug_struct` on the given formatter to create a debug struct builder.
/// If a property list is provided, properties in it will be added to the struct name according to
/// the condition of that property.
@@ -308,6 +325,13 @@ macro_rules! plan_node_name {
};
}
pub(crate) use plan_node_name;
use risingwave_common::error;
use risingwave_common::types::DataType;
use risingwave_expr::aggregate::AggKind;

use super::generic::{self, GenericPlanRef};
use super::pretty_config;
use crate::expr::InputRef;
use crate::optimizer::plan_node::generic::Agg;
use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall};
use crate::PlanRef;
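As a quick illustration of why `sum_affected_row` exists (a toy example, not part of the commit): each distributed DML task reports its own affected-row count, and the Single-distribution `BatchSimpleAgg` built above adds them up so the client still receives one total.

fn total_affected_rows(per_task_counts: &[i64]) -> i64 {
    // Mirrors BatchSimpleAgg { aggs: [sum()] } over the gathered DML outputs.
    per_task_counts.iter().sum()
}

fn main() {
    // e.g. three parallel insert tasks touching 4, 7 and 9 rows report 20 in total.
    assert_eq!(total_affected_rows(&[4, 7, 9]), 20);
}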
12 changes: 9 additions & 3 deletions src/frontend/src/scheduler/distributed/stage.rs
@@ -708,9 +708,15 @@ impl StageRunner {
if candidates.is_empty() {
return Err(SchedulerError::EmptyWorkerNodes);
}
return Ok(Some(
candidates[self.stage.session_id.0 as usize % candidates.len()].clone(),
));
let candidate = if self.stage.batch_enable_distributed_dml {
// If distributed DML is enabled, distribute DML tasks as evenly as possible across the workers.
// Selecting by `task_id` helps achieve this.
candidates[task_id as usize % candidates.len()].clone()
} else {
// If distributed DML is disabled, DML from the same session must be sent to a fixed worker/channel to provide an ordering guarantee.
candidates[self.stage.session_id.0 as usize % candidates.len()].clone()
};
return Ok(Some(candidate));
};

if let Some(distributed_lookup_join_node) =
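For clarity, here is a self-contained sketch of the two worker-selection policies used in the `StageRunner` hunk above: round-robin by task id when distributed DML is enabled, and a fixed per-session worker otherwise. The helper and `WorkerNode` type are hypothetical stand-ins, not the commit's code.

// Hypothetical, self-contained illustration of the selection logic; WorkerNode
// here is a toy type, not RisingWave's.
#[derive(Clone, Debug)]
struct WorkerNode {
    id: u32,
}

fn pick_dml_worker(
    candidates: &[WorkerNode],
    distributed_dml_enabled: bool,
    task_id: u32,
    session_id: i32,
) -> Option<WorkerNode> {
    if candidates.is_empty() {
        return None; // corresponds to the EmptyWorkerNodes error path above
    }
    let idx = if distributed_dml_enabled {
        // Spread DML tasks across workers by task id to maximize ingestion parallelism.
        task_id as usize % candidates.len()
    } else {
        // Pin all DML from one session to a single worker to preserve ordering.
        session_id as usize % candidates.len()
    };
    Some(candidates[idx].clone())
}

fn main() {
    let workers: Vec<WorkerNode> = (0..3).map(|id| WorkerNode { id }).collect();
    // Distributed DML: task 2 lands on worker 2 % 3 = 2.
    assert_eq!(pick_dml_worker(&workers, true, 2, 42).unwrap().id, 2);
    // Non-distributed DML: every task of session 42 lands on worker 42 % 3 = 0.
    assert_eq!(pick_dml_worker(&workers, false, 7, 42).unwrap().id, 0);
}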