Skip to content

Commit

Permalink
chore: split Compactor::major_compaction into several submethods
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Jul 22, 2024
1 parent 0c31d94 commit 870d74f
Showing 1 changed file with 125 additions and 89 deletions.
214 changes: 125 additions & 89 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,54 +152,15 @@ where
if !option.is_threshold_exceeded_major(version, level) {
break;
}
let (meet_scopes_l, start_l, end_l) =
match Self::this_level_scopes(version, min, max, level) {
Some(value) => value,
None => return Ok(()),
};
let (meet_scopes_ll, start_ll, end_ll) =
Self::next_level_scopes(version, &mut min, &mut max, level, &meet_scopes_l)?;

let mut meet_scopes_l = Vec::new();
let start_l = Version::<R, FP>::scope_search(min, &version.level_slice[level]);
let mut end_l = start_l;
{
for scope in version.level_slice[level][start_l..].iter() {
if scope.contains(min) || scope.contains(max) {
meet_scopes_l.push(scope);
end_l += 1;
} else {
break;
}
}
if meet_scopes_l.is_empty() {
return Ok(());
}
}
let mut meet_scopes_ll = Vec::new();
let mut start_ll = 0;
let mut end_ll = 0;
{
if !version.level_slice[level + 1].is_empty() {
let min_key = &meet_scopes_l
.first()
.ok_or(CompactionError::EmptyLevel)?
.min;
let max_key = &meet_scopes_l.last().ok_or(CompactionError::EmptyLevel)?.max;
min = min_key;
max = max_key;

start_ll =
Version::<R, FP>::scope_search(min_key, &version.level_slice[level + 1]);
end_ll =
Version::<R, FP>::scope_search(max_key, &version.level_slice[level + 1]);

let next_level_len = version.level_slice[level + 1].len();
for scope in version.level_slice[level + 1]
[start_ll..cmp::min(end_ll + 1, next_level_len - 1)]
.iter()
{
if scope.contains(min) || scope.contains(max) {
meet_scopes_ll.push(scope);
}
}
}
}
let mut streams = Vec::with_capacity(meet_scopes_l.len() + meet_scopes_ll.len());

