Skip to content

Commit

Permalink
feat: impl Projection Pushdown
Browse files (browse the repository at this point in the history)
  • Loading branch information
KKould committed Jul 23, 2024
1 parent 47571a9 commit f026a2e
Show file tree
Hide file tree
Showing 12 changed files with 612 additions and 123 deletions.
145 changes: 86 additions & 59 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{

use async_lock::RwLock;
use futures_util::StreamExt;
use parquet::arrow::AsyncArrowWriter;
use parquet::arrow::{AsyncArrowWriter, ProjectionMask};
use thiserror::Error;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use ulid::Ulid;
Expand Down Expand Up @@ -175,7 +175,12 @@ where

streams.push(ScanStream::SsTable {
inner: SsTable::open(file)
.scan((Bound::Unbounded, Bound::Unbounded), u32::MAX.into(), None)
.scan(
(Bound::Unbounded, Bound::Unbounded),
u32::MAX.into(),
None,
ProjectionMask::all(),
)
.await?,
});
}
Expand All @@ -189,6 +194,7 @@ where
(Bound::Included(lower), Bound::Included(upper)),
u32::MAX.into(),
None,
ProjectionMask::all(),
)
.ok_or(CompactionError::EmptyLevel)?;

Expand All @@ -206,6 +212,7 @@ where
(Bound::Included(lower), Bound::Included(upper)),
u32::MAX.into(),
None,
ProjectionMask::all(),
)
.ok_or(CompactionError::EmptyLevel)?;

Expand Down Expand Up @@ -406,7 +413,7 @@ where
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use std::{collections::VecDeque, sync::Arc};

use flume::bounded;
Expand Down Expand Up @@ -435,7 +442,7 @@ mod tests {
Immutable::from(mutable)
}

async fn build_parquet_table<R: Record + Send, FP: Executor>(
pub(crate) async fn build_parquet_table<R: Record + Send, FP: Executor>(
option: &DbOption,
gen: FileId,
fn_mutable: impl FnOnce(&mut Mutable<R>),
Expand Down Expand Up @@ -531,6 +538,57 @@ mod tests {
option.major_threshold_with_sst_size = 2;
let option = Arc::new(option);

let ((table_gen_1, table_gen_2, table_gen_3, table_gen_4, _), version) =
build_version(&option).await;

let min = 2.to_string();
let max = 5.to_string();
let mut version_edits = Vec::new();

Compactor::<Test, TokioExecutor>::major_compaction(
&version,
&option,
&min,
&max,
&mut version_edits,
&mut vec![],
)
.await
.unwrap();
if let VersionEdit::Add { level, scope } = &version_edits[0] {
assert_eq!(*level, 1);
assert_eq!(scope.min, 1.to_string());
assert_eq!(scope.max, 6.to_string());
}
assert_eq!(
version_edits[1..5].to_vec(),
vec![
VersionEdit::Remove {
level: 0,
gen: table_gen_1,
},
VersionEdit::Remove {
level: 0,
gen: table_gen_2,
},
VersionEdit::Remove {
level: 1,
gen: table_gen_3,
},
VersionEdit::Remove {
level: 1,
gen: table_gen_4,
},
]
);
}

pub(crate) async fn build_version(
option: &Arc<DbOption>,
) -> (
(FileId, FileId, FileId, FileId, FileId),
Version<Test, TokioExecutor>,
) {
// level 0
let table_gen_1 = FileId::new();
let table_gen_2 = FileId::new();
Expand All @@ -539,23 +597,23 @@ mod tests {
Test {
vstring: 1.to_string(),
vu32: 0,
vobool: None,
vobool: Some(true),
},
1.into(),
);
mutable.insert(
Test {
vstring: 2.to_string(),
vu32: 0,
vobool: None,
vobool: Some(true),
},
1.into(),
);
mutable.insert(
Test {
vstring: 3.to_string(),
vu32: 0,
vobool: None,
vobool: Some(true),
},
0.into(),
);
Expand All @@ -567,23 +625,23 @@ mod tests {
Test {
vstring: 4.to_string(),
vu32: 0,
vobool: None,
vobool: Some(true),
},
1.into(),
);
mutable.insert(
Test {
vstring: 5.to_string(),
vu32: 0,
vobool: None,
vobool: Some(true),
},
1.into(),
);
mutable.insert(
Test {
vstring: 6.to_string(),
vu32: 0,
vobool: None,
vobool: Some(true),
},
0.into(),
);
Expand All @@ -600,23 +658,23 @@ mod tests {
Test {
vstring: 1.to_string(),
vu32: 0,
vobool: None,
vobool: Some(true),
},
0.into(),
);
mutable.insert(
Test {
vstring: 2.to_string(),
vu32: 0,
vobool: None,
vobool: Some(true),
},
0.into(),
);
mutable.insert(
Test {
vstring: 3.to_string(),
vu32: 0,
vobool: None,
vobool: Some(true),
},
0.into(),
);
Expand All @@ -628,23 +686,23 @@ mod tests {
Test {
vstring: 4.to_string(),
vu32: 0,
vobool: None,
vobool: Some(true),
},
0.into(),
);
mutable.insert(
Test {
vstring: 5.to_string(),
vu32: 0,
vobool: None,
vobool: Some(true),
},
0.into(),
);
mutable.insert(
Test {
vstring: 6.to_string(),
vu32: 0,
vobool: None,
vobool: Some(true),
},
0.into(),
);
Expand All @@ -656,23 +714,23 @@ mod tests {
Test {
vstring: 7.to_string(),
vu32: 0,
vobool: None,
vobool: Some(true),
},
0.into(),
);
mutable.insert(
Test {
vstring: 8.to_string(),
vu32: 0,
vobool: None,
vobool: Some(true),
},
0.into(),
);
mutable.insert(
Test {
vstring: 9.to_string(),
vu32: 0,
vobool: None,
vobool: Some(true),
},
0.into(),
);
Expand Down Expand Up @@ -712,46 +770,15 @@ mod tests {
gen: table_gen_5,
wal_ids: None,
});

let min = 2.to_string();
let max = 5.to_string();
let mut version_edits = Vec::new();

Compactor::<Test, TokioExecutor>::major_compaction(
&version,
&option,
&min,
&max,
&mut version_edits,
&mut vec![],
(
(
table_gen_1,
table_gen_2,
table_gen_3,
table_gen_4,
table_gen_5,
),
version,
)
.await
.unwrap();
if let VersionEdit::Add { level, scope } = &version_edits[0] {
assert_eq!(*level, 1);
assert_eq!(scope.min, 1.to_string());
assert_eq!(scope.max, 6.to_string());
}
assert_eq!(
version_edits[1..5].to_vec(),
vec![
VersionEdit::Remove {
level: 0,
gen: table_gen_1,
},
VersionEdit::Remove {
level: 0,
gen: table_gen_2,
},
VersionEdit::Remove {
level: 1,
gen: table_gen_3,
},
VersionEdit::Remove {
level: 1,
gen: table_gen_4,
},
]
);
}
}
Loading

0 comments on commit f026a2e

Please sign in to comment.