-
Notifications
You must be signed in to change notification settings - Fork 598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(frontend): support execute insert in local mode #8208
Conversation
5435eb3
to
a9b9424
Compare
a9b9424
to
e4420cf
Compare
b128755
to
2c9da12
Compare
Codecov Report
@@ Coverage Diff @@
## main #8208 +/- ##
==========================================
- Coverage 71.65% 71.63% -0.02%
==========================================
Files 1131 1131
Lines 184150 184230 +80
==========================================
+ Hits 131948 131978 +30
- Misses 52202 52252 +50
Flags with carried forward coverage won't be shown. Click here to find out more.
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
Do you want to test |
Yes, how I test that |
QA team will add local mode into the existing performance test pipeline |
src/frontend/src/scheduler/local.rs
Outdated
Ok(vec![self.front_env.worker_node_manager().next_random()?]) | ||
} | ||
} else { | ||
Ok(self.front_env.worker_node_manager().list_worker_nodes()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm should we list all worker nodes here?
What if parallelism = 3, and worker nodes = 5, does that mean we choose to schedule 5 workers?
Should it match exactly? i.e. choose N workers for N parallelism.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it should be workers with target table only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have processed the case that the stage have scan node before, so in this case the stage don't have the scan node, I think we just need to randomlly select N parallelism worker?🤔
(Previous process is list worker node)
self.front_env.worker_node_manager().list_worker_nodes() |
let worker_node = { | ||
let parallel_unit_ids = vnode_mapping.iter_unique().collect_vec(); | ||
let candidates = self.front_env | ||
.worker_node_manager() | ||
.get_workers_by_parallel_unit_ids(¶llel_unit_ids)?; | ||
candidates.choose(&mut rand::thread_rng()).unwrap().clone() | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need get_workers_by_parallel_unit_ids
here? Didn't use this before, so not too familiar.
In the case of Insert
why is this needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The insert executor will send the insert data to the reader registered by the dml executor.
risingwave/src/batch/src/executor/insert.rs
Line 134 in 1ad23ba
.write_chunk(self.table_id, self.table_version_id, stream_chunk) |
risingwave/src/stream/src/executor/dml.rs
Line 94 in 1ad23ba
let batch_reader = batch_reader.stream_reader().into_stream(); |
Hence to access the reader, the insert executor need to schedule the same worker node of the dml executor. So
get_workers_by_parallel_unit_ids
is to get the worker node where dml executor stay in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, thanks for the clear explanation!
Maybe this can be documented, since the logic is split in various places, it does not seem very clear to me at first glance.
(Unless some documentation already exists, in that case feel free to ignore).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with you. (Seems don't have a related doc)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally LGTM, just require some refinement.
src/frontend/src/scheduler/local.rs
Outdated
Ok(vec![self.front_env.worker_node_manager().next_random()?]) | ||
} | ||
} else { | ||
Ok(self.front_env.worker_node_manager().list_worker_nodes()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it should be workers with target table only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
related issue: #7684
support to execute insert without using select query in local mode, such as:
insert into t values (1)
For other dml like: insert-select, delete, update, their plan has more than two stage so that we can't execute them in local mode.
Checklist For Contributors
./risedev check
(or alias,./risedev c
)Checklist For Reviewers
Documentation
Click here for Documentation
Types of user-facing changes
Release note
In past the insert will be executed in distributed mode, but now the insert without using select query will executed in local mode.