// This Level
if level == 0 {
for scope in meet_scopes_l.iter() {
Expand Down Expand Up @@ -244,50 +205,8 @@ where
streams.push(ScanStream::Level {
inner: level_scan_ll,
});
let mut stream = MergeStream::<R, FP>::from_vec(streams).await?;

// Kould: is the capacity parameter necessary?
let mut builder = R::Columns::builder(8192);
let mut written_size = 0;
let mut min = None;
let mut max = None;
Self::build_tables(option, version_edits, level, streams).await?;

while let Some(result) = Pin::new(&mut stream).next().await {
let entry = result?;
let key = entry.key();

if min.is_none() {
min = Some(key.value.to_key())
}
max = Some(key.value.to_key());

written_size += key.size();
builder.push(key, Some(entry.value()));

if written_size >= option.max_sst_file_size {
Self::build_table(
option,
version_edits,
level,
&mut builder,
&mut min,
&mut max,
)
.await?;
written_size = 0;
}
}
if written_size > 0 {
Self::build_table(
option,
version_edits,
level,
&mut builder,
&mut min,
&mut max,
)
.await?;
}
for scope in meet_scopes_l {
version_edits.push(VersionEdit::Remove {
level: level as u8,
Expand All @@ -308,6 +227,123 @@ where
Ok(())
}

/// Collects the scopes of `level + 1` that take part in compacting `level`.
///
/// When the next level is not empty, `min` and `max` are first re-pointed at the `min` key of
/// the first scope and the `max` key of the last scope in `meet_scopes_l`. The next level is
/// then examined between the positions `Version::scope_search` reports for those two keys, and
/// every scope in that window which contains `min` or `max` is collected.
///
/// Returns the collected scopes together with the two search positions (`start_ll`, `end_ll`).
/// When the next level is empty, the list is empty, both positions are `0`, and `min` / `max`
/// are left untouched.
///
/// # Errors
///
/// Returns `CompactionError::EmptyLevel` if the next level is non-empty but `meet_scopes_l`
/// is empty.
fn next_level_scopes<'a>(
    version: &'a Version<R, FP>,
    min: &mut &'a <R as Record>::Key,
    max: &mut &'a <R as Record>::Key,
    level: usize,
    meet_scopes_l: &[&'a Scope<<R as Record>::Key>],
) -> Result<(Vec<&'a Scope<<R as Record>::Key>>, usize, usize), CompactionError<R>> {
    let next_level = &version.level_slice[level + 1];
    let mut meet_scopes_ll = Vec::new();
    let mut start_ll = 0;
    let mut end_ll = 0;

    if !next_level.is_empty() {
        // Widen the key range to everything covered by the scopes selected in this level.
        *min = &meet_scopes_l
            .first()
            .ok_or(CompactionError::EmptyLevel)?
            .min;
        *max = &meet_scopes_l
            .last()
            .ok_or(CompactionError::EmptyLevel)?
            .max;

        start_ll = Version::<R, FP>::scope_search(min, next_level);
        end_ll = Version::<R, FP>::scope_search(max, next_level);

        // The range end is exclusive, so the cap must be `len` (not `len - 1`); capping at
        // `len - 1` would make the last scope of the next level unreachable.
        let window_end = cmp::min(end_ll + 1, next_level.len());
        for scope in next_level[start_ll..window_end].iter() {
            if scope.contains(min) || scope.contains(max) {
                meet_scopes_ll.push(scope);
            }
        }
    }
    Ok((meet_scopes_ll, start_ll, end_ll))
}

/// Selects the scopes of `level` that take part in the compaction.
///
/// Starting at the position `Version::scope_search` reports for `min`, consecutive scopes are
/// taken for as long as each one contains `min` or `max`; the first scope that contains
/// neither ends the run.
///
/// Returns `None` when the run is empty. Otherwise returns the selected scopes, the start
/// position, and the position one past the last selected scope.
fn this_level_scopes<'a>(
    version: &'a Version<R, FP>,
    min: &<R as Record>::Key,
    max: &<R as Record>::Key,
    level: usize,
) -> Option<(Vec<&'a Scope<<R as Record>::Key>>, usize, usize)> {
    let start_l = Version::<R, FP>::scope_search(min, &version.level_slice[level]);

    let meet_scopes_l: Vec<_> = version.level_slice[level][start_l..]
        .iter()
        .take_while(|scope| scope.contains(min) || scope.contains(max))
        .collect();

    if meet_scopes_l.is_empty() {
        return None;
    }

    // One past the last selected scope: every selected scope advanced the end by one.
    let end_l = start_l + meet_scopes_l.len();
    Some((meet_scopes_l, start_l, end_l))
}

/// Merges `streams` and writes the merged entries out through `Self::build_table`.
///
/// Every entry of the merged stream is pushed into a column builder while the accumulated key
/// size is tracked. Once that size reaches `option.max_sst_file_size`, `Self::build_table` is
/// called and the counter restarts from zero; whatever is still pending when the stream ends
/// gets one final `Self::build_table` call.
///
/// # Errors
///
/// Propagates any error from `MergeStream::from_vec`, from an entry yielded by the merged
/// stream, or from `Self::build_table`.
async fn build_tables<'scan>(
    option: &DbOption,
    version_edits: &mut Vec<VersionEdit<<R as Record>::Key>>,
    level: usize,
    streams: Vec<ScanStream<'scan, R, FP>>,
) -> Result<(), CompactionError<R>> {
    let mut merged = MergeStream::<R, FP>::from_vec(streams).await?;

    // Kould: is the capacity parameter necessary?
    let mut builder = R::Columns::builder(8192);
    let mut pending_bytes = 0;
    // First and last key pushed into the builder; both are handed to `build_table` by `&mut`.
    // NOTE(review): presumably `build_table` clears them for the next table — confirm.
    let mut first_key = None;
    let mut last_key = None;

    while let Some(item) = Pin::new(&mut merged).next().await {
        let entry = item?;
        let key = entry.key();

        // Only the first key seen since the bounds were last empty becomes the lower bound.
        first_key.get_or_insert_with(|| key.value.to_key());
        last_key = Some(key.value.to_key());

        pending_bytes += key.size();
        builder.push(key, Some(entry.value()));

        if pending_bytes >= option.max_sst_file_size {
            Self::build_table(
                option,
                version_edits,
                level,
                &mut builder,
                &mut first_key,
                &mut last_key,
            )
            .await?;
            pending_bytes = 0;
        }
    }

    // Flush the entries that never reached the size threshold.
    if pending_bytes > 0 {
        Self::build_table(
            option,
            version_edits,
            level,
            &mut builder,
            &mut first_key,
            &mut last_key,
        )
        .await?;
    }
    Ok(())
}

fn full_scope<'a>(
meet_scopes: &[&'a Scope<<R as Record>::Key>],
) -> Result<(&'a <R as Record>::Key, &'a <R as Record>::Key), CompactionError<R>> {
Expand Down

0 comments on commit 870d74f

Please sign in to comment.