Skip to content

Commit

Permalink
fix: getting record from sstable does not handle nullable logic
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Jul 24, 2024
1 parent 6a0d6a2 commit 65b3f2c
Show file tree
Hide file tree
Showing 15 changed files with 306 additions and 169 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ tokio = ["dep:tokio"]
[dependencies]
arrow = "52"
async-lock = "3"
crc32fast = "1"
crossbeam-skiplist = "0.1"
flume = { version = "0.11", features = ["async"] }
futures-core = "0.3"
Expand Down
6 changes: 3 additions & 3 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ where
max = Some(key.value.to_key());

written_size += key.size();
builder.push(key, Some(entry.value()));
builder.push(key, entry.value());

if written_size >= option.max_sst_file_size {
Self::build_table(
Expand Down Expand Up @@ -523,7 +523,7 @@ pub(crate) mod tests {
});

let scope = Compactor::<Test, TokioExecutor>::minor_compaction(
&DbOption::new(temp_dir.path()),
&DbOption::from(temp_dir.path()),
VecDeque::from(vec![batch_2, batch_1]),
)
.await
Expand All @@ -537,7 +537,7 @@ pub(crate) mod tests {
async fn major_compaction() {
let temp_dir = TempDir::new().unwrap();

let mut option = DbOption::new(temp_dir.path());
let mut option = DbOption::from(temp_dir.path());
option.major_threshold_with_sst_size = 2;
let option = Arc::new(option);

Expand Down
107 changes: 40 additions & 67 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
#![allow(dead_code)]
pub(crate) mod arrows;
mod arrows;
mod compaction;
pub mod executor;
pub mod fs;
mod inmem;
mod ondisk;
pub mod option;
mod record;
mod scope;
pub mod serdes;
mod stream;
mod timestamp;
mod transaction;
pub(crate) mod version;
mod version;

use std::{
collections::VecDeque, io, marker::PhantomData, mem, ops::Bound, path::PathBuf, sync::Arc,
};
use std::{collections::VecDeque, io, marker::PhantomData, mem, ops::Bound, sync::Arc};

use async_lock::{RwLock, RwLockReadGuard};
use fs::FileProvider;
Expand All @@ -26,40 +25,20 @@ use lockable::LockableHashMap;
use parquet::{
arrow::{arrow_to_parquet_schema, ProjectionMask},
errors::ParquetError,
file::properties::WriterProperties,
};
use record::Record;
use thiserror::Error;
use timestamp::Timestamp;
use tracing::error;
use transaction::Transaction;

pub use crate::option::*;
use crate::{
executor::Executor,
fs::{FileId, FileType},
stream::{merge::MergeStream, Entry, ScanStream},
version::{cleaner::Cleaner, set::VersionSet, Version, VersionError},
};

type LockMap<K> = Arc<LockableHashMap<K, ()>>;

pub enum Projection {
All,
Parts(Vec<usize>),
}

#[derive(Debug)]
pub struct DbOption {
pub path: PathBuf,
pub max_mem_table_size: usize,
pub immutable_chunk_num: usize,
pub major_threshold_with_sst_size: usize,
pub level_sst_magnification: usize,
pub max_sst_file_size: usize,
pub clean_channel_buffer: usize,
pub write_parquet_option: Option<WriterProperties>,
}

pub struct DB<R, E>
where
R: Record,
Expand All @@ -71,46 +50,6 @@ where
_p: PhantomData<E>,
}

impl DbOption {
pub fn new(path: impl Into<PathBuf> + Send) -> Self {
DbOption {
path: path.into(),
max_mem_table_size: 8 * 1024 * 1024,
immutable_chunk_num: 3,
major_threshold_with_sst_size: 10,
level_sst_magnification: 10,
max_sst_file_size: 24 * 1024 * 1024,
clean_channel_buffer: 10,
write_parquet_option: None,
}
}

pub(crate) fn table_path(&self, gen: &FileId) -> PathBuf {
self.path.join(format!("{}.{}", gen, FileType::Parquet))
}

pub(crate) fn wal_path(&self, gen: &FileId) -> PathBuf {
self.path.join(format!("{}.{}", gen, FileType::Wal))
}

pub(crate) fn version_path(&self) -> PathBuf {
self.path.join(format!("version.{}", FileType::Log))
}

pub(crate) fn is_threshold_exceeded_major<R, E>(
&self,
version: &Version<R, E>,
level: usize,
) -> bool
where
R: Record,
E: FileProvider,
{
Version::<R, E>::tables_len(version, level)
>= (self.major_threshold_with_sst_size * self.level_sst_magnification.pow(level as u32))
}
}

impl<R, E> DB<R, E>
where
R: Record + Send,
Expand Down Expand Up @@ -350,6 +289,13 @@ where
Parquet(#[from] ParquetError),
}

type LockMap<K> = Arc<LockableHashMap<K, ()>>;

pub enum Projection {
All,
Parts(Vec<usize>),
}

#[cfg(test)]
pub(crate) mod tests {
use std::{collections::VecDeque, sync::Arc};
Expand All @@ -359,10 +305,12 @@ pub(crate) mod tests {
datatypes::{DataType, Field, Schema, UInt32Type},
};
use async_lock::RwLock;
use futures_util::io;
use once_cell::sync::Lazy;
use parquet::arrow::ProjectionMask;
use tracing::error;

use crate::serdes::{Decode, Encode};
use crate::{
executor::{tokio::TokioExecutor, Executor},
inmem::{
Expand All @@ -381,6 +329,17 @@ pub(crate) mod tests {
pub vobool: Option<bool>,
}

impl Decode for Test {
type Error = io::Error;

async fn decode<R>(reader: &mut R) -> Result<Self, Self::Error>
where
R: futures_io::AsyncRead + Unpin,
{
todo!()
}
}

impl Record for Test {
type Columns = TestImmutableArrays;

Expand Down Expand Up @@ -426,6 +385,21 @@ pub(crate) mod tests {
pub vbool: Option<bool>,
}

impl<'r> Encode for TestRef<'r> {
type Error = io::Error;

async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
where
W: io::AsyncWrite + Unpin + Send + Sync,
{
todo!()
}

fn size(&self) -> usize {
todo!()
}
}

impl<'r> RecordRef<'r> for TestRef<'r> {
type Record = Test;

Expand Down Expand Up @@ -466,7 +440,6 @@ pub(crate) mod tests {
if !vbool_array.is_null(offset) {
vbool = Some(vbool_array.value(offset));
}
column_i += 1;
}

let record = TestRef {
Expand Down
60 changes: 30 additions & 30 deletions src/ondisk/sstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub(crate) mod tests {
async fn write_sstable() {
let temp_dir = tempfile::tempdir().unwrap();
let record_batch = get_test_record_batch::<TokioExecutor>(
Arc::new(DbOption::new(temp_dir.path())),
Arc::new(DbOption::from(temp_dir.path())),
TokioExecutor::new(),
)
.await;
Expand All @@ -185,7 +185,7 @@ pub(crate) mod tests {
async fn projection_query() {
let temp_dir = tempfile::tempdir().unwrap();
let record_batch = get_test_record_batch::<TokioExecutor>(
Arc::new(DbOption::new(temp_dir.path())),
Arc::new(DbOption::from(temp_dir.path())),
TokioExecutor::new(),
)
.await;
Expand All @@ -212,9 +212,9 @@ pub(crate) mod tests {
.await
.unwrap()
.unwrap();
assert_eq!(test_ref_1.get().vstring, "hello");
assert_eq!(test_ref_1.get().vu32, Some(12));
assert_eq!(test_ref_1.get().vbool, None);
assert_eq!(test_ref_1.get().unwrap().vstring, "hello");
assert_eq!(test_ref_1.get().unwrap().vu32, Some(12));
assert_eq!(test_ref_1.get().unwrap().vbool, None);
}
{
let test_ref_2 = open_sstable::<Test, TokioExecutor>(&table_path)
Expand All @@ -229,9 +229,9 @@ pub(crate) mod tests {
.await
.unwrap()
.unwrap();
assert_eq!(test_ref_2.get().vstring, "hello");
assert_eq!(test_ref_2.get().vu32, None);
assert_eq!(test_ref_2.get().vbool, Some(true));
assert_eq!(test_ref_2.get().unwrap().vstring, "hello");
assert_eq!(test_ref_2.get().unwrap().vu32, None);
assert_eq!(test_ref_2.get().unwrap().vbool, Some(true));
}
{
let test_ref_3 = open_sstable::<Test, TokioExecutor>(&table_path)
Expand All @@ -246,17 +246,17 @@ pub(crate) mod tests {
.await
.unwrap()
.unwrap();
assert_eq!(test_ref_3.get().vstring, "hello");
assert_eq!(test_ref_3.get().vu32, None);
assert_eq!(test_ref_3.get().vbool, None);
assert_eq!(test_ref_3.get().unwrap().vstring, "hello");
assert_eq!(test_ref_3.get().unwrap().vu32, None);
assert_eq!(test_ref_3.get().unwrap().vbool, None);
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn projection_scan() {
let temp_dir = tempfile::tempdir().unwrap();
let record_batch = get_test_record_batch::<TokioExecutor>(
Arc::new(DbOption::new(temp_dir.path())),
Arc::new(DbOption::from(temp_dir.path())),
TokioExecutor::new(),
)
.await;
Expand Down Expand Up @@ -284,14 +284,14 @@ pub(crate) mod tests {
.unwrap();

let entry_0 = test_ref_1.next().await.unwrap().unwrap();
assert_eq!(entry_0.get().vstring, "hello");
assert_eq!(entry_0.get().vu32, Some(12));
assert_eq!(entry_0.get().vbool, None);
assert_eq!(entry_0.get().unwrap().vstring, "hello");
assert_eq!(entry_0.get().unwrap().vu32, Some(12));
assert_eq!(entry_0.get().unwrap().vbool, None);

let entry_1 = test_ref_1.next().await.unwrap().unwrap();
assert_eq!(entry_1.get().vstring, "world");
assert_eq!(entry_1.get().vu32, Some(12));
assert_eq!(entry_1.get().vbool, None);
assert_eq!(entry_1.get().unwrap().vstring, "world");
assert_eq!(entry_1.get().unwrap().vu32, Some(12));
assert_eq!(entry_1.get().unwrap().vbool, None);
}
{
let mut test_ref_2 = open_sstable::<Test, TokioExecutor>(&table_path)
Expand All @@ -309,14 +309,14 @@ pub(crate) mod tests {
.unwrap();

let entry_0 = test_ref_2.next().await.unwrap().unwrap();
assert_eq!(entry_0.get().vstring, "hello");
assert_eq!(entry_0.get().vu32, None);
assert_eq!(entry_0.get().vbool, Some(true));
assert_eq!(entry_0.get().unwrap().vstring, "hello");
assert_eq!(entry_0.get().unwrap().vu32, None);
assert_eq!(entry_0.get().unwrap().vbool, Some(true));

let entry_1 = test_ref_2.next().await.unwrap().unwrap();
assert_eq!(entry_1.get().vstring, "world");
assert_eq!(entry_1.get().vu32, None);
assert_eq!(entry_1.get().vbool, None);
assert_eq!(entry_1.get().unwrap().vstring, "world");
assert_eq!(entry_1.get().unwrap().vu32, None);
assert_eq!(entry_1.get().unwrap().vbool, None);
}
{
let mut test_ref_3 = open_sstable::<Test, TokioExecutor>(&table_path)
Expand All @@ -334,14 +334,14 @@ pub(crate) mod tests {
.unwrap();

let entry_0 = test_ref_3.next().await.unwrap().unwrap();
assert_eq!(entry_0.get().vstring, "hello");
assert_eq!(entry_0.get().vu32, None);
assert_eq!(entry_0.get().vbool, None);
assert_eq!(entry_0.get().unwrap().vstring, "hello");
assert_eq!(entry_0.get().unwrap().vu32, None);
assert_eq!(entry_0.get().unwrap().vbool, None);

let entry_1 = test_ref_3.next().await.unwrap().unwrap();
assert_eq!(entry_1.get().vstring, "world");
assert_eq!(entry_1.get().vu32, None);
assert_eq!(entry_1.get().vbool, None);
assert_eq!(entry_1.get().unwrap().vstring, "world");
assert_eq!(entry_1.get().unwrap().vu32, None);
assert_eq!(entry_1.get().unwrap().vbool, None);
}
}
}
Loading

0 comments on commit 65b3f2c

Please sign in to comment.