Skip to content

Commit

Permalink
feat(compactor): check result for all compact task (#14521)
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace authored Jan 17, 2024
1 parent 8a542ad commit 4dde9e6
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 42 deletions.
6 changes: 3 additions & 3 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,8 +624,8 @@ pub struct StorageConfig {
pub compactor_max_sst_size: u64,
#[serde(default = "default::storage::enable_fast_compaction")]
pub enable_fast_compaction: bool,
#[serde(default = "default::storage::check_fast_compaction_result")]
pub check_fast_compaction_result: bool,
#[serde(default = "default::storage::check_compaction_result")]
pub check_compaction_result: bool,
#[serde(default = "default::storage::max_preload_io_retry_times")]
pub max_preload_io_retry_times: usize,

Expand Down Expand Up @@ -1213,7 +1213,7 @@ pub mod default {
false
}

pub fn check_fast_compaction_result() -> bool {
pub fn check_compaction_result() -> bool {
false
}

Expand Down
2 changes: 1 addition & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ compactor_max_sst_key_count = 2097152
compact_iter_recreate_timeout_ms = 600000
compactor_max_sst_size = 536870912
enable_fast_compaction = false
check_fast_compaction_result = false
check_compaction_result = false
max_preload_io_retry_times = 3
compactor_fast_max_compact_delete_ratio = 40
compactor_fast_max_compact_task_size = 2147483648
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ async fn test_print_compact_task() {
);

let s = compact_task_to_string(&compact_task);
assert!(s.contains("Compaction task id: 1, group-id: 2, target level: 0"));
assert!(s.contains("Compaction task id: 1, group-id: 2, task type: Dynamic, target level: 0"));
}

#[tokio::test]
Expand Down
3 changes: 2 additions & 1 deletion src/storage/hummock_sdk/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ pub fn compact_task_to_string(compact_task: &CompactTask) -> String {
let mut s = String::new();
writeln!(
s,
"Compaction task id: {:?}, group-id: {:?}, target level: {:?}, target sub level: {:?}",
"Compaction task id: {:?}, group-id: {:?}, task type: {:?}, target level: {:?}, target sub level: {:?}",
compact_task.task_id,
compact_task.compaction_group_id,
compact_task.task_type(),
compact_task.target_level,
compact_task.target_sub_level_id
)
Expand Down
135 changes: 123 additions & 12 deletions src/storage/src/hummock/compactor/compaction_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::constants::hummock::CompactionFilterFlag;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
Expand All @@ -37,13 +38,14 @@ use crate::hummock::compactor::{
TtlCompactionFilter,
};
use crate::hummock::iterator::{
ForwardMergeRangeIterator, MergeIterator, SkipWatermarkIterator, UserIterator,
Forward, ForwardMergeRangeIterator, HummockIterator, MergeIterator, SkipWatermarkIterator,
UserIterator,
};
use crate::hummock::multi_builder::TableBuilderFactory;
use crate::hummock::sstable::DEFAULT_ENTRY_SIZE;
use crate::hummock::{
CachePolicy, FilterBuilder, GetObjectId, HummockResult, MemoryLimiter, SstableBuilder,
SstableBuilderOptions, SstableWriterFactory, SstableWriterOptions,
SstableBuilderOptions, SstableDeleteRangeIterator, SstableWriterFactory, SstableWriterOptions,
};
use crate::monitor::StoreLocalStatistic;

Expand Down Expand Up @@ -312,11 +314,36 @@ pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTa
std::cmp::min(capacity, total_input_uncompressed_file_size as usize)
}

/// Compare the result of a compaction task with its input. The data seen by the user shall not change after applying the compaction result.
pub async fn check_compaction_result(
compact_task: &CompactTask,
context: CompactorContext,
) -> HummockResult<()> {
) -> HummockResult<bool> {
let has_ttl = compact_task
.table_options
.iter()
.any(|(_, table_option)| table_option.retention_seconds > 0);

let mut compact_table_ids = compact_task
.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.flat_map(|sst| sst.table_ids.clone())
.collect_vec();
compact_table_ids.sort();
compact_table_ids.dedup();
let existing_table_ids: HashSet<u32> =
HashSet::from_iter(compact_task.existing_table_ids.clone());
let need_clean_state_table = compact_table_ids
.iter()
.any(|table_id| !existing_table_ids.contains(table_id));
// This check does not account for keys dropped by the compaction filter.
if has_ttl || need_clean_state_table {
return Ok(true);
}

let mut table_iters = Vec::new();
let mut del_iter = ForwardMergeRangeIterator::default();
let compact_io_retry_time = context.storage_opts.compact_iter_recreate_timeout_ms;
for level in &compact_task.input_ssts {
if level.table_infos.is_empty() {
Expand All @@ -326,6 +353,7 @@ pub async fn check_compaction_result(
// Do not need to filter the table because manager has done it.
if level.level_type == LevelType::Nonoverlapping as i32 {
debug_assert!(can_concat(&level.table_infos));
del_iter.add_concat_iter(level.table_infos.clone(), context.sstable_store.clone());

table_iters.push(ConcatSstableIterator::new(
compact_task.existing_table_ids.clone(),
Expand All @@ -336,8 +364,13 @@ pub async fn check_compaction_result(
compact_io_retry_time,
));
} else {
let mut stats = StoreLocalStatistic::default();
for table_info in &level.table_infos {
assert_eq!(table_info.range_tombstone_count, 0);
let table = context
.sstable_store
.sstable(table_info, &mut stats)
.await?;
del_iter.add_sst_iter(SstableDeleteRangeIterator::new(table));
table_iters.push(ConcatSstableIterator::new(
compact_task.existing_table_ids.clone(),
vec![table_info.clone()],
Expand All @@ -349,14 +382,20 @@ pub async fn check_compaction_result(
}
}
}

let iter = MergeIterator::for_compactor(table_iters);
let mut left_iter = UserIterator::new(
let left_iter = UserIterator::new(
SkipWatermarkIterator::from_safe_epoch_watermarks(iter, &compact_task.table_watermarks),
(Bound::Unbounded, Bound::Unbounded),
u64::MAX,
0,
None,
ForwardMergeRangeIterator::default(),
del_iter,
);
let mut del_iter = ForwardMergeRangeIterator::default();
del_iter.add_concat_iter(
compact_task.sorted_output_ssts.clone(),
context.sstable_store.clone(),
);
let iter = ConcatSstableIterator::new(
compact_task.existing_table_ids.clone(),
Expand All @@ -366,22 +405,94 @@ pub async fn check_compaction_result(
Arc::new(TaskProgress::default()),
compact_io_retry_time,
);
let mut right_iter = UserIterator::new(
let right_iter = UserIterator::new(
SkipWatermarkIterator::from_safe_epoch_watermarks(iter, &compact_task.table_watermarks),
(Bound::Unbounded, Bound::Unbounded),
u64::MAX,
0,
None,
ForwardMergeRangeIterator::default(),
del_iter,
);

check_result(left_iter, right_iter).await
}

/// Checks that the SSTs in `sort_ssts` expose the same user-visible data as `left_iter`.
///
/// `left_iter` is the caller-supplied iterator to compare against (presumably built over the
/// data that was flushed; confirm at the call site). This function builds the matching
/// iterator over `sort_ssts`: point data is read through a `ConcatSstableIterator` and range
/// tombstones through a `ForwardMergeRangeIterator`. Both iterators are then handed to
/// `check_result`.
///
/// Returns whatever `check_result` reports; errors raised while iterating are propagated.
pub async fn check_flush_result<I: HummockIterator<Direction = Forward>>(
    left_iter: UserIterator<I>,
    existing_table_ids: Vec<StateTableId>,
    sort_ssts: Vec<SstableInfo>,
    context: CompactorContext,
) -> HummockResult<bool> {
    let mut del_iter = ForwardMergeRangeIterator::default();
    // `sort_ssts` is needed again below, so this is the only copy of the list that is made.
    del_iter.add_concat_iter(sort_ssts.clone(), context.sstable_store.clone());
    // Both vectors are owned and never read afterwards, so move them instead of cloning.
    let iter = ConcatSstableIterator::new(
        existing_table_ids,
        sort_ssts,
        KeyRange::inf(),
        context.sstable_store.clone(),
        Arc::new(TaskProgress::default()),
        // NOTE(review): `check_compaction_result` passes `compact_iter_recreate_timeout_ms`
        // in this position; 0 presumably disables the timeout here. Confirm.
        0,
    );
    let right_iter = UserIterator::new(
        iter,
        (Bound::Unbounded, Bound::Unbounded),
        u64::MAX,
        0,
        None,
        del_iter,
    );
    check_result(left_iter, right_iter).await
}

async fn check_result<
I1: HummockIterator<Direction = Forward>,
I2: HummockIterator<Direction = Forward>,
>(
mut left_iter: UserIterator<I1>,
mut right_iter: UserIterator<I2>,
) -> HummockResult<bool> {
left_iter.rewind().await?;
right_iter.rewind().await?;
let mut right_count = 0;
let mut left_count = 0;
while left_iter.is_valid() && right_iter.is_valid() {
assert_eq!(left_iter.key(), right_iter.key());
assert_eq!(left_iter.value(), right_iter.value());
if left_iter.key() != right_iter.key() {
tracing::error!(
"The key of input and output not equal. key: {:?} vs {:?}",
left_iter.key(),
right_iter.key()
);
return Ok(false);
}
if left_iter.value() != right_iter.value() {
tracing::error!(
"The value of input and output not equal. key: {:?}, value: {:?} vs {:?}",
left_iter.key(),
left_iter.value(),
right_iter.value()
);
return Ok(false);
}
left_iter.next().await?;
right_iter.next().await?;
left_count += 1;
right_count += 1;
}
while left_iter.is_valid() {
left_count += 1;
left_iter.next().await?;
}
while right_iter.is_valid() {
right_count += 1;
right_iter.next().await?;
}
assert!(!left_iter.is_valid() && !right_iter.is_valid());
Ok(())
if left_count != right_count {
tracing::error!(
"The key count of input and output not equal: {} vs {}",
left_count,
right_count
);
return Ok(false);
}
Ok(true)
}
17 changes: 1 addition & 16 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use tokio::sync::oneshot::Receiver;

use super::iterator::MonitoredCompactorIterator;
use super::task_progress::TaskProgress;
use super::{check_compaction_result, CompactionStatistics, TaskConfig};
use super::{CompactionStatistics, TaskConfig};
use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManager};
use crate::hummock::compactor::compaction_utils::{
build_multi_compaction_filter, estimate_task_output_capacity, generate_splits,
Expand Down Expand Up @@ -545,16 +545,6 @@ pub async fn compact(
cost_time,
compact_task_to_string(&compact_task)
);
// TODO: remove this method after we have running risingwave cluster with fast compact algorithm stably for a long time.
if context.storage_opts.check_fast_compaction_result
&& let Err(e) = check_compaction_result(&compact_task, context.clone()).await
{
tracing::error!(
"Failed to check fast compaction task {} because: {:?}",
compact_task.task_id,
e
);
}
return (compact_task, table_stats);
}
for (split_index, _) in compact_task.splits.iter().enumerate() {
Expand Down Expand Up @@ -650,11 +640,6 @@ pub async fn compact(
cost_time,
compact_task_to_string(&compact_task)
);
for level in &compact_task.input_ssts {
for table in &level.table_infos {
context.sstable_store.delete_cache(table.get_object_id());
}
}
(compact_task, table_stats)
}

Expand Down
27 changes: 23 additions & 4 deletions src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use futures::{pin_mut, StreamExt};
pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
use more_asserts::assert_ge;
use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::{HummockCompactionTaskId, LocalSstableInfo};
use risingwave_hummock_sdk::{compact_task_to_string, HummockCompactionTaskId, LocalSstableInfo};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
Event as RequestEvent, HeartBeat, PullTask, ReportTask,
Expand All @@ -66,7 +66,8 @@ use tokio::task::JoinHandle;
use tokio::time::Instant;

pub use self::compaction_utils::{
check_compaction_result, CompactionStatistics, RemoteBuilderFactory, TaskConfig,
check_compaction_result, check_flush_result, CompactionStatistics, RemoteBuilderFactory,
TaskConfig,
};
pub use self::task_progress::TaskProgress;
use super::multi_builder::CapacitySplitTableBuilder;
Expand Down Expand Up @@ -665,7 +666,7 @@ pub fn start_shared_compactor(
shutdown.lock().unwrap().remove(&task_id);
let report_compaction_task_request = ReportCompactionTaskRequest {
event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
compact_task: Some(compact_task),
compact_task: Some(compact_task.clone()),
table_stats_change: to_prost_table_stats_map(table_stats),
})),
};
Expand All @@ -674,9 +675,27 @@ pub fn start_shared_compactor(
.report_compaction_task(report_compaction_task_request)
.await
{
Ok(_) => {}
Ok(_) => {
// TODO: remove this check once a RisingWave cluster has been running stably with the fast compaction algorithm for a long time.

if context.storage_opts.check_compaction_result
&& !compact_task.sorted_output_ssts.is_empty()
&& compact_task.task_status() == TaskStatus::Success
{
match check_compaction_result(&compact_task, context.clone()).await {
Err(e) => {
tracing::warn!("Failed to check compaction task {} because: {:?}",compact_task.task_id, e);
},
Ok(true) => (),
Ok(false) => {
panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
}
}
}
}
Err(e) => tracing::warn!("Failed to report task {task_id:?} . {e:?}"),
}

}
dispatch_compaction_task_request::Task::VacuumTask(vacuum_task) => {
match Vacuum::handle_vacuum_task(
Expand Down
Loading

0 comments on commit 4dde9e6

Please sign in to comment.