
feat(batch): distributed dml #14630

Merged
merged 14 commits on Jan 18, 2024
4 changes: 4 additions & 0 deletions src/common/src/session_config/mod.rs
@@ -94,6 +94,10 @@ pub struct ConfigMap {
#[parameter(default = true, rename = "rw_batch_enable_sort_agg")]
batch_enable_sort_agg: bool,

/// Enable distributed DML, so that INSERT, DELETE, and UPDATE statements can be executed in a distributed way (e.g. running on multiple compute nodes).
#[parameter(default = false, rename = "batch_enable_distributed_dml")]
Member:

Let's do some performance testing and make the default true.

Member:

> make the default true

It comes with the cost of no atomicity guarantee. 😕

Contributor Author:

If we support reading external sources (e.g. Iceberg) in the future, we can detect it from the plan and auto-enable it.

batch_enable_distributed_dml: bool,

/// The max gap allowed to transform small range scan into multi point lookup.
#[parameter(default = 8)]
max_split_range_gap: i32,
35 changes: 35 additions & 0 deletions src/frontend/planner_test/src/lib.rs
@@ -67,6 +67,8 @@ pub enum TestType {
BatchPlanProto,
/// Batch plan for local execution `.gen_batch_local_plan()`
BatchLocalPlan,
/// Batch plan for distributed execution `.gen_batch_distributed_plan()`
BatchDistributedPlan,

/// Create MV plan `.gen_create_mv_plan()`
StreamPlan,
@@ -201,6 +203,9 @@ pub struct TestCaseResult {
/// Batch plan for local execution `.gen_batch_local_plan()`
pub batch_local_plan: Option<String>,

/// Batch plan for distributed execution `.gen_batch_distributed_plan()`
pub batch_distributed_plan: Option<String>,

/// Generate sink plan
pub sink_plan: Option<String>,

@@ -709,6 +714,36 @@ impl TestCase {
}
}

'distributed_batch: {
if self
.expected_outputs
.contains(&TestType::BatchDistributedPlan)
|| self.expected_outputs.contains(&TestType::BatchError)
{
let batch_plan = match logical_plan.gen_batch_plan() {
Ok(batch_plan) => match logical_plan.gen_batch_distributed_plan(batch_plan) {
Ok(batch_plan) => batch_plan,
Err(err) => {
ret.batch_error = Some(err.to_report_string_pretty());
break 'distributed_batch;
}
},
Err(err) => {
ret.batch_error = Some(err.to_report_string_pretty());
break 'distributed_batch;
}
};

// Only generate batch_distributed_plan if it is specified in the test case
if self
.expected_outputs
.contains(&TestType::BatchDistributedPlan)
{
ret.batch_distributed_plan = Some(explain_plan(&batch_plan));
}
}
}

{
// stream
for (
7 changes: 7 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/delete.yaml
@@ -18,3 +18,10 @@
delete from t returning sum(a);
expected_outputs:
- binder_error
- name: distributed delete
sql: |
set batch_enable_distributed_dml = true;
create table t (a int, b int);
delete from t;
expected_outputs:
- batch_distributed_plan
7 changes: 7 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/insert.yaml
@@ -257,3 +257,10 @@
expected_outputs:
- batch_plan
- logical_plan
- name: distributed insert
sql: |
set batch_enable_distributed_dml = true;
create table t (a int, b int);
insert into t select * from t;
expected_outputs:
- batch_distributed_plan
7 changes: 7 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/update.yaml
@@ -91,3 +91,10 @@
expected_outputs:
- logical_plan
- batch_plan
- name: distributed update
sql: |
set batch_enable_distributed_dml = true;
create table t (a int, b int);
update t set a = a + 1;
expected_outputs:
- batch_distributed_plan
11 changes: 11 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/delete.yaml
@@ -32,3 +32,14 @@
create table t (a int, b int);
delete from t returning sum(a);
binder_error: 'Bind error: should not have agg/window in the `RETURNING` list'
- name: distributed delete
sql: |
set batch_enable_distributed_dml = true;
create table t (a int, b int);
delete from t;
batch_distributed_plan: |-
BatchSimpleAgg { aggs: [sum()] }
└─BatchExchange { order: [], dist: Single }
└─BatchDelete { table: t }
Comment on lines +41 to +43
Member:

This reminds me of an issue in ancient times 🤣

#2678

Contributor Author:

🤣

└─BatchExchange { order: [], dist: HashShard(t.a, t.b, t._row_id) }
└─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) }
11 changes: 11 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/insert.yaml
@@ -332,3 +332,14 @@
BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t, mapping: [0:0], default: [1<-(2:Int32 + 3:Int32)] }
└─BatchValues { rows: [[1:Int32]] }
- name: distributed insert
sql: |
set batch_enable_distributed_dml = true;
create table t (a int, b int);
insert into t select * from t;
batch_distributed_plan: |-
BatchSimpleAgg { aggs: [sum()] }
└─BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t, mapping: [0:0, 1:1] }
└─BatchExchange { order: [], dist: HashShard(t.a, t.b) }
└─BatchScan { table: t, columns: [t.a, t.b], distribution: SomeShard }
11 changes: 11 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/update.yaml
@@ -154,3 +154,14 @@
│ └─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) }
└─BatchExchange { order: [], dist: HashShard(t.b) }
└─BatchScan { table: t, columns: [t.b], distribution: SomeShard }
- name: distributed update
sql: |
set batch_enable_distributed_dml = true;
create table t (a int, b int);
update t set a = a + 1;
batch_distributed_plan: |-
BatchSimpleAgg { aggs: [sum()] }
└─BatchExchange { order: [], dist: Single }
└─BatchUpdate { table: t, exprs: [($0 + 1:Int32), $1, $2] }
└─BatchExchange { order: [], dist: HashShard(t.a, t.b, t._row_id) }
└─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) }
47 changes: 42 additions & 5 deletions src/frontend/src/optimizer/plan_node/batch_delete.rs
@@ -13,6 +13,8 @@
// limitations under the License.

use risingwave_common::error::Result;
use risingwave_common::types::DataType;
use risingwave_expr::aggregate::AggKind;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::DeleteNode;

@@ -21,10 +23,12 @@ use super::utils::impl_distill_by_unit;
use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
};
use crate::expr::InputRef;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::PhysicalPlanRef;
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::plan_node::generic::{Agg, GenericPlanNode, PhysicalPlanRef};
use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall, ToLocalBatch};
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::utils::{Condition, IndexSet};

/// `BatchDelete` implements [`super::LogicalDelete`]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -35,7 +39,6 @@ pub struct BatchDelete {

impl BatchDelete {
pub fn new(core: generic::Delete<PlanRef>) -> Self {
assert_eq!(core.input.distribution(), &Distribution::Single);
let base =
PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any());
Self { base, core }
@@ -59,9 +62,43 @@ impl_distill_by_unit!(BatchDelete, core, "BatchDelete");

impl ToDistributedBatch for BatchDelete {
fn to_distributed(&self) -> Result<PlanRef> {
let new_input = RequiredDist::single()
if self
.core
.ctx()
.session_ctx()
.config()
.batch_enable_distributed_dml()
{
// Add a hash shuffle between the delete and its input.
let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
(0..self.input().schema().len()).collect(),
))
Comment on lines +68 to +71
Member:

If I remember correctly, the hash used in batch exchange is different from the one used in streaming, and we have added a so-called ConsistentHash for the streaming hash distribution:

HASH = 3;
CONSISTENT_HASH = 4;

Which distribution does the generated BatchExchange follow here?

Contributor Author:

This one: HASH = 3.

Contributor:

> If I remember correctly, the hash used in batch exchange is different from the one used in streaming, and we have added a so-called ConsistentHash for the streaming hash distribution.

Using the batch hash distribution here is OK because we still have the stream hash exchange before the materialize executor.

.enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
Ok(self.clone_with_input(new_input).into())
let new_delete: PlanRef = self.clone_with_input(new_input).into();
if self.core.returning {
Ok(new_delete)
} else {
let new_delete =
RequiredDist::single().enforce_if_not_satisfies(new_delete, &Order::any())?;
// Accumulate the affected rows.
let sum_agg = PlanAggCall {
agg_kind: AggKind::Sum,
return_type: DataType::Int64,
inputs: vec![InputRef::new(0, DataType::Int64)],
distinct: false,
order_by: vec![],
filter: Condition::true_cond(),
direct_args: vec![],
};
let agg = Agg::new(vec![sum_agg], IndexSet::empty(), new_delete);
let batch_agg = BatchSimpleAgg::new(agg);
Ok(batch_agg.into())
}
} else {
let new_input = RequiredDist::single()
.enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
Ok(self.clone_with_input(new_input).into())
}
}
}

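The same shape is repeated in BatchInsert and BatchUpdate below: hash-shuffle the input on all of its columns so the DML runs on every compute node, and, when there is no RETURNING clause, gather to a single distribution and sum the per-node affected-row counts. A minimal sketch of how that shared part could be factored into a helper; the function name to_distributed_dml and its closure-based signature are assumptions for illustration, not code from this PR.

// Hypothetical helper inside the frontend crate; imports mirror the ones added in batch_delete.rs.
use risingwave_common::error::Result;
use risingwave_common::types::DataType;
use risingwave_expr::aggregate::AggKind;
use crate::expr::InputRef;
use crate::optimizer::plan_node::generic::Agg;
use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall, PlanRef, ToDistributedBatch};
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::utils::{Condition, IndexSet};

fn to_distributed_dml(
    input: PlanRef,
    returning: bool,
    rebuild_dml: impl FnOnce(PlanRef) -> PlanRef,
) -> Result<PlanRef> {
    // Hash-shuffle the input on all of its columns so the DML executes on every compute node.
    let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
        (0..input.schema().len()).collect(),
    ))
    .enforce_if_not_satisfies(input.to_distributed()?, &Order::any())?;
    let new_dml: PlanRef = rebuild_dml(new_input);
    if returning {
        // With RETURNING, keep the distributed output as-is.
        return Ok(new_dml);
    }
    // Otherwise gather to a single distribution and sum the per-node affected-row counts.
    let new_dml = RequiredDist::single().enforce_if_not_satisfies(new_dml, &Order::any())?;
    let sum_agg = PlanAggCall {
        agg_kind: AggKind::Sum,
        return_type: DataType::Int64,
        inputs: vec![InputRef::new(0, DataType::Int64)],
        distinct: false,
        order_by: vec![],
        filter: Condition::true_cond(),
        direct_args: vec![],
    };
    let agg = Agg::new(vec![sum_agg], IndexSet::empty(), new_dml);
    Ok(BatchSimpleAgg::new(agg).into())
}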
47 changes: 42 additions & 5 deletions src/frontend/src/optimizer/plan_node/batch_insert.rs
@@ -14,6 +14,8 @@

use pretty_xmlish::XmlNode;
use risingwave_common::error::Result;
use risingwave_common::types::DataType;
use risingwave_expr::aggregate::AggKind;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::InsertNode;
use risingwave_pb::plan_common::{DefaultColumns, IndexAndExpr};
@@ -22,10 +24,12 @@ use super::batch::prelude::*;
use super::generic::GenericPlanRef;
use super::utils::{childless_record, Distill};
use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch};
use crate::expr::Expr;
use crate::expr::{Expr, InputRef};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::{PlanBase, ToLocalBatch};
use crate::optimizer::plan_node::generic::{Agg, GenericPlanNode};
use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall, PlanBase, ToLocalBatch};
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::utils::{Condition, IndexSet};

/// `BatchInsert` implements [`super::LogicalInsert`]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -36,7 +40,6 @@ pub struct BatchInsert {

impl BatchInsert {
pub fn new(core: generic::Insert<PlanRef>) -> Self {
assert_eq!(core.input.distribution(), &Distribution::Single);
let base: PlanBase<Batch> =
PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any());

@@ -69,9 +72,43 @@ impl_plan_tree_node_for_unary! { BatchInsert }

impl ToDistributedBatch for BatchInsert {
fn to_distributed(&self) -> Result<PlanRef> {
let new_input = RequiredDist::single()
if self
.core
.ctx()
.session_ctx()
.config()
.batch_enable_distributed_dml()
{
// Add a hash shuffle between the insert and its input.
let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
(0..self.input().schema().len()).collect(),
))
.enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
Ok(self.clone_with_input(new_input).into())
let new_insert: PlanRef = self.clone_with_input(new_input).into();
if self.core.returning {
Ok(new_insert)
} else {
let new_insert =
RequiredDist::single().enforce_if_not_satisfies(new_insert, &Order::any())?;
// Accumulate the affected rows.
let sum_agg = PlanAggCall {
agg_kind: AggKind::Sum,
return_type: DataType::Int64,
inputs: vec![InputRef::new(0, DataType::Int64)],
distinct: false,
order_by: vec![],
filter: Condition::true_cond(),
direct_args: vec![],
};
let agg = Agg::new(vec![sum_agg], IndexSet::empty(), new_insert);
let batch_agg = BatchSimpleAgg::new(agg);
Ok(batch_agg.into())
}
} else {
let new_input = RequiredDist::single()
.enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
Ok(self.clone_with_input(new_input).into())
}
}
}

50 changes: 44 additions & 6 deletions src/frontend/src/optimizer/plan_node/batch_update.rs
@@ -15,6 +15,8 @@
use itertools::Itertools;
use risingwave_common::catalog::Schema;
use risingwave_common::error::Result;
use risingwave_common::types::DataType;
use risingwave_expr::aggregate::AggKind;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::UpdateNode;

@@ -24,10 +26,12 @@ use super::utils::impl_distill_by_unit;
use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
};
use crate::expr::{Expr, ExprRewriter, ExprVisitor};
use crate::expr::{Expr, ExprRewriter, ExprVisitor, InputRef};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::plan_node::generic::{Agg, GenericPlanNode};
use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall, ToLocalBatch};
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::utils::{Condition, IndexSet};

/// `BatchUpdate` implements [`super::LogicalUpdate`]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -38,9 +42,9 @@ pub struct BatchUpdate {

impl BatchUpdate {
pub fn new(core: generic::Update<PlanRef>, schema: Schema) -> Self {
assert_eq!(core.input.distribution(), &Distribution::Single);
let ctx = core.input.ctx();
let base = PlanBase::new_batch(ctx, schema, Distribution::Single, Order::any());
let base =
PlanBase::new_batch(ctx, schema, core.input.distribution().clone(), Order::any());
Self { base, core }
}
}
@@ -62,9 +66,43 @@ impl_distill_by_unit!(BatchUpdate, core, "BatchUpdate");

impl ToDistributedBatch for BatchUpdate {
fn to_distributed(&self) -> Result<PlanRef> {
let new_input = RequiredDist::single()
if self
.core
.ctx()
.session_ctx()
.config()
.batch_enable_distributed_dml()
{
// Add a hash shuffle between the update and its input.
let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
(0..self.input().schema().len()).collect(),
))
.enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
Ok(self.clone_with_input(new_input).into())
let new_update: PlanRef = self.clone_with_input(new_input).into();
if self.core.returning {
Ok(new_update)
} else {
let new_update =
RequiredDist::single().enforce_if_not_satisfies(new_update, &Order::any())?;
// Accumulate the affected rows.
let sum_agg = PlanAggCall {
agg_kind: AggKind::Sum,
return_type: DataType::Int64,
inputs: vec![InputRef::new(0, DataType::Int64)],
distinct: false,
order_by: vec![],
filter: Condition::true_cond(),
direct_args: vec![],
};
let agg = Agg::new(vec![sum_agg], IndexSet::empty(), new_update);
let batch_agg = BatchSimpleAgg::new(agg);
Ok(batch_agg.into())
}
} else {
let new_input = RequiredDist::single()
.enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
Ok(self.clone_with_input(new_input).into())
}
}
}

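For comparison, each of the three DML plan nodes could then delegate to the hypothetical to_distributed_dml helper sketched after the BatchDelete diff above. This is an illustration of the shared shape only, not how the merged code is organized; BatchInsert and BatchUpdate would differ just in the node they rebuild.

// Sketch only, assuming the same scope as batch_delete.rs plus the helper above.
impl ToDistributedBatch for BatchDelete {
    fn to_distributed(&self) -> Result<PlanRef> {
        if self
            .core
            .ctx()
            .session_ctx()
            .config()
            .batch_enable_distributed_dml()
        {
            // Distributed path: hash shuffle plus the optional affected-row sum.
            to_distributed_dml(self.input(), self.core.returning, |input| {
                self.clone_with_input(input).into()
            })
        } else {
            // Original single-node path, unchanged.
            let new_input = RequiredDist::single()
                .enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
            Ok(self.clone_with_input(new_input).into())
        }
    }
}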
9 changes: 6 additions & 3 deletions src/frontend/src/scheduler/distributed/stage.rs
@@ -708,9 +714,12 @@ impl StageRunner {
if candidates.is_empty() {
return Err(SchedulerError::EmptyWorkerNodes);
}
return Ok(Some(
candidates[self.stage.session_id.0 as usize % candidates.len()].clone(),
));
let candidate = if self.stage.batch_enable_distributed_dml {
candidates[task_id as usize % candidates.len()].clone()
} else {
candidates[self.stage.session_id.0 as usize % candidates.len()].clone()
};
return Ok(Some(candidate));
};

if let Some(distributed_lookup_join_node) =
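In isolation, the scheduling change above means that with batch_enable_distributed_dml set, the tasks of a DML stage are spread round-robin over the candidate workers by task id, whereas the existing behaviour pins every task to the single worker derived from the session id. A self-contained toy illustration; the function and its arguments are invented for the example and are not RisingWave APIs.

fn pick_worker_index(
    num_candidates: usize,
    task_id: u64,
    session_id: i32,
    distributed_dml: bool,
) -> usize {
    if distributed_dml {
        // Fan tasks out across workers so each compute node executes part of the DML.
        task_id as usize % num_candidates
    } else {
        // Pin every task of the session to the same worker.
        session_id as usize % num_candidates
    }
}

fn main() {
    // With 3 candidate workers and distributed DML enabled, tasks 0, 1, 2 land on workers 0, 1, 2.
    let spread: Vec<usize> = (0..3).map(|t| pick_worker_index(3, t, 7, true)).collect();
    assert_eq!(spread, vec![0, 1, 2]);

    // With the flag off, every task of session 7 lands on the same worker (7 % 3 == 1).
    let pinned: Vec<usize> = (0..3).map(|t| pick_worker_index(3, t, 7, false)).collect();
    assert_eq!(pinned, vec![1, 1, 1]);
}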