-
Notifications
You must be signed in to change notification settings - Fork 594
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(storage): seperate timeout between process and heartbeat #14366
Conversation
} | ||
|
||
fn get_heartbeat_expired_tasks( | ||
task_heartbeats: &HashMap<HummockCompactionTaskId, TaskHeartbeat>, | ||
now: u64, | ||
heartbeat_expiry_ts: u64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it a typo ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not a typo. After we distinguish process and heartbeat timeout, the background thread only needs to check expired_task based on heartbeat_expiry_ts
@@ -321,15 +332,19 @@ impl CompactorManagerInner { | |||
let mut cancel_tasks = vec![]; | |||
for progress in progress_list { | |||
if let Some(task_ref) = self.task_heartbeats.get_mut(&progress.task_id) { | |||
task_ref.update_at = now; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In original implement, we update "expire_at" to "expire_at + 60 secs". And we check it with "now - 30secs". It means that the task will be cancled after "90s" not changed.
But now, we will cancel task if it did not receive heartbeat during "30s". Is it too short ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM!
self.num_progress_key | ||
.fetch_add(inc_key_num, Ordering::Relaxed); | ||
pub fn inc_num_pending_read_io(&self) { | ||
self.num_pending_read_io |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason to use SeqCst
? If this atomic operation is frequent, there may be some performance issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This operation is infrequent, but it does not require precise values. I will change it to Relaxed.
.duration_since(SystemTime::UNIX_EPOCH) | ||
.expect("Clock may have gone backwards") | ||
.as_secs() | ||
- interval; | ||
Self::get_heartbeat_expired_tasks(&self.task_heartbeats, now) | ||
- self.heartbeat_expiry_seconds; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
expiry
-> expire
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Signed-off-by: Bugen Zhao <[email protected]> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: August <[email protected]>
…behavior (#14345) Signed-off-by: TennyZhuang <[email protected]>
Co-authored-by: Bohan Zhang <[email protected]>
Signed-off-by: tabVersion <[email protected]> Co-authored-by: cheskel <[email protected]> Co-authored-by: xxchan <[email protected]>
…es (#14364) Signed-off-by: TennyZhuang <[email protected]>
…ave been marked as created (#14367) Co-authored-by: zwang28 <[email protected]>
Co-authored-by: KeXiangWang <[email protected]>
1faf483
to
58b3bb3
Compare
…nto li0k/storage_task_expire
…nto li0k/storage_task_expire
…nto li0k/storage_task_expire
…nto li0k/storage_task_expire
.duration_since(SystemTime::UNIX_EPOCH) | ||
.expect("Clock may have gone backwards") | ||
.as_secs() | ||
- interval; | ||
Self::get_heartbeat_expired_tasks(&self.task_heartbeats, now) | ||
- self.heartbeat_expiry_seconds; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
@@ -104,6 +108,11 @@ pub struct MergeIteratorInner<I: HummockIterator, NE: NodeExtraOrderInfo> { | |||
heap: BinaryHeap<Node<I, NE>>, | |||
|
|||
last_table_key: Vec<u8>, | |||
|
|||
// compactor task progress report | |||
task_progress: Option<Arc<TaskProgress>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is a bad idea to embed a compactor related concept into a generic merge iterator. cc @wenym1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, compact_and_build_sst
takes a HummockIterator
, not MergeIterator
so it is still possible we miss to update task progress. How about having a wrapper iterator that impl HummockIterator
to wrap the underlying HummockIterator
to do key progress tracking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, introduced a new wrapper iterator to check the processed key
…nto li0k/storage_task_expire
…nto li0k/storage_task_expire
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Signed-off-by: Bugen Zhao <[email protected]> Signed-off-by: TennyZhuang <[email protected]> Signed-off-by: tabVersion <[email protected]> Co-authored-by: Bugen Zhao <[email protected]> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Eric Fu <[email protected]> Co-authored-by: August <[email protected]> Co-authored-by: TennyZhuang <[email protected]> Co-authored-by: xxchan <[email protected]> Co-authored-by: Bohan Zhang <[email protected]> Co-authored-by: cheskel <[email protected]> Co-authored-by: Xinhao Xu <[email protected]> Co-authored-by: Yuhao Su <[email protected]> Co-authored-by: stonepage <[email protected]> Co-authored-by: zwang28 <[email protected]> Co-authored-by: Kexiang Wang <[email protected]> Co-authored-by: KeXiangWang <[email protected]>
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
#14327
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.