Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: avoid sst compacat twice #154

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 26 additions & 24 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use std::{cmp, collections::Bound, mem, pin::Pin, sync::Arc};
use std::{
cmp,
collections::{Bound, HashSet},
mem,
pin::Pin,
sync::Arc,
};

use async_lock::{RwLock, RwLockUpgradableReadGuard};
use futures_util::StreamExt;
Expand Down Expand Up @@ -180,15 +186,17 @@ where
delete_gens: &mut Vec<FileId>,
) -> Result<(), CompactionError<R>> {
let mut level = 0;

let mut compact_status = HashSet::new();
while level < MAX_LEVEL - 2 {
if !option.is_threshold_exceeded_major(version, level) {
break;
}
let (meet_scopes_l, start_l, end_l) = Self::this_level_scopes(version, min, max, level);
let (meet_scopes_ll, start_ll, end_ll) =
let meet_scopes_l = Self::this_level_scopes(version, min, max, level);
let mut meet_scopes_ll =
Self::next_level_scopes(version, &mut min, &mut max, level, &meet_scopes_l)?;

meet_scopes_ll.retain(|scope| compact_status.insert(scope.gen));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this retain meet_scopes_l? Because there will be no new sstable in meet_scopes_ll because compaction will increase step by step at level.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be meet_scopes_ll. Because we only care sstables that may be compacted in next compaction loop

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The meet_scopes_ll of this compaction is the meet_scopes_l of the next compaction, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, meet_scopes_l also need filtering, but it needn't insert to compact_status. So the code may be like this:

meet_scopes_l.retain(|scope| !compact_status.contains(&scope.gen));
meet_scopes_ll.iter().for_each(|scope| {
    compact_status.insert(scope.gen);
});

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if compact_status requires both meet_scopes_ll to be inserted, new sstables are ignored here, and is it better to just insert new sstables?

let mut streams = Vec::with_capacity(meet_scopes_l.len() + meet_scopes_ll.len());
// This Level
if level == 0 {
Expand All @@ -208,11 +216,11 @@ where
}
} else {
let (lower, upper) = Self::full_scope(&meet_scopes_l)?;
let gens = meet_scopes_l.iter().map(|scope| scope.gen).collect();

let level_scan_l = LevelStream::new(
version,
level,
start_l,
end_l,
gens,
(Bound::Included(lower), Bound::Included(upper)),
u32::MAX.into(),
None,
Expand All @@ -227,11 +235,10 @@ where
if !meet_scopes_ll.is_empty() {
// Next Level
let (lower, upper) = Self::full_scope(&meet_scopes_ll)?;
let gens = meet_scopes_ll.iter().map(|scope| scope.gen).collect();
let level_scan_ll = LevelStream::new(
version,
level + 1,
start_ll,
end_ll,
gens,
(Bound::Included(lower), Bound::Included(upper)),
u32::MAX.into(),
None,
Expand Down Expand Up @@ -271,10 +278,8 @@ where
max: &mut &'a <R as Record>::Key,
level: usize,
meet_scopes_l: &[&'a Scope<<R as Record>::Key>],
) -> Result<(Vec<&'a Scope<<R as Record>::Key>>, usize, usize), CompactionError<R>> {
) -> Result<Vec<&'a Scope<<R as Record>::Key>>, CompactionError<R>> {
let mut meet_scopes_ll = Vec::new();
let mut start_ll = 0;
let mut end_ll = 0;

if !version.level_slice[level + 1].is_empty() {
*min = meet_scopes_l
Expand All @@ -289,8 +294,8 @@ where
.max()
.ok_or(CompactionError::EmptyLevel)?;

start_ll = Version::<R, FP>::scope_search(min, &version.level_slice[level + 1]);
end_ll = Version::<R, FP>::scope_search(max, &version.level_slice[level + 1]);
let start_ll = Version::<R, FP>::scope_search(min, &version.level_slice[level + 1]);
let end_ll = Version::<R, FP>::scope_search(max, &version.level_slice[level + 1]);

let next_level_len = version.level_slice[level + 1].len();
for scope in version.level_slice[level + 1]
Expand All @@ -302,45 +307,42 @@ where
}
}
}
Ok((meet_scopes_ll, start_ll, end_ll))
Ok(meet_scopes_ll)
}

fn this_level_scopes<'a>(
version: &'a Version<R, FP>,
min: &<R as Record>::Key,
max: &<R as Record>::Key,
level: usize,
) -> (Vec<&'a Scope<<R as Record>::Key>>, usize, usize) {
) -> Vec<&'a Scope<<R as Record>::Key>> {
let mut meet_scopes_l = Vec::new();
let mut start_l = Version::<R, FP>::scope_search(min, &version.level_slice[level]);
let mut end_l = start_l;
let start_l = Version::<R, FP>::scope_search(min, &version.level_slice[level]);
let option = version.option();

for scope in version.level_slice[level][start_l..].iter() {
if (scope.contains(min) || scope.contains(max))
&& meet_scopes_l.len() <= option.major_l_selection_table_max_num
{
meet_scopes_l.push(scope);
end_l += 1;
} else {
break;
}
}
if meet_scopes_l.is_empty() {
start_l = 0;
end_l = cmp::min(
let end = cmp::min(
option.major_default_oldest_table_num,
version.level_slice[level].len(),
);

for scope in version.level_slice[level][..end_l].iter() {
for scope in version.level_slice[level][..end].iter() {
if meet_scopes_l.len() > option.major_l_selection_table_max_num {
break;
}
meet_scopes_l.push(scope);
}
}
(meet_scopes_l, start_l, end_l - 1)
meet_scopes_l
}

async fn build_tables<'scan>(
Expand Down
34 changes: 13 additions & 21 deletions src/stream/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::{
fs::{FileId, FileProvider},
ondisk::{scan::SsTableScan, sstable::SsTable},
record::Record,
scope::Scope,
stream::record_batch::RecordBatchEntry,
timestamp::Timestamp,
version::Version,
Expand Down Expand Up @@ -64,19 +63,13 @@ where
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
version: &Version<R, FP>,
level: usize,
start: usize,
end: usize,
mut gens: VecDeque<FileId>,
range: (Bound<&'level R::Key>, Bound<&'level R::Key>),
ts: Timestamp,
limit: Option<usize>,
projection_mask: ProjectionMask,
) -> Option<Self> {
let (lower, upper) = range;
let mut gens: VecDeque<FileId> = version.level_slice[level][start..end + 1]
.iter()
.map(Scope::gen)
.collect();
let first_gen = gens.pop_front()?;
let status = FutureStatus::Init(first_gen);

Expand Down Expand Up @@ -155,30 +148,33 @@ where

#[cfg(test)]
mod tests {
use std::{collections::Bound, sync::Arc};
use std::{
collections::{Bound, VecDeque},
sync::Arc,
};

use futures_util::StreamExt;
use parquet::arrow::{arrow_to_parquet_schema, ProjectionMask};
use tempfile::TempDir;

use crate::{
compaction::tests::build_version, record::Record, stream::level::LevelStream, tests::Test,
DbOption,
compaction::tests::build_version, fs::FileId, record::Record, stream::level::LevelStream,
tests::Test, DbOption,
};

#[tokio::test]
async fn projection_scan() {
let temp_dir = TempDir::new().unwrap();
let option = Arc::new(DbOption::from(temp_dir.path()));

let (_, version) = build_version(&option).await;
let ((gen1, gen2, _, _, _), version) = build_version(&option).await;

let gens: VecDeque<FileId> = [gen1, gen2].into_iter().collect();

{
let mut level_stream_1 = LevelStream::new(
&version,
0,
0,
1,
gens.clone(),
(Bound::Unbounded, Bound::Unbounded),
1_u32.into(),
None,
Expand Down Expand Up @@ -211,9 +207,7 @@ mod tests {
{
let mut level_stream_1 = LevelStream::new(
&version,
0,
0,
1,
gens.clone(),
(Bound::Unbounded, Bound::Unbounded),
1_u32.into(),
None,
Expand Down Expand Up @@ -246,9 +240,7 @@ mod tests {
{
let mut level_stream_1 = LevelStream::new(
&version,
0,
0,
1,
gens,
(Bound::Unbounded, Bound::Unbounded),
1_u32.into(),
None,
Expand Down
33 changes: 10 additions & 23 deletions src/version/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub(crate) mod edit;
pub(crate) mod set;

use std::{
collections::VecDeque,
marker::PhantomData,
ops::Bound,
sync::{
Expand Down Expand Up @@ -207,38 +208,24 @@ where
.map_err(VersionError::Parquet)?,
})
}
for (i, scopes) in self.level_slice[1..].iter().enumerate() {
for scopes in self.level_slice[1..].iter() {
if scopes.is_empty() {
continue;
}

let (mut start, mut end) = (None, None);

for (idx, scope) in scopes.iter().enumerate() {
if scope.meets_range(range) {
if start.is_none() {
start = Some(idx);
}
end = Some(idx);
}
}
if start.is_none() {
let gens: VecDeque<FileId> = scopes
.iter()
.filter(|scope| scope.meets_range(range))
.map(Scope::gen)
.collect();
if gens.is_empty() {
continue;
}

streams.push(ScanStream::Level {
// SAFETY: checked scopes no empty
inner: LevelStream::new(
self,
i + 1,
start.unwrap(),
end.unwrap(),
range,
ts,
limit,
projection_mask.clone(),
)
.unwrap(),
inner: LevelStream::new(self, gens, range, ts, limit, projection_mask.clone())
.unwrap(),
});
}
Ok(())
Expand Down
Loading