feat(batch): distributed dml #14630
Conversation
LGTM, thanks!
@@ -94,6 +94,10 @@ pub struct ConfigMap {
    #[parameter(default = true, rename = "rw_batch_enable_sort_agg")]
    batch_enable_sort_agg: bool,

    /// Enable distributed DML, so an insert, delete, or update statement can be executed in a distributed way (e.g. running on multiple compute nodes).
    #[parameter(default = false, rename = "batch_enable_distributed_dml")]
Let's do some performance tests and make the default true.
> make default to true

It comes with the cost of no atomicity guarantee. 😕
If we support reading external sources (e.g. iceberg) in the future, we can detect it from the plan and auto-enable it.
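To make the config's effect concrete, here is a minimal, self-contained sketch (hypothetical types and function names, not RisingWave's actual planner API) of the decision this session variable drives: with the flag off, the DML input is gathered to a single executor; with it on, the input is hash-shuffled across compute nodes, trading the atomicity guarantee for ingestion throughput.

```rust
// Hypothetical sketch only -- RisingWave's real planner types differ.
#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    Single,                // gather all rows to one DML executor (atomic)
    HashShard(Vec<usize>), // hash-shuffle rows across compute nodes
}

struct SessionConfig {
    batch_enable_distributed_dml: bool, // defaults to false in this PR
}

/// Pick the required input distribution for a batch INSERT/DELETE/UPDATE.
/// `input_width` mirrors `self.input().schema().len()` in the diff below:
/// all input columns are used as the hash-shuffle keys.
fn dml_input_distribution(config: &SessionConfig, input_width: usize) -> Distribution {
    if config.batch_enable_distributed_dml {
        Distribution::HashShard((0..input_width).collect())
    } else {
        Distribution::Single
    }
}

fn main() {
    let cfg = SessionConfig { batch_enable_distributed_dml: true };
    assert_eq!(
        dml_input_distribution(&cfg, 3),
        Distribution::HashShard(vec![0, 1, 2])
    );
}
```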
// Add a hash shuffle between the delete and its input.
let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
    (0..self.input().schema().len()).collect(),
))
If I remember correctly, the hash used in batch exchange is different from streaming, and we have added a so-called ConsistentHash for the streaming hash distribution.

risingwave/proto/batch_plan.proto, lines 344 to 345 in f85de37:

HASH = 3;
CONSISTENT_HASH = 4;

Which distribution does this generated BatchExchange follow here?
This one: HASH = 3
> If I remember correctly, the hash used in batch exchange is different from streaming, and we have added a so-called ConsistentHash for the streaming hash distribution.

Using batch hash distribution here is OK because we still have the stream hash exchange before the materialize executor.
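For readers unfamiliar with the two schemes, here is a simplified, self-contained sketch (hypothetical hash routines, not the actual exchange implementations) of why batch HASH and the vnode-based CONSISTENT_HASH can route the same row to different workers, and why that is acceptable here: the stream hash exchange in front of the materialize executor re-partitions rows onto their vnode owners regardless.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Simplified, hypothetical sketch -- not RisingWave's actual hashers.
fn key_hash(key: &[i64]) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish()
}

/// Batch HASH in this sketch: hash the key and take it modulo the worker count.
fn batch_hash_target(key: &[i64], workers: u64) -> u64 {
    key_hash(key) % workers
}

/// CONSISTENT_HASH in this sketch: hash the key to a virtual node first,
/// then look the vnode up in a vnode -> worker ownership map.
fn consistent_hash_target(key: &[i64], vnode_to_worker: &[u64]) -> u64 {
    let vnode = (key_hash(key) % vnode_to_worker.len() as u64) as usize;
    vnode_to_worker[vnode]
}

fn main() {
    let workers = 3;
    // An arbitrary vnode ownership map (8 vnodes for the demo); it is not
    // derived from `hash % workers`, so the two schemes can disagree.
    let vnode_to_worker = [2, 0, 1, 1, 0, 2, 0, 1];

    let key = [42_i64];
    println!("batch HASH      -> worker {}", batch_hash_target(&key, workers));
    println!("CONSISTENT_HASH -> worker {}", consistent_hash_target(&key, &vnode_to_worker));
    // The targets may differ, which is fine for distributed DML: the stream
    // hash exchange before the materialize executor re-shuffles by vnode.
}
```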
May I ask how the atomicity is guaranteed under distributed mode?

No atomicity guarantee in this mode. Its goal is to gain the best ingestion performance for initial batch ingestion, where users can always drop their table when a failure happens.

Got it. What about documenting it somewhere?
Rest LGTM
BatchSimpleAgg { aggs: [sum()] }
└─BatchExchange { order: [], dist: Single }
  └─BatchDelete { table: t }
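As a reading aid for the plan above (a sketch only, not the executor code): each partition of the distributed BatchDelete reports how many rows it deleted, the Single exchange gathers the partial counts, and BatchSimpleAgg's sum() turns them into the single rows-affected value returned to the client.

```rust
// Sketch of the sum() in the plan above: per-partition delete counts are
// gathered to one place and summed into the total rows-affected value.
fn total_rows_affected(per_partition_counts: &[u64]) -> u64 {
    per_partition_counts.iter().sum()
}

fn main() {
    // e.g. three compute nodes deleted 10, 7, and 3 rows respectively
    assert_eq!(total_rows_affected(&[10, 7, 3]), 20);
}
```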
This reminds me of an issue in ancient times 🤣
🤣
It is strange, and we need to do some investigation later.
All keys should be in the cache, and the conflict handling should not be the bottleneck.

Table t could be in the cache, but t2 is a newly created table and the compactor is running.

Ok, so we need a way to check those non-append-only tables without a pk to check if the

Let me kill the compactor to test it again first, as suggested by @hzxa21.

Without compactors, barriers would pile up during batch ingestion.

#14635 I think we can just disable the conflict check for a table without any downstream mv. Our storage actually provides overwrite semantics.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?

Add a session variable batch_enable_distributed_dml to control whether to execute DML in a distributed way.

Checklist

- ./risedev check (or alias, ./risedev c)

Documentation

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

Add a session variable batch_enable_distributed_dml to enable distributed batch ingestion: an insert, delete, or update statement can be executed in a distributed way (e.g. running on multiple compute nodes). There is no atomicity guarantee in this mode; its goal is to gain the best ingestion performance for initial batch ingestion, where users can always drop their table when a failure happens.