Skip to content

Commit

Permalink
fix(storage): fix compactor task parallelism race (#16387)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed May 29, 2024
1 parent ce6c8de commit 242990b
Show file tree
Hide file tree
Showing 8 changed files with 467 additions and 403 deletions.
9 changes: 0 additions & 9 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::net::SocketAddr;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -215,12 +214,6 @@ pub async fn compute_node_serve(
));

let compaction_executor = Arc::new(CompactionExecutor::new(Some(1)));
let max_task_parallelism = Arc::new(AtomicU32::new(
(compaction_executor.worker_num() as f32
* storage_opts.compactor_max_task_multiplier)
.ceil() as u32,
));

let compactor_context = CompactorContext {
storage_opts,
sstable_store: storage.sstable_store(),
Expand All @@ -233,8 +226,6 @@ pub async fn compute_node_serve(
await_tree_reg: await_tree_config
.clone()
.map(new_compaction_await_tree_reg_ref),
running_task_parallelism: Arc::new(AtomicU32::new(0)),
max_task_parallelism,
};

let (handle, shutdown_sender) = start_compactor(
Expand Down
13 changes: 0 additions & 13 deletions src/storage/compactor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::net::SocketAddr;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -242,10 +241,6 @@ pub async fn compactor_serve(
let compaction_executor = Arc::new(CompactionExecutor::new(
opts.compaction_worker_threads_number,
));
let max_task_parallelism = Arc::new(AtomicU32::new(
(compaction_executor.worker_num() as f32 * storage_opts.compactor_max_task_multiplier)
.ceil() as u32,
));

let compactor_context = CompactorContext {
storage_opts,
Expand All @@ -257,8 +252,6 @@ pub async fn compactor_serve(

task_progress_manager: Default::default(),
await_tree_reg: await_tree_reg.clone(),
running_task_parallelism: Arc::new(AtomicU32::new(0)),
max_task_parallelism,
};
let mut sub_tasks = vec![
MetaClient::start_heartbeat_loop(
Expand Down Expand Up @@ -378,10 +371,6 @@ pub async fn shared_compactor_serve(
let compaction_executor = Arc::new(CompactionExecutor::new(
opts.compaction_worker_threads_number,
));
let max_task_parallelism = Arc::new(AtomicU32::new(
(compaction_executor.worker_num() as f32 * storage_opts.compactor_max_task_multiplier)
.ceil() as u32,
));
let compactor_context = CompactorContext {
storage_opts,
sstable_store,
Expand All @@ -391,8 +380,6 @@ pub async fn shared_compactor_serve(
memory_limiter,
task_progress_manager: Default::default(),
await_tree_reg,
running_task_parallelism: Arc::new(AtomicU32::new(0)),
max_task_parallelism,
};
let join_handle = tokio::spawn(async move {
tonic::transport::Server::builder()
Expand Down
9 changes: 0 additions & 9 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ pub(crate) mod tests {

use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::ops::Bound;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;

use bytes::{BufMut, Bytes, BytesMut};
Expand Down Expand Up @@ -200,12 +199,6 @@ pub(crate) mod tests {
storage_opts: Arc<StorageOpts>,
sstable_store: SstableStoreRef,
) -> CompactorContext {
let compaction_executor = Arc::new(CompactionExecutor::new(Some(1)));
let max_task_parallelism = Arc::new(AtomicU32::new(
(compaction_executor.worker_num() as f32 * storage_opts.compactor_max_task_multiplier)
.ceil() as u32,
));

CompactorContext {
storage_opts,
sstable_store,
Expand All @@ -215,8 +208,6 @@ pub(crate) mod tests {
memory_limiter: MemoryLimiter::unlimit(),
task_progress_manager: Default::default(),
await_tree_reg: None,
running_task_parallelism: Arc::new(AtomicU32::new(0)),
max_task_parallelism,
}
}

Expand Down
Loading

0 comments on commit 242990b

Please sign in to comment.