
feat(batch): distributed dml #14630

Merged: 14 commits, Jan 18, 2024
Conversation

@chenzl25 (Contributor) commented Jan 17, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

  • Resolves feat: Distributed Insertion #14574
  • Add a session variable batch_enable_distributed_dml to control whether DML is executed in a distributed way.
  • Currently, we use a hash shuffle between the batch insert/delete/update and its input. In the future, we could use round-robin instead to avoid the hash-calculation cost if necessary.
  • Use a sum aggregation to accumulate the affected rows when we plan a distributed DML.
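The plan shape described above can be sketched as a toy model: rows are hash-shuffled across DML executors, each executor reports a partial affected-row count, and a sum on top combines them. This is an illustrative sketch only; `hash_shuffle` and the use of `DefaultHasher` are stand-ins of my own, not RisingWave's actual executors or hash function.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Partition rows by hashing all columns, mimicking the hash shuffle
// inserted between the DML node and its input (illustrative only).
fn hash_shuffle(rows: Vec<(i32, String)>, parallelism: usize) -> Vec<Vec<(i32, String)>> {
    let mut partitions = vec![Vec::new(); parallelism];
    for row in rows {
        let mut h = DefaultHasher::new();
        row.hash(&mut h);
        let idx = (h.finish() as usize) % parallelism;
        partitions[idx].push(row);
    }
    partitions
}

fn main() {
    let rows: Vec<(i32, String)> = (0..1000).map(|i| (i, format!("v{i}"))).collect();
    // Each partition plays the role of one compute node's DML executor.
    let partitions = hash_shuffle(rows, 4);
    // Each executor reports its local affected-row count...
    let partials: Vec<u64> = partitions.iter().map(|p| p.len() as u64).collect();
    // ...and a single sum aggregation on top accumulates the total,
    // like the sum agg placed above the exchange in the distributed plan.
    let affected: u64 = partials.iter().sum();
    assert_eq!(affected, 1000);
}
```

The point of the sum step is that each parallel DML executor only knows its own affected-row count, so the plan needs one final aggregation to report a single number to the client.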

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

  • Introduce a session variable batch_enable_distributed_dml to enable distributed DML, so that insert, delete, and update statements can be executed in a distributed way (i.e., running on multiple compute nodes). There is no atomicity guarantee in this mode; its goal is to achieve the best ingestion performance for initial batch ingestion, where users can always drop the table if a failure happens.

@chenzl25 (Contributor, Author) commented Jan 17, 2024

Experiment:
Table t contains 20 million rows, about 1.6 GB of data.

4 CNs, each with 1 CPU core and 4 GB of memory.

Test: insert-select

CREATE TABLE t2 (a INT, b CHARACTER VARYING) APPEND ONLY;
insert into t2 select * from t;

distributed_dml = true (about 66% higher throughput)

Time: 30 s
CPU utilization is even across the CNs.

distributed_dml = false

Time: 50 s
One CN's CPU utilization reached 100%.


@chenzl25 (Contributor, Author) commented Jan 17, 2024

However, when we use CREATE TABLE t2 (a INT, b CHARACTER VARYING) (without APPEND ONLY) as the destination table, there is no big difference between distributed DML and the non-distributed one, because the conflict check causes back-pressure.

It takes about 180 s, so we need to improve ingestion performance for tables with conflict checking.


@liurenjie1024 (Contributor) left a comment:

LGTM, thanks!

@@ -94,6 +94,10 @@ pub struct ConfigMap {
#[parameter(default = true, rename = "rw_batch_enable_sort_agg")]
batch_enable_sort_agg: bool,

/// Enable distributed DML, so that insert, delete, and update statements can be executed in a distributed way (e.g. running on multiple compute nodes).
#[parameter(default = false, rename = "batch_enable_distributed_dml")]
Member:

Let's do some performance tests and make the default true.

Member:

make default to true

It comes at the cost of no atomicity guarantee. 😕

Contributor (author):

If we support reading external sources (e.g. Iceberg) in the future, we can detect it from the plan and auto-enable it.

Comment on lines +72 to +75
// Add a hash shuffle between the delete and its input.
let new_input = RequiredDist::PhysicalDist(Distribution::HashShard(
(0..self.input().schema().len()).collect(),
))
Member:

If I remember correctly, the hash used in batch exchange is different from streaming, and we have added a so-called ConsistentHash for the streaming hash distribution.

HASH = 3;
CONSISTENT_HASH = 4;

Here which distribution does this generated BatchExchange follow?

Contributor (author):

This one: HASH = 3.

Contributor:

If I remember correctly, the hash used in batch exchange is different from streaming, and we have added a so-called ConsistentHash for the streaming hash distribution.

Using the batch hash distribution here is OK because we still have the stream hash exchange before the materialize executor.
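The two distribution schemes being contrasted can be sketched as follows. This is a simplified illustration under assumed names (`key_hash`, `batch_partition`, `streaming_partition`, and the `DefaultHasher` stand-in are not RisingWave's actual code or hash function): batch HASH maps a key's hash directly to a partition, while streaming CONSISTENT_HASH first maps the hash into a fixed virtual-node space and then uses a vnode-to-worker mapping, so workers can be rebalanced by moving vnodes instead of rehashing keys.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn key_hash<T: Hash>(key: &T) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish()
}

// Batch HASH distribution: partition index taken directly from the hash.
fn batch_partition<T: Hash>(key: &T, parallelism: u64) -> u64 {
    key_hash(key) % parallelism
}

// Streaming CONSISTENT_HASH distribution: the hash first maps into a
// fixed-size virtual-node space, and a vnode-to-worker mapping picks
// the owner; rebalancing moves vnodes without rehashing keys.
fn streaming_partition<T: Hash>(key: &T, vnode_to_worker: &[u64]) -> u64 {
    let vnode = key_hash(key) as usize % vnode_to_worker.len();
    vnode_to_worker[vnode]
}

fn main() {
    const VNODES: usize = 256; // illustrative fixed vnode count
    let workers = 4u64;
    // Round-robin vnode assignment, standing in for the real mapping.
    let mapping: Vec<u64> = (0..VNODES).map(|v| v as u64 % workers).collect();
    let key = (42i32, "row");
    let b = batch_partition(&key, workers);
    let s = streaming_partition(&key, &mapping);
    // The two schemes may route the same key to different workers, which
    // is fine here: the stream hash exchange before the materialize
    // executor re-shuffles rows into the streaming distribution anyway.
    assert!(b < workers && s < workers);
}
```

This is why the reply above holds: the batch exchange only needs to spread work evenly, and correctness of row placement is restored by the later stream hash exchange.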

@BugenZhao (Member):
May I ask how the atomicity is guaranteed under distributed mode?

@chenzl25 (Contributor, Author):

May I ask how the atomicity is guaranteed under distributed mode?

No atomicity guarantee in this mode. Its goal is to gain the best ingestion performance for initial batch ingestion where users always can drop their table when failure happens.

@BugenZhao (Member):

May I ask how the atomicity is guaranteed under distributed mode?

No atomicity guarantee in this mode. Its goal is to gain the best ingestion performance for initial batch ingestion where users always can drop their table when failure happens.

Got it. What about documenting it somewhere?

@BugenZhao BugenZhao added the user-facing-changes Contains changes that are visible to users label Jan 18, 2024
@BugenZhao (Member) left a comment:

Rest LGTM

Comment on lines +41 to +43
BatchSimpleAgg { aggs: [sum()] }
└─BatchExchange { order: [], dist: Single }
└─BatchDelete { table: t }
Member:
This reminds me of an issue in ancient times 🤣

#2678

Contributor (author):

🤣

src/frontend/src/optimizer/plan_node/batch_update.rs (outdated, resolved)
src/frontend/src/scheduler/distributed/stage.rs (outdated, resolved)
@chenzl25 chenzl25 requested review from fuyufjh and BugenZhao January 18, 2024 04:27
@st1page (Contributor) commented Jan 18, 2024

However, when we use CREATE TABLE t2 (a INT, b CHARACTER VARYING) as the destination table. There is no big difference between distributed DML and the non-distributed one, because conflict check causes a back-pressure.

It takes about 180s, so we need to improve the table (with conflict check) ingestion performance.

It is strange, and we need to do some investigation later. Because in the experiment:

Experiment:
Table t contains 20 million rows, about 1.6 GB of data.

4 CNs, each with 1 CPU core and 4 GB of memory.

All keys should be in the cache, and conflict handling should not be the bottleneck. Do all newly inserted keys have new primary keys? Maybe an in-memory Bloom filter can help the situation.

@chenzl25 (Contributor, Author):

All keys should be in the cache, and conflict handling should not be the bottleneck. Do all newly inserted keys have new primary keys? Maybe an in-memory Bloom filter can help the situation.

Table t could be in the cache, but t2 is a newly created table and the compactor is running.

@st1page (Contributor) commented Jan 18, 2024

All keys should be in the cache, and conflict handling should not be the bottleneck. Do all newly inserted keys have new primary keys? Maybe an in-memory Bloom filter can help the situation.

Table t could be in the cache, but t2 is a newly created table and the compactor is running.

OK, so for non-append-only tables without a primary key, we need a way to check whether the row_id in an inserted row was generated by row_id_gen (in which case there is no need to handle conflicts)?

@chenzl25 (Contributor, Author):

Table t could be in the cache, but t2 is a newly created table and the compactor is running.

Let me kill the compactor to test it again first as suggested by @hzxa21

@chenzl25 (Contributor, Author):

Let me kill the compactor to test it again first, as suggested by @hzxa21.

Without compactors, barriers pile up during batch ingestion.

@chenzl25 (Contributor, Author) commented Jan 18, 2024

OK, so for non-append-only tables without a primary key, we need a way to check whether the row_id in an inserted row was generated by row_id_gen (in which case there is no need to handle conflicts)?

#14635: I think we can just disable the conflict check for tables without any downstream materialized views. Our storage actually provides overwrite semantics.

@chenzl25 chenzl25 added this pull request to the merge queue Jan 18, 2024
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Jan 18, 2024
@chenzl25 chenzl25 added this pull request to the merge queue Jan 18, 2024
Merged via the queue into main with commit 0f79291 Jan 18, 2024
27 of 28 checks passed
@chenzl25 chenzl25 deleted the dylan/distributed_dml branch January 18, 2024 15:40
Little-Wallace pushed a commit that referenced this pull request Jan 20, 2024
Labels: type/feature, user-facing-changes (Contains changes that are visible to users)

Successfully merging this pull request may close these issues: feat: Distributed Insertion (#14574)

5 participants