
fix(storage): fix compactor task parallelism race #16387

Merged
merged 15 commits into from
May 15, 2024

Conversation

@Li0k (Contributor) commented Apr 18, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

When the compactor node receives an event over the streaming RPC, it spawns a new task to handle it: it creates a virtual compactor_runner in a separate task, checks the task's resource requirements, and tries to acquire the task's execution quota before officially running it. Meanwhile, the main event loop keeps receiving events from the streaming RPC and initiating pull-task events, so a race can occur between the spawned task and the main event loop.

In short, before a spawned task acquires its quota, the event loop may initiate a new pull-task event, which can cause all newly pulled tasks to fail to acquire quota and be canceled.

This PR fixes the issue described above:

  • Estimate the task's parallelism before spawning it (without accessing the object store).
  • Acquire the quota before spawning the task.
  • Avoid excessive spawning: simple events are handled directly in the main event loop.
  • Some code refactoring.
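The first two points can be sketched as a minimal stand-alone program (hypothetical names such as `try_acquire_quota`; the real code manages quota inside the compactor event loop): estimate parallelism cheaply, reserve quota with a compare-and-swap before spawning, and release it when the task finishes.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

/// Try to reserve `parallelism` units out of `max` before spawning.
/// Returns true on success; the caller must release the units when done.
fn try_acquire_quota(used: &AtomicU32, max: u32, parallelism: u32) -> bool {
    used.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |cur| {
        if cur + parallelism <= max {
            Some(cur + parallelism)
        } else {
            None
        }
    })
    .is_ok()
}

fn main() {
    let used = Arc::new(AtomicU32::new(0));
    let max = 4;

    // Estimate parallelism *before* spawning (no object-store access),
    // acquire quota in the event loop, and only spawn on success.
    let parallelism = 2;
    if try_acquire_quota(&used, max, parallelism) {
        let used = Arc::clone(&used);
        let handle = thread::spawn(move || {
            // ... run the compaction task ...
            used.fetch_sub(parallelism, Ordering::SeqCst); // release on completion
        });
        handle.join().unwrap();
    }
    assert_eq!(used.load(Ordering::SeqCst), 0);
}
```

Because the reservation happens in the event loop itself, a later pull-task event sees the already-reduced free quota and cannot over-pull tasks.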

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@github-actions github-actions bot added the type/fix Bug fix label Apr 18, 2024
@Li0k Li0k force-pushed the li0k/storage_compactor_parallelism_race branch from 681393b to 874ec03 Compare April 19, 2024 07:41
@Little-Wallace
Copy link
Contributor

Please link an issue or add more description of what this PR fixes.

…nto li0k/storage_compactor_parallelism_race
@Li0k Li0k requested review from Little-Wallace and hzxa21 April 22, 2024 08:37
@Little-Wallace (Contributor) left a comment

Shall we also execute try_require_memory before spawning the compactor task, to avoid out-of-memory errors? That said, running out of memory should be unusual.
Rest LGTM.

Comment on lines 214 to 216
indexes.dedup();
if indexes.len() <= parallelism {
    return Ok(vec![]);
}
Collaborator:

Removing this line causes a behavior change. Prior to this PR, when the condition is hit, we will only have one split. After this PR, we will have two splits. Is this expected?

@Li0k (Author):

Tracing through the code:

  1. generate_splits_fast is only used when sstable_count > 256, so it will not cause the problem above.
  2. Even if the branch is triggered and 2 splits are generated, there is no correctness problem.

@Little-Wallace

Collaborator:

> generate_splits_fast will only be used when sstable_count > 256 and will not cause the above problems.

It is not good practice to rely on the caller's behavior to ensure a function's implementation behaves as expected; that is too implicit. What's the problem with directly returning 1 split when indexes.len() <= parallelism?

@Li0k (Author):

After refactoring the calculation, we can unify the parallelism calculation and eliminate the overuse of generate_splits_fast. We can then revert this code.

src/storage/src/hummock/compactor/compaction_utils.rs Outdated Show resolved Hide resolved
.map(|table_info| table_info.file_size)
.sum::<u64>();

generate_splits_fast(&sstable_infos, compaction_size, context)
Collaborator:

Using generate_splits_fast here seems to be an overkill because we don't actually need to calculate the index from sstable info and generate the split in order to know the task parallelism. We just need the logic in L183-194 and the input sst len:

    let worker_num = context.compaction_executor.worker_num();
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;

    let mut parallelism = (compaction_size + parallel_compact_size - 1) / parallel_compact_size;

    parallelism = std::cmp::min(
        worker_num,
        std::cmp::min(
            parallelism as usize,
            context.storage_opts.max_sub_compaction as usize,
        ),
    );

    if input_ssts.len() < parallelism {
        parallelism = 1
    }

Also, I think we can pass the parallelism as a parameter to generate_split and generate_split_fast because we already pre-calculate it after this PR.
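The suggestion above can be folded into a small helper. This is a sketch with illustrative parameter names standing in for fields on `context.storage_opts`, not the PR's actual signature:

```rust
/// Pre-calculate task parallelism from input size alone: no need to build
/// indexes or splits first. Ceiling-divide the compaction size by the
/// parallel-compact chunk size, cap by worker count and max_sub_compaction,
/// and fall back to 1 when there are fewer input SSTs than the target.
fn calculate_task_parallelism(
    compaction_size: u64,
    parallel_compact_size_mb: u64,
    worker_num: usize,
    max_sub_compaction: usize,
    input_sst_count: usize,
) -> usize {
    let parallel_compact_size = parallel_compact_size_mb << 20;
    // Ceiling division: how many chunk-sized pieces the input covers.
    let by_size =
        ((compaction_size + parallel_compact_size - 1) / parallel_compact_size) as usize;
    let parallelism = by_size.min(worker_num).min(max_sub_compaction);
    // Cannot split fewer SSTs than the requested parallelism.
    if input_sst_count < parallelism {
        1
    } else {
        parallelism
    }
}

fn main() {
    // 300 MiB of input with 64 MiB chunks -> ceil(300/64) = 5, capped at 4.
    assert_eq!(calculate_task_parallelism(300 << 20, 64, 8, 4, 10), 4);
    // Too few input SSTs to split: fall back to 1.
    assert_eq!(calculate_task_parallelism(300 << 20, 64, 8, 4, 2), 1);
}
```

Computing this up front also makes it natural to pass the result into generate_splits / generate_splits_fast as a parameter, as suggested.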

Collaborator:

Also, I noticed that the parallelism calculation is not exactly the same in generate_splits (L268-L279) and generate_splits_fast (L183-L194). Is this expected? Which one should we follow here?

src/storage/src/hummock/compactor/compactor_runner.rs Outdated Show resolved Hide resolved
let all_ssts_are_blocked_filter = sstable_infos
    .iter()
    .all(|table_info| table_info.bloom_filter_kind() == BloomFilterType::Blocked);
let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);
Collaborator:

Given that we already pre-calculate optimize_by_copy_block in calculate_task_parallelism before entering the compact function, I think we don't need to re-compute it here. How about storing parallelism and optimize_by_copy_block in CompactorContext so that they can be used directly here?

@Li0k (Author):

Can we tolerate this issue in this PR? From my perspective, Context does not belong to a task, but to a virtual compactor runner. However, I don't have a better solution for the time being.

Collaborator:

I see. It is okay. This is not a big deal.

@@ -102,7 +102,7 @@ impl CompactorContext {
     }
 }

-    pub fn acquire_task_quota(&self, parallelism: u32) -> bool {
+    pub fn acquire_task_quota(&self, parallelism: u32) -> Option<ReleaseGuard> {
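Returning a guard instead of a bool makes releasing the quota automatic. A minimal sketch of the RAII pattern (illustrative only; the PR's actual ReleaseGuard may differ):

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

/// Releases the reserved parallelism units when dropped, so quota
/// cannot leak even if the task panics or returns early.
struct ReleaseGuard {
    used: Arc<AtomicU32>,
    parallelism: u32,
}

impl Drop for ReleaseGuard {
    fn drop(&mut self) {
        self.used.fetch_sub(self.parallelism, Ordering::SeqCst);
    }
}

/// Atomically reserve `parallelism` units out of `max`; None if over budget.
fn acquire_task_quota(used: &Arc<AtomicU32>, max: u32, parallelism: u32) -> Option<ReleaseGuard> {
    used.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |cur| {
        (cur + parallelism <= max).then(|| cur + parallelism)
    })
    .ok()
    .map(|_| ReleaseGuard {
        used: Arc::clone(used),
        parallelism,
    })
}

fn main() {
    let used = Arc::new(AtomicU32::new(0));
    let guard = acquire_task_quota(&used, 4, 3).expect("quota available");
    assert!(acquire_task_quota(&used, 4, 3).is_none()); // over budget
    drop(guard); // quota released automatically
    assert_eq!(used.load(Ordering::SeqCst), 0);
}
```

Moving the release into Drop removes the explicit release_task_quota call sites and the bugs that come with forgetting them.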
Collaborator:

  1. We don't need to store max_task_parallelism in CompactorContext because it is only used in the compactor event loop.
  2. max_task_parallelism doesn't need to be an atomic because it is only accessed from a single thread; a local variable in the event loop is sufficient.
  3. Due to 1 and 2, acquire_task_quota, release_task_quota, and get_free_quota can be removed.


ResponseEvent::PullTaskAck(_pull_task_ack) => {
    // set flag
    pull_task_ack.store(true, Ordering::SeqCst);
Collaborator:

Why is pull_task_ack an atomic? I think it can be a local variable because it is only accessed in the event loop, in a single-threaded manner.
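The single-threaded-loop argument can be sketched as follows (hypothetical event and function names; the real loop reads from a streaming RPC, not an mpsc channel): both the ack flag and the free quota live as plain locals because the loop owns them exclusively.

```rust
use std::sync::mpsc;

enum ResponseEvent {
    PullTaskAck,
    CompactTask { parallelism: u32 },
    Shutdown,
}

/// Single-threaded event loop: `pull_task_ack` and the free quota are
/// plain locals, since no other thread ever touches them.
fn run_event_loop(rx: mpsc::Receiver<ResponseEvent>, max_quota: u32) -> (bool, u32) {
    let mut pull_task_ack = false; // no AtomicBool needed
    let mut free_quota = max_quota; // no AtomicU32 needed
    while let Ok(event) = rx.recv() {
        match event {
            ResponseEvent::PullTaskAck => pull_task_ack = true,
            ResponseEvent::CompactTask { parallelism } => {
                if parallelism <= free_quota {
                    free_quota -= parallelism; // reserve before spawning the task
                }
            }
            ResponseEvent::Shutdown => break,
        }
    }
    (pull_task_ack, free_quota)
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(ResponseEvent::PullTaskAck).unwrap();
    tx.send(ResponseEvent::CompactTask { parallelism: 2 }).unwrap();
    tx.send(ResponseEvent::Shutdown).unwrap();
    let (acked, remaining) = run_event_loop(rx, 4);
    assert!(acked);
    assert_eq!(remaining, 2);
}
```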

src/storage/src/hummock/compactor/mod.rs Show resolved Hide resolved
src/storage/src/hummock/compactor/mod.rs Outdated Show resolved Hide resolved
src/storage/src/hummock/compactor/mod.rs Outdated Show resolved Hide resolved
@Li0k Li0k added this pull request to the merge queue May 15, 2024
Merged via the queue into main with commit acf7a26 May 15, 2024
27 of 28 checks passed
@Li0k Li0k deleted the li0k/storage_compactor_parallelism_race branch May 15, 2024 07:08
Labels
type/fix Bug fix