fix(storage): fix compactor task parallelism race #16387
Conversation
681393b to 874ec03
Please link an issue or add more description of what this PR fixes.
Shall we also execute try_require_memory before spawning the compactor task, to avoid "not enough memory" errors? Anyway, running out of memory may be unusual.
Rest LGTM.
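A minimal, self-contained sketch of that suggestion, using a toy limiter rather than the actual MemoryLimiter API (the type, method body, and sizes below are illustrative assumptions):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Toy byte-quota limiter; stands in for the real memory limiter.
struct MemoryLimiter {
    remaining: AtomicU64,
}

impl MemoryLimiter {
    /// Try to reserve `bytes` up front; fails instead of over-committing.
    fn try_require_memory(&self, bytes: u64) -> bool {
        self.remaining
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |q| q.checked_sub(bytes))
            .is_ok()
    }
}

fn main() {
    let limiter = MemoryLimiter {
        remaining: AtomicU64::new(1 << 30), // 1 GiB budget
    };
    let estimated_task_memory = 512 << 20; // 512 MiB
    if limiter.try_require_memory(estimated_task_memory) {
        // enough memory reserved: spawn the compactor task here
    } else {
        // reject/cancel the task up front instead of failing after spawn
    }
}
```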
indexes.dedup();
if indexes.len() <= parallelism {
    return Ok(vec![]);
}
Removing this line causes a behavior change. Prior to this PR, when the condition is hit, we will only have one split. After this PR, we will have two splits. Is this expected?
Tracing back through the code:
- generate_splits_fast is only used when sstable_count > 256, so it will not cause the problem above.
- Even if the branch is triggered and 2 splits are generated, there is no correctness problem.
generate_splits_fast will only be used when sstable_count > 256 and will not cause the above problems.
It is not good practice to rely on the caller's behavior to ensure that a function's implementation behaves as expected; it is too implicit. What's the problem with directly returning 1 split if indexes.len() <= parallelism?
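For illustration, a minimal sketch of that suggestion (the KeyRange stub and function shape are assumptions, not the actual implementation):

```rust
#[derive(Default)]
struct KeyRange; // stand-in for the real key-range type

fn generate_splits_fast(indexes_len: usize, parallelism: usize) -> Vec<KeyRange> {
    if indexes_len <= parallelism {
        // Return one split covering the full range: the result is
        // self-describing and no longer relies on the caller only
        // invoking this function when the empty-Vec convention is safe.
        return vec![KeyRange::default()];
    }
    // ... otherwise split the range into `parallelism` pieces ...
    (0..parallelism).map(|_| KeyRange::default()).collect()
}
```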
After refactoring the calculation, we can unify the parallelism calculation and eliminate the overuse of generate_splits_fast. We can then revert this code.
    .map(|table_info| table_info.file_size)
    .sum::<u64>();

generate_splits_fast(&sstable_infos, compaction_size, context)
Using generate_splits_fast here seems to be overkill, because we don't actually need to calculate the indexes from the sstable info and generate the splits in order to know the task parallelism. We just need the logic in L183-194 and the input sst len:
let worker_num = context.compaction_executor.worker_num();
let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
// Ceiling division: how many parallel-compact-size chunks the task spans.
let mut parallelism =
    ((compaction_size + parallel_compact_size - 1) / parallel_compact_size) as usize;
// Cap by the worker count and the configured max sub-compaction.
parallelism = std::cmp::min(
    worker_num,
    std::cmp::min(
        parallelism,
        context.storage_opts.max_sub_compaction as usize,
    ),
);
if input_ssts.len() < parallelism {
    parallelism = 1;
}
Also, I think we can pass the parallelism as a parameter to generate_split and generate_split_fast, because we already pre-calculate it after this PR.
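A sketch of what that could look like, with simplified stand-in types (the real signatures in the codebase differ):

```rust
struct SstableInfo {
    file_size: u64,
}
struct KeyRange;

// Parallelism is pre-calculated by the caller and threaded through,
// so both split generators share one source of truth.
fn generate_splits_fast(
    _sstable_infos: &[SstableInfo],
    _compaction_size: u64,
    parallelism: usize, // pre-calculated by the caller
) -> Vec<KeyRange> {
    (0..parallelism).map(|_| KeyRange).collect()
}
```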
Also, I noticed that the parallelism calculation is not exactly the same in generate_splits (L268-L279) and generate_splits_fast (L183-L194). Is this expected? Which one should we follow here?
let all_ssts_are_blocked_filter = sstable_infos
    .iter()
    .all(|table_info| table_info.bloom_filter_kind() == BloomFilterType::Blocked);

let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);
Given that we already pre-calculate optimize_by_copy_block in calculate_task_parallelism, prior to entering the compact function, I think we don't need to re-compute it here. How about storing parallelism and optimize_by_copy_block in CompactorContext so that they can be used directly here?
Can we tolerate this issue in this PR? From my perspective, Context does not belong to a task, but to a virtual compactor runner. However, I don't have a better solution for the time being.
I see. It is okay. This is not a big deal.
@@ -102,7 +102,7 @@ impl CompactorContext {
     }
 }

-    pub fn acquire_task_quota(&self, parallelism: u32) -> bool {
+    pub fn acquire_task_quota(&self, parallelism: u32) -> Option<ReleaseGuard> {
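For context, a self-contained sketch of the RAII shape this signature implies: the guard returns the quota on drop, so it cannot leak even if the task panics or is cancelled (struct and field names below are illustrative, not the PR's actual code):

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

/// Returns the acquired parallelism quota when dropped.
struct ReleaseGuard {
    running: Arc<AtomicU32>,
    parallelism: u32,
}

impl Drop for ReleaseGuard {
    fn drop(&mut self) {
        self.running.fetch_sub(self.parallelism, Ordering::SeqCst);
    }
}

struct TaskQuota {
    running: Arc<AtomicU32>,
    max_task_parallelism: u32,
}

impl TaskQuota {
    fn acquire_task_quota(&self, parallelism: u32) -> Option<ReleaseGuard> {
        // Compare-and-swap loop: reserve atomically, or fail without
        // over-committing when a concurrent acquisition wins the race.
        let mut current = self.running.load(Ordering::SeqCst);
        loop {
            if current + parallelism > self.max_task_parallelism {
                return None;
            }
            match self.running.compare_exchange(
                current,
                current + parallelism,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => {
                    return Some(ReleaseGuard {
                        running: self.running.clone(),
                        parallelism,
                    });
                }
                Err(actual) => current = actual,
            }
        }
    }
}
```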
1. We don't need to store max_task_parallelism in the compactor context, because it will only be used in the compactor event loop.
2. max_task_parallelism doesn't need to be an atomic, because it will only be accessed in a single thread. A local var in the event loop is sufficient.
3. Due to 1 and 2, acquire_task_quota, release_task_quota and get_free_quota can be removed.
ResponseEvent::PullTaskAck(_pull_task_ack) => {
    // set flag
    pull_task_ack.store(true, Ordering::SeqCst);
Why is pull_task_ack an atomic? I think it can be a local var, because it is only accessed in the event loop, in a single-threaded manner.
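A minimal sketch of that suggestion (the event type and loop shape are simplified assumptions):

```rust
use tokio::sync::mpsc::UnboundedReceiver;

enum ResponseEvent {
    PullTaskAck,
    // ... other variants elided ...
}

async fn event_loop(mut rx: UnboundedReceiver<ResponseEvent>) {
    // Only this loop reads or writes the flag, so a plain local bool
    // replaces the shared atomic and its Ordering bookkeeping.
    let mut pull_task_ack = true;
    while let Some(event) = rx.recv().await {
        match event {
            ResponseEvent::PullTaskAck => pull_task_ack = true,
        }
        if pull_task_ack {
            // safe to issue the next pull-task request from here
        }
    }
}
```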
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
When the compactor node receives an event from the streaming RPC, it spawns a new task to handle it. The compactor node creates a virtual compactor_runner in a separate task, which checks the task's resources and tries to acquire the task's execution quota before officially running it. Meanwhile, the event loop keeps receiving events from the streaming RPC and initiating pull-task events. Therefore, a race can occur between the spawned task and the main event loop.
In short, before the spawned tasks acquire their quota, the event loop may initiate a new pull-task event, which can cause all newly pulled tasks to fail to acquire quota and be canceled.
This PR fixes the issue described above.
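To make the fix's shape concrete, here is a minimal sketch (not the actual RisingWave code; a tokio semaphore stands in for the quota counter): the event loop reserves the quota before spawning, so the next pull-task decision sees the true free quota instead of racing against tasks that have not started yet.

```rust
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

async fn handle_new_task(quota: Arc<Semaphore>, parallelism: u32) {
    // Acquire in the event loop, *before* spawn, so quota accounting
    // is already settled when the next pull-task event is considered.
    match quota.try_acquire_many_owned(parallelism) {
        Ok(permit) => {
            // The permit moves into the task and is released on drop,
            // holding the quota for the task's whole lifetime.
            tokio::spawn(run_compaction_task(permit));
        }
        Err(_) => {
            // No quota left: cancel/report the task instead of
            // spawning it and letting it fail later.
        }
    }
}

async fn run_compaction_task(_permit: OwnedSemaphorePermit) {
    // ... compaction work elided ...
}
```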
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.