fix(compaction): disable fast compact for split sst (#18347)
Li0k authored Sep 2, 2024
1 parent f60be13 commit cd0eefd
Showing 7 changed files with 25 additions and 8 deletions.
7 changes: 7 additions & 0 deletions src/storage/src/hummock/compactor/compaction_utils.rs
@@ -533,6 +533,12 @@ pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorCon
.iter()
.any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0));

let has_split_sst = compact_task
.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.any(|sst| sst.sst_id != sst.object_id);

let compact_table_ids: HashSet<u32> = HashSet::from_iter(
compact_task
.input_ssts
@@ -546,6 +552,7 @@ pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorCon
&& all_ssts_are_blocked_filter
&& !has_tombstone
&& !has_ttl
&& !has_split_sst
&& single_table
&& compact_task.target_level > 0
&& compact_task.input_ssts.len() == 2
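
For readers skimming the diff, a compact, standalone sketch of the guard this hunk adds: a split SST is an `SstableInfo` whose `sst_id` differs from its `object_id`, i.e. the logical SST references a shared physical object rather than owning it one-to-one, so the copy-block fast path must be skipped. The `SstableInfo`/`InputLevel` structs below are simplified stand-ins for illustration, not the real definitions.

```rust
/// Simplified stand-in for the fields of `SstableInfo` that matter here
/// (assumption: the real type carries far more metadata).
struct SstableInfo {
    object_id: u64,
    sst_id: u64,
}

struct InputLevel {
    table_infos: Vec<SstableInfo>,
}

/// Mirrors the `has_split_sst` guard added above: the fast path is only
/// allowed when no input SST has been split.
fn has_split_sst(input_ssts: &[InputLevel]) -> bool {
    input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        // A split SST is re-registered under a new `sst_id` while still
        // pointing at the original object, so the two ids diverge.
        .any(|sst| sst.sst_id != sst.object_id)
}

fn main() {
    let levels = vec![InputLevel {
        table_infos: vec![
            SstableInfo { object_id: 1, sst_id: 1 },
            SstableInfo { object_id: 2, sst_id: 5 }, // split: ids differ
        ],
    }];
    assert!(has_split_sst(&levels));
}
```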
3 changes: 3 additions & 0 deletions src/storage/src/hummock/compactor/fast_compactor_runner.rs
@@ -242,6 +242,9 @@ impl ConcatSstableIterator {
let stats_ptr = self.stats.remote_io_time.clone();
let now = Instant::now();
self.task_progress.inc_num_pending_read_io();

// Fast compact only supports single-table compaction (no split SSTs),
// so we don't need to filter the block_metas by table_id and key_range.
let block_stream = self
.sstable_store
.get_stream_for_blocks(sstable.id, &sstable.meta.block_metas)
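
The new comment leans on the guard above: because the fast path never sees a split SST or a multi-table task, every block of the object belongs to the compaction and the whole `block_metas` slice can be streamed unfiltered. As a hedged illustration of the kind of filtering the regular path performs via `filter_block_metas`, here is a sketch; the types and the function body are simplified assumptions, not the actual implementation.

```rust
use std::collections::HashSet;
use std::ops::RangeInclusive;

/// Simplified stand-in for a block's metadata (assumption: the real
/// `BlockMeta` exposes its table id and key range differently).
#[derive(Clone)]
struct BlockMeta {
    table_id: u32,
    first_key: Vec<u8>,
}

/// Keep only blocks whose table id is referenced by the (possibly split)
/// SST and whose keys fall inside the SST's key range.
fn filter_block_metas_sketch(
    block_metas: &[BlockMeta],
    sstable_table_ids: &HashSet<u32>,
    key_range: RangeInclusive<Vec<u8>>,
) -> Vec<BlockMeta> {
    block_metas
        .iter()
        .filter(|meta| sstable_table_ids.contains(&meta.table_id))
        .filter(|meta| key_range.contains(&meta.first_key))
        .cloned()
        .collect()
}

fn main() {
    let metas = vec![
        BlockMeta { table_id: 1, first_key: b"a".to_vec() },
        BlockMeta { table_id: 2, first_key: b"z".to_vec() },
    ];
    let table_ids = HashSet::from([1u32]);
    let kept = filter_block_metas_sketch(&metas, &table_ids, b"a".to_vec()..=b"m".to_vec());
    assert_eq!(kept.len(), 1); // the block from table 2 is filtered out
}
```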
14 changes: 8 additions & 6 deletions src/storage/src/hummock/compactor/iterator.rs
@@ -57,7 +57,9 @@ pub struct SstableStreamIterator {

/// For key sanity check of divided SST and debugging
sstable_info: SstableInfo,
existing_table_ids: HashSet<StateTableId>,

/// Table ids used to filter out blocks
sstable_table_ids: HashSet<StateTableId>,
task_progress: Arc<TaskProgress>,
io_retry_times: usize,
max_io_retry_times: usize,
@@ -86,16 +88,17 @@ impl SstableStreamIterator {
pub fn new(
block_metas: Vec<BlockMeta>,
sstable_info: SstableInfo,
existing_table_ids: HashSet<StateTableId>,
stats: &StoreLocalStatistic,
task_progress: Arc<TaskProgress>,
sstable_store: SstableStoreRef,
max_io_retry_times: usize,
) -> Self {
let sstable_table_ids = HashSet::from_iter(sstable_info.table_ids.iter().cloned());

// Filter the block metas by table id and key range
let block_metas = filter_block_metas(
&block_metas,
&existing_table_ids,
&sstable_table_ids,
sstable_info.key_range.clone(),
);

@@ -109,7 +112,7 @@ impl SstableStreamIterator {
block_metas,
block_idx: 0,
stats_ptr: stats.remote_io_time.clone(),
existing_table_ids,
sstable_table_ids,
sstable_info,
sstable_store,
task_progress,
@@ -137,7 +140,7 @@ impl SstableStreamIterator {
async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> {
while let Some(block_iter) = self.block_iter.as_mut() {
if self
.existing_table_ids
.sstable_table_ids
.contains(&block_iter.table_id().table_id)
{
return Ok(());
@@ -461,7 +464,6 @@ impl ConcatSstableIterator {
let mut sstable_iter = SstableStreamIterator::new(
block_metas,
table_info.clone(),
self.existing_table_ids.clone(),
&self.stats,
self.task_progress.clone(),
self.sstable_store.clone(),
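
The signature change means the iterator now derives its filter set from the SST's own `table_ids` instead of receiving the task-level `existing_table_ids` from `ConcatSstableIterator`, which is why the call site above drops one argument. A minimal sketch of that behaviour, using simplified stand-in types (the real iterator also holds block streams, stats, and retry state):

```rust
use std::collections::HashSet;

/// Simplified stand-ins; the real types carry much more state.
struct SstableInfo {
    table_ids: Vec<u32>,
}

struct BlockIter {
    table_id: u32,
}

struct SstableStreamIteratorSketch {
    sstable_table_ids: HashSet<u32>,
    blocks: Vec<BlockIter>,
    block_idx: usize,
}

impl SstableStreamIteratorSketch {
    fn new(sstable_info: &SstableInfo, blocks: Vec<BlockIter>) -> Self {
        // Derived internally, mirroring
        // `HashSet::from_iter(sstable_info.table_ids.iter().cloned())` above.
        let sstable_table_ids = sstable_info.table_ids.iter().cloned().collect();
        Self { sstable_table_ids, blocks, block_idx: 0 }
    }

    /// Skip blocks whose table id is not referenced by this SST,
    /// analogous to `prune_from_valid_block_iter`.
    fn prune(&mut self) {
        while self.block_idx < self.blocks.len()
            && !self
                .sstable_table_ids
                .contains(&self.blocks[self.block_idx].table_id)
        {
            self.block_idx += 1;
        }
    }
}

fn main() {
    let info = SstableInfo { table_ids: vec![3] };
    let blocks = vec![BlockIter { table_id: 1 }, BlockIter { table_id: 3 }];
    let mut iter = SstableStreamIteratorSketch::new(&info, blocks);
    iter.prune();
    assert_eq!(iter.block_idx, 1); // the block from table 1 was skipped
}
```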
2 changes: 2 additions & 0 deletions src/storage/src/hummock/sstable_store.rs
@@ -776,6 +776,7 @@ mod tests {
meta.clone(),
sstable_store.clone(),
writer_opts,
vec![SST_ID as u32],
)
.await
.unwrap();
@@ -806,6 +807,7 @@ mod tests {
meta.clone(),
sstable_store.clone(),
writer_opts,
vec![SST_ID as u32],
)
.await
.unwrap();
2 changes: 2 additions & 0 deletions src/storage/src/hummock/test_utils.rs
@@ -175,6 +175,7 @@ pub async fn put_sst(
mut meta: SstableMeta,
sstable_store: SstableStoreRef,
mut options: SstableWriterOptions,
table_ids: Vec<u32>,
) -> HummockResult<SstableInfo> {
options.policy = CachePolicy::NotFill;
let mut writer = sstable_store
@@ -199,6 +200,7 @@
file_size: meta.estimated_size as u64,
meta_offset: meta.meta_offset,
uncompressed_file_size: meta.estimated_size as u64,
table_ids,
..Default::default()
};
let writer_output = writer.finish(meta).await?;
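
`put_sst` now threads the caller's table ids onto the `SstableInfo` it returns, which is why every test call site in this commit gains a `vec![... as u32]` argument. A small sketch of that plumbing with a deliberately reduced stand-in struct; if `table_ids` were left empty, the iterator's internally derived `sstable_table_ids` filter (see iterator.rs above) would treat every block as foreign and prune it, which is presumably why the tests must now state their table id explicitly.

```rust
/// Minimal stand-in: the real `SstableInfo` carries many more fields.
#[derive(Default)]
struct SstableInfo {
    table_ids: Vec<u32>,
    file_size: u64,
}

/// Sketch of the relevant part of `put_sst`: the caller-supplied table ids
/// are copied straight onto the returned `SstableInfo`.
fn build_sst_info_sketch(table_ids: Vec<u32>) -> SstableInfo {
    SstableInfo {
        table_ids,
        // Remaining fields keep their defaults, mirroring `..Default::default()` above.
        ..Default::default()
    }
}

fn main() {
    let info = build_sst_info_sketch(vec![42]);
    assert_eq!(info.table_ids, vec![42]);
    assert_eq!(info.file_size, 0);
}
```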
3 changes: 1 addition & 2 deletions src/storage/src/storage_failpoints/test_iterator.rs
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::ops::Bound::Unbounded;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -392,6 +391,7 @@ async fn test_failpoints_compactor_iterator_recreate() {
meta.clone(),
sstable_store.clone(),
default_writer_opt_for_test(),
vec![table_id as u32],
)
.await
.unwrap();
@@ -402,7 +402,6 @@
let mut sstable_iter = SstableStreamIterator::new(
table.meta.block_metas.clone(),
info,
HashSet::from_iter(std::iter::once(0)),
&stats,
Arc::new(TaskProgress::default()),
sstable_store,
2 changes: 2 additions & 0 deletions src/storage/src/storage_failpoints/test_sstable.rs
@@ -104,6 +104,7 @@ async fn test_failpoints_vacuum_and_metadata() {
meta.clone(),
sstable_store.clone(),
default_writer_opt_for_test(),
vec![table_id as u32],
)
.await;
assert!(result.is_err());
@@ -118,6 +119,7 @@
meta,
sstable_store.clone(),
default_writer_opt_for_test(),
vec![table_id as u32],
)
.await
.unwrap();
