Skip to content

Commit

Permalink
feat(storage): Improve task measurement on the compactor side with parallelism (#13812)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Jan 3, 2024
1 parent 56c2d5f commit 035da38
Show file tree
Hide file tree
Showing 17 changed files with 177 additions and 45 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,13 @@ def section_compaction(outer_panels):
[
panels.target(
f"avg({metric('storage_compact_task_pending_num')}) by({COMPONENT_LABEL}, {NODE_LABEL})",
"compactor_task_split_count - {{%s}} @ {{%s}}"
"compactor_task_count - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),

panels.target(
f"avg({metric('storage_compact_task_pending_parallelism')}) by({COMPONENT_LABEL}, {NODE_LABEL})",
"compactor_task_pending_parallelism - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
],
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ message CompactTask {
PENDING = 1;
SUCCESS = 2;
HEARTBEAT_CANCELED = 3;
NO_AVAIL_RESOURCE_CANCELED = 4;
NO_AVAIL_MEMORY_RESOURCE_CANCELED = 4;
ASSIGN_FAIL_CANCELED = 5;
SEND_FAIL_CANCELED = 6;
MANUAL_CANCELED = 7;
Expand All @@ -294,6 +294,7 @@ message CompactTask {
EXECUTE_FAILED = 10;
JOIN_HANDLE_FAILED = 11;
TRACK_SST_OBJECT_ID_FAILED = 12;
NO_AVAIL_CPU_RESOURCE_CANCELED = 13;
}
// SSTs to be compacted, which will be removed from LSM after compaction
repeated InputLevel input_ssts = 1;
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1118,7 +1118,7 @@ pub mod default {
}

pub fn compactor_max_task_multiplier() -> f32 {
1.5000
2.5000
}

pub fn compactor_memory_available_proportion() -> f64 {
Expand Down
13 changes: 11 additions & 2 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,17 +206,26 @@ pub async fn compute_node_serve(
let memory_limiter = Arc::new(MemoryLimiter::new(
storage_opts.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2,
));

let compaction_executor = Arc::new(CompactionExecutor::new(Some(1)));
let max_task_parallelism = Arc::new(AtomicU32::new(
(compaction_executor.worker_num() as f32
* storage_opts.compactor_max_task_multiplier)
.ceil() as u32,
));

let compactor_context = CompactorContext {
storage_opts,
sstable_store: storage.sstable_store(),
compactor_metrics: compactor_metrics.clone(),
is_share_buffer_compact: false,
compaction_executor: Arc::new(CompactionExecutor::new(Some(1))),
compaction_executor,
memory_limiter,

task_progress_manager: Default::default(),
await_tree_reg: None,
running_task_count: Arc::new(AtomicU32::new(0)),
running_task_parallelism: Arc::new(AtomicU32::new(0)),
max_task_parallelism,
};

let (handle, shutdown_sender) = start_compactor(
Expand Down
2 changes: 1 addition & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ imm_merge_threshold = 4
write_conflict_detection_enabled = true
disable_remote_compactor = false
share_buffer_upload_concurrency = 8
compactor_max_task_multiplier = 1.5
compactor_max_task_multiplier = 2.5
compactor_memory_available_proportion = 0.8
sstable_id_remote_fetch_number = 10
min_sst_size_for_streaming_upload = 33554432
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ pub static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
TaskStatus::AssignFailCanceled,
TaskStatus::HeartbeatCanceled,
TaskStatus::InvalidGroupCanceled,
TaskStatus::NoAvailResourceCanceled,
TaskStatus::NoAvailMemoryResourceCanceled,
TaskStatus::NoAvailCpuResourceCanceled,
]
.into_iter()
.collect()
Expand Down
30 changes: 22 additions & 8 deletions src/storage/compactor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,19 +237,27 @@ pub async fn compactor_serve(
let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager(
filter_key_extractor_manager.clone(),
);

let compaction_executor = Arc::new(CompactionExecutor::new(
opts.compaction_worker_threads_number,
));
let max_task_parallelism = Arc::new(AtomicU32::new(
(compaction_executor.worker_num() as f32 * storage_opts.compactor_max_task_multiplier)
.ceil() as u32,
));

let compactor_context = CompactorContext {
storage_opts,
sstable_store: sstable_store.clone(),
compactor_metrics,
is_share_buffer_compact: false,
compaction_executor: Arc::new(CompactionExecutor::new(
opts.compaction_worker_threads_number,
)),
compaction_executor,
memory_limiter,

task_progress_manager: Default::default(),
await_tree_reg: await_tree_reg.clone(),
running_task_count: Arc::new(AtomicU32::new(0)),
running_task_parallelism: Arc::new(AtomicU32::new(0)),
max_task_parallelism,
};
let mut sub_tasks = vec![
MetaClient::start_heartbeat_loop(
Expand Down Expand Up @@ -366,18 +374,24 @@ pub async fn shared_compactor_serve(
heap_profiler.start();

let (shutdown_send, mut shutdown_recv) = tokio::sync::oneshot::channel();
let compaction_executor = Arc::new(CompactionExecutor::new(
opts.compaction_worker_threads_number,
));
let max_task_parallelism = Arc::new(AtomicU32::new(
(compaction_executor.worker_num() as f32 * storage_opts.compactor_max_task_multiplier)
.ceil() as u32,
));
let compactor_context = CompactorContext {
storage_opts,
sstable_store,
compactor_metrics,
is_share_buffer_compact: false,
compaction_executor: Arc::new(CompactionExecutor::new(
opts.compaction_worker_threads_number,
)),
compaction_executor,
memory_limiter,
task_progress_manager: Default::default(),
await_tree_reg,
running_task_count: Arc::new(AtomicU32::new(0)),
running_task_parallelism: Arc::new(AtomicU32::new(0)),
max_task_parallelism,
};
let join_handle = tokio::spawn(async move {
tonic::transport::Server::builder()
Expand Down
9 changes: 8 additions & 1 deletion src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ pub(crate) mod tests {
storage_opts: Arc<StorageOpts>,
sstable_store: SstableStoreRef,
) -> CompactorContext {
let compaction_executor = Arc::new(CompactionExecutor::new(Some(1)));
let max_task_parallelism = Arc::new(AtomicU32::new(
(compaction_executor.worker_num() as f32 * storage_opts.compactor_max_task_multiplier)
.ceil() as u32,
));

CompactorContext {
storage_opts,
sstable_store,
Expand All @@ -197,7 +203,8 @@ pub(crate) mod tests {
memory_limiter: MemoryLimiter::unlimit(),
task_progress_manager: Default::default(),
await_tree_reg: None,
running_task_count: Arc::new(AtomicU32::new(0)),
running_task_parallelism: Arc::new(AtomicU32::new(0)),
max_task_parallelism,
}
}

Expand Down
40 changes: 37 additions & 3 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::atomic::Ordering;
use std::sync::Arc;

use await_tree::InstrumentAwait;
Expand Down Expand Up @@ -394,6 +395,7 @@ pub async fn compact(
&& delete_key_count * 100
< context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
&& compact_task.task_type() == TaskType::Dynamic;

if !optimize_by_copy_block {
match generate_splits(&sstable_infos, compaction_size, context.clone()).await {
Ok(splits) => {
Expand All @@ -412,6 +414,27 @@ pub async fn compact(
// Number of splits (key ranges) is equal to number of compaction tasks
let parallelism = compact_task.splits.len();
assert_ne!(parallelism, 0, "splits cannot be empty");
if !context.acquire_task_quota(parallelism as u32) {
tracing::warn!(
"Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
compact_task.task_id,
parallelism,
context.running_task_parallelism.load(Ordering::Relaxed),
context.max_task_parallelism.load(Ordering::Relaxed),
);
return compact_done(
compact_task,
context.clone(),
vec![],
TaskStatus::NoAvailCpuResourceCanceled,
);
}

let _release_quota_guard =
scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
context.release_task_quota(parallelism as u32);
});

let mut output_ssts = Vec::with_capacity(parallelism);
let mut compaction_futures = vec![];
let mut abort_handles = vec![];
Expand Down Expand Up @@ -460,11 +483,24 @@ pub async fn compact(
context.memory_limiter.get_memory_usage(),
context.memory_limiter.quota()
);
task_status = TaskStatus::NoAvailResourceCanceled;
task_status = TaskStatus::NoAvailMemoryResourceCanceled;
return compact_done(compact_task, context.clone(), output_ssts, task_status);
}

context.compactor_metrics.compact_task_pending_num.inc();
context
.compactor_metrics
.compact_task_pending_parallelism
.add(parallelism as _);
let _release_metrics_guard =
scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
context.compactor_metrics.compact_task_pending_num.dec();
context
.compactor_metrics
.compact_task_pending_parallelism
.sub(parallelism as _);
});

if optimize_by_copy_block {
let runner = fast_compactor_runner::CompactorRunner::new(
context.clone(),
Expand Down Expand Up @@ -497,7 +533,6 @@ pub async fn compact(
}
}

context.compactor_metrics.compact_task_pending_num.dec();
// After a compaction is done, mutate the compaction task.
let (compact_task, table_stats) =
compact_done(compact_task, context.clone(), output_ssts, task_status);
Expand Down Expand Up @@ -602,7 +637,6 @@ pub async fn compact(
cost_time,
compact_task_to_string(&compact_task)
);
context.compactor_metrics.compact_task_pending_num.dec();
for level in &compact_task.input_ssts {
for table in &level.table_infos {
context.sstable_store.delete_cache(table.get_object_id());
Expand Down
58 changes: 55 additions & 3 deletions src/storage/src/hummock/compactor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::AtomicU32;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use more_asserts::assert_ge;
use parking_lot::RwLock;

use super::task_progress::TaskProgressManagerRef;
Expand Down Expand Up @@ -47,7 +48,9 @@ pub struct CompactorContext {

pub await_tree_reg: Option<Arc<RwLock<await_tree::Registry<String>>>>,

pub running_task_count: Arc<AtomicU32>,
pub running_task_parallelism: Arc<AtomicU32>,

pub max_task_parallelism: Arc<AtomicU32>,
}

impl CompactorContext {
Expand Down Expand Up @@ -75,7 +78,56 @@ impl CompactorContext {
memory_limiter: MemoryLimiter::unlimit(),
task_progress_manager: Default::default(),
await_tree_reg: None,
running_task_count: Arc::new(AtomicU32::new(0)),
running_task_parallelism: Arc::new(AtomicU32::new(0)),
max_task_parallelism: Arc::new(AtomicU32::new(u32::MAX)),
}
}

/// Attempts to reserve `parallelism` units of compaction-task CPU quota.
///
/// Returns `true` if the reservation succeeded (the caller must later return
/// the quota via `release_task_quota`), or `false` if admitting the task
/// would exceed `max_task_parallelism`.
pub fn acquire_task_quota(&self, parallelism: u32) -> bool {
    let max_u32 = self.max_task_parallelism.load(Ordering::SeqCst);
    let mut running_u32 = self.running_task_parallelism.load(Ordering::SeqCst);

    // CAS loop: retry while other threads race on the counter, as long as
    // the requested quota still fits under the limit.
    loop {
        // `checked_add` guards against u32 overflow: the original
        // `parallelism + running_u32` comparison could wrap in release
        // builds (and panic in debug builds) and spuriously admit a task.
        let total = match running_u32.checked_add(parallelism) {
            Some(total) if total <= max_u32 => total,
            _ => return false,
        };

        match self.running_task_parallelism.compare_exchange(
            running_u32,
            total,
            Ordering::SeqCst,
            Ordering::SeqCst,
        ) {
            Ok(_) => {
                return true;
            }
            Err(old_running_u32) => {
                // Another thread won the race; retry with the fresh value.
                running_u32 = old_running_u32;
            }
        }
    }
}

/// Returns `parallelism` units of task quota previously taken with
/// `acquire_task_quota`.
///
/// Panics when more quota is released than is currently held, which would
/// indicate a mismatched acquire/release pairing.
pub fn release_task_quota(&self, parallelism: u32) {
    let before = self
        .running_task_parallelism
        .fetch_sub(parallelism, Ordering::SeqCst);

    // `fetch_sub` yields the value *before* the subtraction; it must have
    // been at least `parallelism`, otherwise the counter just underflowed.
    assert_ge!(
        before,
        parallelism,
        "running {} parallelism {}",
        before,
        parallelism
    );
}

/// Reports how many units of task parallelism are still available, i.e.
/// `max_task_parallelism - running_task_parallelism`, floored at zero.
pub fn get_free_quota(&self) -> u32 {
    let max = self.max_task_parallelism.load(Ordering::SeqCst);
    let running = self.running_task_parallelism.load(Ordering::SeqCst);

    // Saturate rather than underflow if `running` transiently exceeds `max`
    // (e.g. after `max_task_parallelism` is lowered).
    max.saturating_sub(running)
}
}
Loading

0 comments on commit 035da38

Please sign in to comment.