-
Notifications
You must be signed in to change notification settings - Fork 598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(batch): distributed dml #14630
feat(batch): distributed dml #14630
Changes from 12 commits
12e24be
96e9c98
86487f1
818f38b
56baffe
851be91
2fdbdae
0baacc0
a5510ea
4f5e330
7065f69
2f43d5d
3c8c497
b42417a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,3 +32,14 @@ | |
create table t (a int, b int); | ||
delete from t returning sum(a); | ||
binder_error: 'Bind error: should not have agg/window in the `RETURNING` list' | ||
- name: distributed delete | ||
sql: | | ||
set batch_enable_distributed_dml = true; | ||
create table t (a int, b int); | ||
delete from t; | ||
batch_distributed_plan: |- | ||
BatchSimpleAgg { aggs: [sum()] } | ||
└─BatchExchange { order: [], dist: Single } | ||
└─BatchDelete { table: t } | ||
Comment on lines
+41
to
+43
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This reminds me of an issue in ancient times 🤣 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤣 |
||
└─BatchExchange { order: [], dist: HashShard(t.a, t.b, t._row_id) } | ||
└─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) } |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -13,6 +13,8 @@ | |||||
// limitations under the License. | ||||||
|
||||||
use risingwave_common::error::Result; | ||||||
use risingwave_common::types::DataType; | ||||||
use risingwave_expr::aggregate::AggKind; | ||||||
use risingwave_pb::batch_plan::plan_node::NodeBody; | ||||||
use risingwave_pb::batch_plan::DeleteNode; | ||||||
|
||||||
|
@@ -21,10 +23,12 @@ use super::utils::impl_distill_by_unit; | |||||
use super::{ | ||||||
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, | ||||||
}; | ||||||
use crate::expr::InputRef; | ||||||
use crate::optimizer::plan_node::expr_visitable::ExprVisitable; | ||||||
use crate::optimizer::plan_node::generic::PhysicalPlanRef; | ||||||
use crate::optimizer::plan_node::ToLocalBatch; | ||||||
use crate::optimizer::plan_node::generic::{Agg, GenericPlanNode, PhysicalPlanRef}; | ||||||
use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall, ToLocalBatch}; | ||||||
use crate::optimizer::property::{Distribution, Order, RequiredDist}; | ||||||
use crate::utils::{Condition, IndexSet}; | ||||||
|
||||||
/// `BatchDelete` implements [`super::LogicalDelete`] | ||||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)] | ||||||
|
@@ -35,7 +39,6 @@ pub struct BatchDelete { | |||||
|
||||||
impl BatchDelete { | ||||||
pub fn new(core: generic::Delete<PlanRef>) -> Self { | ||||||
assert_eq!(core.input.distribution(), &Distribution::Single); | ||||||
let base = | ||||||
PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any()); | ||||||
Self { base, core } | ||||||
|
@@ -59,9 +62,43 @@ impl_distill_by_unit!(BatchDelete, core, "BatchDelete"); | |||||
|
||||||
impl ToDistributedBatch for BatchDelete { | ||||||
fn to_distributed(&self) -> Result<PlanRef> { | ||||||
let new_input = RequiredDist::single() | ||||||
if self | ||||||
.core | ||||||
.ctx() | ||||||
.session_ctx() | ||||||
.config() | ||||||
.batch_enable_distributed_dml() | ||||||
{ | ||||||
// Add an hash shuffle between the delete and its input. | ||||||
let new_input = RequiredDist::PhysicalDist(Distribution::HashShard( | ||||||
(0..self.input().schema().len()).collect(), | ||||||
)) | ||||||
Comment on lines
+68
to
+71
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I remember correctly, the risingwave/proto/batch_plan.proto Lines 344 to 345 in f85de37
Here which distribution does this generated There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
use batch hash distribution here is ok because we still have the stream hash exchange before the materialize executor |
||||||
.enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?; | ||||||
Ok(self.clone_with_input(new_input).into()) | ||||||
let new_delete: PlanRef = self.clone_with_input(new_input).into(); | ||||||
if self.core.returning { | ||||||
Ok(new_delete) | ||||||
} else { | ||||||
let new_delete = | ||||||
RequiredDist::single().enforce_if_not_satisfies(new_delete, &Order::any())?; | ||||||
// Accumulate the affected rows. | ||||||
let sum_agg = PlanAggCall { | ||||||
agg_kind: AggKind::Sum, | ||||||
return_type: DataType::Int64, | ||||||
inputs: vec![InputRef::new(0, DataType::Int64)], | ||||||
distinct: false, | ||||||
order_by: vec![], | ||||||
filter: Condition::true_cond(), | ||||||
direct_args: vec![], | ||||||
}; | ||||||
let agg = Agg::new(vec![sum_agg], IndexSet::empty(), new_delete); | ||||||
let batch_agg = BatchSimpleAgg::new(agg); | ||||||
Ok(batch_agg.into()) | ||||||
} | ||||||
} else { | ||||||
let new_input = RequiredDist::single() | ||||||
.enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?; | ||||||
Ok(self.clone_with_input(new_input).into()) | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's do some performance test and make default to true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It comes with the cost for no atomicity guarantee. 😕
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we support reading external sources (e.g. iceberg) in the future, we can detect it from the plan and auto-enable it.