-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sql-backend): support table replace for sql-backend #14415
Conversation
…shboard, metric etc
a9d54d4
to
8aecb4b
Compare
dd8e14e
to
ede0956
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some comments for better review.
src/meta/model_v2/src/table.rs
Outdated
@@ -187,10 +187,28 @@ impl From<PbTable> for ActiveModel { | |||
let table_type = pb_table.table_type(); | |||
let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior(); | |||
|
|||
let fragment_id = if pb_table.fragment_id == u32::MAX - 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are some workaround to determinate wether it's new created or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, this is the placeholder ID. Can we extract a constant as it's not intuitive?
} | ||
|
||
// 2. create streaming object for new replace table. | ||
let obj_id = Self::create_streaming_job_obj( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we still use a dummy id that represents a new streaming job to achieve table replacement. The dummy streaming job will alway be Foreground
, so that if recovery encountered and the replacement is not completed it will be cleaned.
.await?; | ||
|
||
// 3. record dependency for new replace table. | ||
ObjectDependency::insert(object_dependency::ActiveModel { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Record the dependency info for replacement, so that the table is not allowed to be dropped during replacement. The record will be deleted cascade when the dummy job is removed after finish or recovery.
let table = table::ActiveModel::from(table).update(&txn).await?; | ||
|
||
// 1. replace old fragments/actors with new ones. | ||
Fragment::delete_many() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete old fragments and actors and move the new ones from dummy job to it.
|
||
// TODO: remove cache upstream fragment/actor ids and derive them from `actor_dispatcher` table. | ||
let mut to_update_fragment_ids = HashSet::new(); | ||
for merge_update in merge_updates { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update merge information in downstream fragments and actors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rubber stamp
src/meta/model_v2/src/table.rs
Outdated
@@ -187,10 +187,28 @@ impl From<PbTable> for ActiveModel { | |||
let table_type = pb_table.table_type(); | |||
let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior(); | |||
|
|||
let fragment_id = if pb_table.fragment_id == u32::MAX - 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, this is the placeholder ID. Can we extract a constant as it's not intuitive?
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.