diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 0ce88a76..53e1fa70 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -152,54 +152,15 @@ where if !option.is_threshold_exceeded_major(version, level) { break; } + let (meet_scopes_l, start_l, end_l) = + match Self::this_level_scopes(version, min, max, level) { + Some(value) => value, + None => return Ok(()), + }; + let (meet_scopes_ll, start_ll, end_ll) = + Self::next_level_scopes(version, &mut min, &mut max, level, &meet_scopes_l)?; - let mut meet_scopes_l = Vec::new(); - let start_l = Version::::scope_search(min, &version.level_slice[level]); - let mut end_l = start_l; - { - for scope in version.level_slice[level][start_l..].iter() { - if scope.contains(min) || scope.contains(max) { - meet_scopes_l.push(scope); - end_l += 1; - } else { - break; - } - } - if meet_scopes_l.is_empty() { - return Ok(()); - } - } - let mut meet_scopes_ll = Vec::new(); - let mut start_ll = 0; - let mut end_ll = 0; - { - if !version.level_slice[level + 1].is_empty() { - let min_key = &meet_scopes_l - .first() - .ok_or(CompactionError::EmptyLevel)? - .min; - let max_key = &meet_scopes_l.last().ok_or(CompactionError::EmptyLevel)?.max; - min = min_key; - max = max_key; - - start_ll = - Version::::scope_search(min_key, &version.level_slice[level + 1]); - end_ll = - Version::::scope_search(max_key, &version.level_slice[level + 1]); - - let next_level_len = version.level_slice[level + 1].len(); - for scope in version.level_slice[level + 1] - [start_ll..cmp::min(end_ll + 1, next_level_len - 1)] - .iter() - { - if scope.contains(min) || scope.contains(max) { - meet_scopes_ll.push(scope); - } - } - } - } let mut streams = Vec::with_capacity(meet_scopes_l.len() + meet_scopes_ll.len()); - // This Level if level == 0 { for scope in meet_scopes_l.iter() { @@ -244,50 +205,8 @@ where streams.push(ScanStream::Level { inner: level_scan_ll, }); - let mut stream = MergeStream::::from_vec(streams).await?; - - // Kould: is the capacity parameter necessary? - let mut builder = R::Columns::builder(8192); - let mut written_size = 0; - let mut min = None; - let mut max = None; + Self::build_tables(option, version_edits, level, streams).await?; - while let Some(result) = Pin::new(&mut stream).next().await { - let entry = result?; - let key = entry.key(); - - if min.is_none() { - min = Some(key.value.to_key()) - } - max = Some(key.value.to_key()); - - written_size += key.size(); - builder.push(key, Some(entry.value())); - - if written_size >= option.max_sst_file_size { - Self::build_table( - option, - version_edits, - level, - &mut builder, - &mut min, - &mut max, - ) - .await?; - written_size = 0; - } - } - if written_size > 0 { - Self::build_table( - option, - version_edits, - level, - &mut builder, - &mut min, - &mut max, - ) - .await?; - } for scope in meet_scopes_l { version_edits.push(VersionEdit::Remove { level: level as u8, @@ -308,6 +227,123 @@ where Ok(()) } + fn next_level_scopes<'a>( + version: &'a Version, + min: &mut &'a ::Key, + max: &mut &'a ::Key, + level: usize, + meet_scopes_l: &[&'a Scope<::Key>], + ) -> Result<(Vec<&'a Scope<::Key>>, usize, usize), CompactionError> { + let mut meet_scopes_ll = Vec::new(); + let mut start_ll = 0; + let mut end_ll = 0; + { + if !version.level_slice[level + 1].is_empty() { + *min = &meet_scopes_l + .first() + .ok_or(CompactionError::EmptyLevel)? + .min; + *max = &meet_scopes_l + .iter() + .last() + .ok_or(CompactionError::EmptyLevel)? + .max; + + start_ll = Version::::scope_search(min, &version.level_slice[level + 1]); + end_ll = Version::::scope_search(max, &version.level_slice[level + 1]); + + let next_level_len = version.level_slice[level + 1].len(); + for scope in version.level_slice[level + 1] + [start_ll..cmp::min(end_ll + 1, next_level_len - 1)] + .iter() + { + if scope.contains(min) || scope.contains(max) { + meet_scopes_ll.push(scope); + } + } + } + } + Ok((meet_scopes_ll, start_ll, end_ll)) + } + + fn this_level_scopes<'a>( + version: &'a Version, + min: &::Key, + max: &::Key, + level: usize, + ) -> Option<(Vec<&'a Scope<::Key>>, usize, usize)> { + let mut meet_scopes_l = Vec::new(); + let start_l = Version::::scope_search(min, &version.level_slice[level]); + let mut end_l = start_l; + { + for scope in version.level_slice[level][start_l..].iter() { + if scope.contains(min) || scope.contains(max) { + meet_scopes_l.push(scope); + end_l += 1; + } else { + break; + } + } + if meet_scopes_l.is_empty() { + return None; + } + } + Some((meet_scopes_l, start_l, end_l)) + } + + async fn build_tables<'scan>( + option: &DbOption, + version_edits: &mut Vec::Key>>, + level: usize, + streams: Vec>, + ) -> Result<(), CompactionError> { + let mut stream = MergeStream::::from_vec(streams).await?; + + // Kould: is the capacity parameter necessary? + let mut builder = R::Columns::builder(8192); + let mut written_size = 0; + let mut min = None; + let mut max = None; + + while let Some(result) = Pin::new(&mut stream).next().await { + let entry = result?; + let key = entry.key(); + + if min.is_none() { + min = Some(key.value.to_key()) + } + max = Some(key.value.to_key()); + + written_size += key.size(); + builder.push(key, Some(entry.value())); + + if written_size >= option.max_sst_file_size { + Self::build_table( + option, + version_edits, + level, + &mut builder, + &mut min, + &mut max, + ) + .await?; + written_size = 0; + } + } + if written_size > 0 { + Self::build_table( + option, + version_edits, + level, + &mut builder, + &mut min, + &mut max, + ) + .await?; + } + Ok(()) + } + fn full_scope<'a>( meet_scopes: &[&'a Scope<::Key>], ) -> Result<(&'a ::Key, &'a ::Key), CompactionError> {