Skip to content

Commit

Permalink
fix(compactor): check compaction result for fast compact (#14274)
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace authored Jan 9, 2024
1 parent 580a60b commit e6a4ffc
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 7 deletions.
8 changes: 7 additions & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,8 @@ pub struct StorageConfig {
pub compactor_max_sst_size: u64,
#[serde(default = "default::storage::enable_fast_compaction")]
pub enable_fast_compaction: bool,
#[serde(default = "default::storage::check_fast_compaction_result")]
pub check_fast_compaction_result: bool,
#[serde(default = "default::storage::max_preload_io_retry_times")]
pub max_preload_io_retry_times: usize,

Expand Down Expand Up @@ -1183,7 +1185,11 @@ pub mod default {
}

pub fn enable_fast_compaction() -> bool {
true
false
}

/// Serde default for `StorageConfig::check_fast_compaction_result`.
///
/// Returns `false`, so the post-compaction result check guarded by this flag
/// stays off unless the configuration enables it explicitly.
pub fn check_fast_compaction_result() -> bool {
false
}

pub fn max_preload_io_retry_times() -> usize {
Expand Down
3 changes: 2 additions & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ max_version_pinning_duration_sec = 10800
compactor_max_sst_key_count = 2097152
compact_iter_recreate_timeout_ms = 600000
compactor_max_sst_size = 536870912
enable_fast_compaction = true
enable_fast_compaction = false
check_fast_compaction_result = false
max_preload_io_retry_times = 3
compactor_fast_max_compact_delete_ratio = 40
compactor_fast_max_compact_task_size = 2147483648
Expand Down
87 changes: 84 additions & 3 deletions src/storage/src/hummock/compactor/compaction_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::{BTreeMap, HashSet};
use std::marker::PhantomData;
use std::ops::Bound;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

Expand All @@ -23,14 +24,20 @@ use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
use risingwave_hummock_sdk::table_stats::TableStatsMap;
use risingwave_hummock_sdk::{EpochWithGap, KeyComparator};
use risingwave_pb::hummock::{compact_task, CompactTask, KeyRange as KeyRange_vec, SstableInfo};
use risingwave_hummock_sdk::{can_concat, EpochWithGap, KeyComparator};
use risingwave_pb::hummock::{
compact_task, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo,
};
use tokio::time::Instant;

pub use super::context::CompactorContext;
use crate::filter_key_extractor::FilterKeyExtractorImpl;
use crate::hummock::compactor::{
MultiCompactionFilter, StateCleanUpCompactionFilter, TtlCompactionFilter,
ConcatSstableIterator, MultiCompactionFilter, StateCleanUpCompactionFilter, TaskProgress,
TtlCompactionFilter,
};
use crate::hummock::iterator::{
ForwardMergeRangeIterator, SkipWatermarkIterator, UnorderedMergeIteratorInner, UserIterator,
};
use crate::hummock::multi_builder::TableBuilderFactory;
use crate::hummock::sstable::DEFAULT_ENTRY_SIZE;
Expand Down Expand Up @@ -304,3 +311,77 @@ pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTa
let capacity = std::cmp::min(task.target_file_size as usize, max_target_file_size);
std::cmp::min(capacity, total_input_uncompressed_file_size as usize)
}

pub async fn check_compaction_result(
compact_task: &CompactTask,
context: CompactorContext,
) -> HummockResult<()> {
let mut table_iters = Vec::new();
let compact_io_retry_time = context.storage_opts.compact_iter_recreate_timeout_ms;
for level in &compact_task.input_ssts {
if level.table_infos.is_empty() {
continue;
}

// Do not need to filter the table because manager has done it.
if level.level_type == LevelType::Nonoverlapping as i32 {
debug_assert!(can_concat(&level.table_infos));

table_iters.push(ConcatSstableIterator::new(
compact_task.existing_table_ids.clone(),
level.table_infos.clone(),
KeyRange::inf(),
context.sstable_store.clone(),
Arc::new(TaskProgress::default()),
compact_io_retry_time,
));
} else {
for table_info in &level.table_infos {
assert_eq!(table_info.range_tombstone_count, 0);
table_iters.push(ConcatSstableIterator::new(
compact_task.existing_table_ids.clone(),
vec![table_info.clone()],
KeyRange::inf(),
context.sstable_store.clone(),
Arc::new(TaskProgress::default()),
compact_io_retry_time,
));
}
}
}
let iter = UnorderedMergeIteratorInner::for_compactor(table_iters);
let mut left_iter = UserIterator::new(
SkipWatermarkIterator::from_safe_epoch_watermarks(iter, &compact_task.table_watermarks),
(Bound::Unbounded, Bound::Unbounded),
u64::MAX,
0,
None,
ForwardMergeRangeIterator::default(),
);
let iter = ConcatSstableIterator::new(
compact_task.existing_table_ids.clone(),
compact_task.sorted_output_ssts.clone(),
KeyRange::inf(),
context.sstable_store.clone(),
Arc::new(TaskProgress::default()),
compact_io_retry_time,
);
let mut right_iter = UserIterator::new(
SkipWatermarkIterator::from_safe_epoch_watermarks(iter, &compact_task.table_watermarks),
(Bound::Unbounded, Bound::Unbounded),
u64::MAX,
0,
None,
ForwardMergeRangeIterator::default(),
);
left_iter.rewind().await?;
right_iter.rewind().await?;
while left_iter.is_valid() && right_iter.is_valid() {
assert_eq!(left_iter.key(), right_iter.key());
assert_eq!(left_iter.value(), right_iter.value());
left_iter.next().await?;
right_iter.next().await?;
}
assert!(!left_iter.is_valid() && !right_iter.is_valid());
Ok(())
}
12 changes: 11 additions & 1 deletion src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use risingwave_pb::hummock::{BloomFilterType, CompactTask, LevelType};
use tokio::sync::oneshot::Receiver;

use super::task_progress::TaskProgress;
use super::{CompactionStatistics, TaskConfig};
use super::{check_compaction_result, CompactionStatistics, TaskConfig};
use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManager};
use crate::hummock::compactor::compaction_utils::{
build_multi_compaction_filter, estimate_task_output_capacity, generate_splits,
Expand Down Expand Up @@ -542,6 +542,16 @@ pub async fn compact(
cost_time,
compact_task_to_string(&compact_task)
);
// TODO: remove this check once RisingWave clusters have been running the fast compaction algorithm stably for a long time.
if context.storage_opts.check_fast_compaction_result
&& let Err(e) = check_compaction_result(&compact_task, context.clone()).await
{
tracing::error!(
"Failed to check fast compaction task {} because: {:?}",
compact_task.task_id,
e
);
}
return (compact_task, table_stats);
}
for (split_index, _) in compact_task.splits.iter().enumerate() {
Expand Down
4 changes: 3 additions & 1 deletion src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;

pub use self::compaction_utils::{CompactionStatistics, RemoteBuilderFactory, TaskConfig};
pub use self::compaction_utils::{
check_compaction_result, CompactionStatistics, RemoteBuilderFactory, TaskConfig,
};
pub use self::task_progress::TaskProgress;
use super::multi_builder::CapacitySplitTableBuilder;
use super::{
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ pub struct StorageOpts {
pub compactor_max_sst_size: u64,
/// enable FastCompactorRunner.
pub enable_fast_compaction: bool,
pub check_fast_compaction_result: bool,
pub max_preload_io_retry_times: usize,
pub compactor_fast_max_compact_delete_ratio: u32,
pub compactor_fast_max_compact_task_size: u64,
Expand Down Expand Up @@ -248,6 +249,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
compactor_max_task_multiplier: c.storage.compactor_max_task_multiplier,
compactor_max_sst_size: c.storage.compactor_max_sst_size,
enable_fast_compaction: c.storage.enable_fast_compaction,
check_fast_compaction_result: c.storage.check_fast_compaction_result,
mem_table_spill_threshold: c.storage.mem_table_spill_threshold,
object_store_config: c.storage.object_store.clone(),
compactor_fast_max_compact_delete_ratio: c
Expand Down

0 comments on commit e6a4ffc

Please sign in to comment.