-
Notifications
You must be signed in to change notification settings - Fork 596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(meta): persist job-level max parallelism & check when ALTER .. SET PARALLELISM
#18740
Merged
+250
−44
Merged
Changes from 5 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
754b66a
rename expected_vnode_count to max_parallelism
BugenZhao 59e6c8e
persist table fragments max parallelism
BugenZhao 4d745ea
fetch max parallelism to check alter parallelism
BugenZhao 48f5f3c
update test
BugenZhao 9736d32
add e2e test
BugenZhao 10f8a87
rename method
BugenZhao File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
statement ok | ||
create view table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id; | ||
|
||
#### BEGIN | ||
|
||
|
||
statement ok | ||
set streaming_max_parallelism to 4; | ||
|
||
# When the parallelism is specified to a value greater than the max parallelism, return an error. | ||
statement ok | ||
set streaming_parallelism to 6; | ||
|
||
statement error specified parallelism 6 should not exceed max parallelism 4 | ||
create table t; | ||
|
||
# When the parallelism is specified to an valid value, ok. | ||
statement ok | ||
set streaming_parallelism to 4; | ||
|
||
statement ok | ||
create table t; | ||
|
||
query T | ||
select parallelism from table_parallelism where name = 't'; | ||
---- | ||
FIXED(4) | ||
|
||
statement ok | ||
drop table t; | ||
|
||
# When no parallelism is specified, ok, and the parallelism will be adaptive. | ||
|
||
statement ok | ||
set streaming_parallelism to default; | ||
|
||
statement ok | ||
create table t; | ||
|
||
query T | ||
select parallelism from table_parallelism where name = 't'; | ||
---- | ||
ADAPTIVE | ||
|
||
# Alter parallelism to a valid value, ok. | ||
statement ok | ||
alter table t set parallelism to 4; | ||
|
||
query T | ||
select parallelism from table_parallelism where name = 't'; | ||
---- | ||
FIXED(4) | ||
|
||
# Alter parallelism to an invalid value, return an error. | ||
statement error specified parallelism 8 should not exceed max parallelism 4 | ||
alter table t set parallelism to 8; | ||
|
||
statement ok | ||
drop table t; | ||
|
||
#### END | ||
|
||
statement ok | ||
set streaming_max_parallelism to default; | ||
|
||
statement ok | ||
set streaming_parallelism to default; | ||
|
||
statement ok | ||
drop view table_parallelism; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -16,7 +16,7 @@ use std::collections::{BTreeMap, HashMap, HashSet}; | |||||
use std::pin::pin; | ||||||
use std::time::Duration; | ||||||
|
||||||
use anyhow::anyhow; | ||||||
use anyhow::{anyhow, Context}; | ||||||
use futures::future::{select, Either}; | ||||||
use risingwave_common::catalog::{TableId, TableOption}; | ||||||
use risingwave_meta_model_v2::{ObjectId, SourceId}; | ||||||
|
@@ -892,6 +892,24 @@ impl MetadataManager { | |||||
} | ||||||
} | ||||||
|
||||||
pub async fn get_table_max_parallelism(&self, table_id: TableId) -> MetaResult<usize> { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit.
Suggested change
|
||||||
match self { | ||||||
MetadataManager::V1(mgr) => { | ||||||
let fragments = mgr.fragment_manager.get_fragment_read_guard().await; | ||||||
Ok(fragments | ||||||
.table_fragments() | ||||||
.get(&table_id) | ||||||
.map(|tf| tf.max_parallelism) | ||||||
.with_context(|| format!("job {table_id} not found"))?) | ||||||
} | ||||||
MetadataManager::V2(mgr) => { | ||||||
mgr.catalog_controller | ||||||
.get_max_parallelism_by_id(table_id.table_id as _) | ||||||
.await | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
pub fn cluster_id(&self) -> &ClusterId { | ||||||
match self { | ||||||
MetadataManager::V1(mgr) => mgr.cluster_manager.cluster_id(), | ||||||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally, I'd like to use
vnode_count
for all internal occurrences, and only use "max_parallelism" for the user-facing part. Because we (RW developers) are familiar with vnode, sovnode_count
sounds like the most straight-forward naming.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My major concern is that,
vnode_count
of a streaming job may not be a physical concept at all. But it's true that prefixing withexpected
makes it doesn't sounds that confusing. Will change.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Upon further consideration, I still think
max_parallelism
is more understandable because it aligns better with existing job attributes likeparallelism
(orassigned_parallelism
). When the scope goes to fragment, it'll still be named asvnode_count
.