Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: getting record from sstable does not handle nullable logic #20

Merged
merged 1 commit into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ tokio = ["dep:tokio"]
[dependencies]
arrow = "52"
async-lock = "3"
crc32fast = "1"
crossbeam-skiplist = "0.1"
flume = { version = "0.11", features = ["async"] }
futures-core = "0.3"
Expand Down
6 changes: 3 additions & 3 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ where
max = Some(key.value.to_key());

written_size += key.size();
builder.push(key, Some(entry.value()));
builder.push(key, entry.value());

if written_size >= option.max_sst_file_size {
Self::build_table(
Expand Down Expand Up @@ -523,7 +523,7 @@ pub(crate) mod tests {
});

let scope = Compactor::<Test, TokioExecutor>::minor_compaction(
&DbOption::new(temp_dir.path()),
&DbOption::from(temp_dir.path()),
VecDeque::from(vec![batch_2, batch_1]),
)
.await
Expand All @@ -537,7 +537,7 @@ pub(crate) mod tests {
async fn major_compaction() {
let temp_dir = TempDir::new().unwrap();

let mut option = DbOption::new(temp_dir.path());
let mut option = DbOption::from(temp_dir.path());
option.major_threshold_with_sst_size = 2;
let option = Arc::new(option);

Expand Down
107 changes: 40 additions & 67 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
#![allow(dead_code)]
pub(crate) mod arrows;
mod arrows;
mod compaction;
pub mod executor;
pub mod fs;
mod inmem;
mod ondisk;
pub mod option;
mod record;
mod scope;
pub mod serdes;
mod stream;
mod timestamp;
mod transaction;
pub(crate) mod version;
mod version;

use std::{
collections::VecDeque, io, marker::PhantomData, mem, ops::Bound, path::PathBuf, sync::Arc,
};
use std::{collections::VecDeque, io, marker::PhantomData, mem, ops::Bound, sync::Arc};

use async_lock::{RwLock, RwLockReadGuard};
use fs::FileProvider;
Expand All @@ -26,40 +25,20 @@ use lockable::LockableHashMap;
use parquet::{
arrow::{arrow_to_parquet_schema, ProjectionMask},
errors::ParquetError,
file::properties::WriterProperties,
};
use record::Record;
use thiserror::Error;
use timestamp::Timestamp;
use tracing::error;
use transaction::Transaction;

pub use crate::option::*;
use crate::{
executor::Executor,
fs::{FileId, FileType},
stream::{merge::MergeStream, Entry, ScanStream},
version::{cleaner::Cleaner, set::VersionSet, Version, VersionError},
};

type LockMap<K> = Arc<LockableHashMap<K, ()>>;

pub enum Projection {
All,
Parts(Vec<usize>),
}

#[derive(Debug)]
pub struct DbOption {
pub path: PathBuf,
pub max_mem_table_size: usize,
pub immutable_chunk_num: usize,
pub major_threshold_with_sst_size: usize,
pub level_sst_magnification: usize,
pub max_sst_file_size: usize,
pub clean_channel_buffer: usize,
pub write_parquet_option: Option<WriterProperties>,
}

pub struct DB<R, E>
where
R: Record,
Expand All @@ -71,46 +50,6 @@ where
_p: PhantomData<E>,
}

impl DbOption {
pub fn new(path: impl Into<PathBuf> + Send) -> Self {
DbOption {
path: path.into(),
max_mem_table_size: 8 * 1024 * 1024,
immutable_chunk_num: 3,
major_threshold_with_sst_size: 10,
level_sst_magnification: 10,
max_sst_file_size: 24 * 1024 * 1024,
clean_channel_buffer: 10,
write_parquet_option: None,
}
}

pub(crate) fn table_path(&self, gen: &FileId) -> PathBuf {
self.path.join(format!("{}.{}", gen, FileType::Parquet))
}

pub(crate) fn wal_path(&self, gen: &FileId) -> PathBuf {
self.path.join(format!("{}.{}", gen, FileType::Wal))
}

pub(crate) fn version_path(&self) -> PathBuf {
self.path.join(format!("version.{}", FileType::Log))
}

pub(crate) fn is_threshold_exceeded_major<R, E>(
&self,
version: &Version<R, E>,
level: usize,
) -> bool
where
R: Record,
E: FileProvider,
{
Version::<R, E>::tables_len(version, level)
>= (self.major_threshold_with_sst_size * self.level_sst_magnification.pow(level as u32))
}
}

impl<R, E> DB<R, E>
where
R: Record + Send,
Expand Down Expand Up @@ -350,6 +289,13 @@ where
Parquet(#[from] ParquetError),
}

type LockMap<K> = Arc<LockableHashMap<K, ()>>;

pub enum Projection {
All,
Parts(Vec<usize>),
}

#[cfg(test)]
pub(crate) mod tests {
use std::{collections::VecDeque, sync::Arc};
Expand All @@ -359,6 +305,7 @@ pub(crate) mod tests {
datatypes::{DataType, Field, Schema, UInt32Type},
};
use async_lock::RwLock;
use futures_util::io;
use once_cell::sync::Lazy;
use parquet::arrow::ProjectionMask;
use tracing::error;
Expand All @@ -370,6 +317,7 @@ pub(crate) mod tests {
mutable::Mutable,
},
record::{internal::InternalRecordRef, RecordRef},
serdes::{Decode, Encode},
version::{cleaner::Cleaner, set::tests::build_version_set, Version},
DbOption, Record, WriteError, DB,
};
Expand All @@ -381,6 +329,17 @@ pub(crate) mod tests {
pub vobool: Option<bool>,
}

impl Decode for Test {
type Error = io::Error;

async fn decode<R>(reader: &mut R) -> Result<Self, Self::Error>
where
R: futures_io::AsyncRead + Unpin,
{
todo!()
}
}

impl Record for Test {
type Columns = TestImmutableArrays;

Expand Down Expand Up @@ -426,6 +385,21 @@ pub(crate) mod tests {
pub vbool: Option<bool>,
}

impl<'r> Encode for TestRef<'r> {
type Error = io::Error;

async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
where
W: io::AsyncWrite + Unpin + Send + Sync,
{
todo!()
}

fn size(&self) -> usize {
todo!()
}
}

impl<'r> RecordRef<'r> for TestRef<'r> {
type Record = Test;

Expand Down Expand Up @@ -466,7 +440,6 @@ pub(crate) mod tests {
if !vbool_array.is_null(offset) {
vbool = Some(vbool_array.value(offset));
}
column_i += 1;
}

let record = TestRef {
Expand Down
60 changes: 30 additions & 30 deletions src/ondisk/sstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub(crate) mod tests {
async fn write_sstable() {
let temp_dir = tempfile::tempdir().unwrap();
let record_batch = get_test_record_batch::<TokioExecutor>(
Arc::new(DbOption::new(temp_dir.path())),
Arc::new(DbOption::from(temp_dir.path())),
TokioExecutor::new(),
)
.await;
Expand All @@ -185,7 +185,7 @@ pub(crate) mod tests {
async fn projection_query() {
let temp_dir = tempfile::tempdir().unwrap();
let record_batch = get_test_record_batch::<TokioExecutor>(
Arc::new(DbOption::new(temp_dir.path())),
Arc::new(DbOption::from(temp_dir.path())),
TokioExecutor::new(),
)
.await;
Expand All @@ -212,9 +212,9 @@ pub(crate) mod tests {
.await
.unwrap()
.unwrap();
assert_eq!(test_ref_1.get().vstring, "hello");
assert_eq!(test_ref_1.get().vu32, Some(12));
assert_eq!(test_ref_1.get().vbool, None);
assert_eq!(test_ref_1.get().unwrap().vstring, "hello");
assert_eq!(test_ref_1.get().unwrap().vu32, Some(12));
assert_eq!(test_ref_1.get().unwrap().vbool, None);
}
{
let test_ref_2 = open_sstable::<Test, TokioExecutor>(&table_path)
Expand All @@ -229,9 +229,9 @@ pub(crate) mod tests {
.await
.unwrap()
.unwrap();
assert_eq!(test_ref_2.get().vstring, "hello");
assert_eq!(test_ref_2.get().vu32, None);
assert_eq!(test_ref_2.get().vbool, Some(true));
assert_eq!(test_ref_2.get().unwrap().vstring, "hello");
assert_eq!(test_ref_2.get().unwrap().vu32, None);
assert_eq!(test_ref_2.get().unwrap().vbool, Some(true));
}
{
let test_ref_3 = open_sstable::<Test, TokioExecutor>(&table_path)
Expand All @@ -246,17 +246,17 @@ pub(crate) mod tests {
.await
.unwrap()
.unwrap();
assert_eq!(test_ref_3.get().vstring, "hello");
assert_eq!(test_ref_3.get().vu32, None);
assert_eq!(test_ref_3.get().vbool, None);
assert_eq!(test_ref_3.get().unwrap().vstring, "hello");
assert_eq!(test_ref_3.get().unwrap().vu32, None);
assert_eq!(test_ref_3.get().unwrap().vbool, None);
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn projection_scan() {
let temp_dir = tempfile::tempdir().unwrap();
let record_batch = get_test_record_batch::<TokioExecutor>(
Arc::new(DbOption::new(temp_dir.path())),
Arc::new(DbOption::from(temp_dir.path())),
TokioExecutor::new(),
)
.await;
Expand Down Expand Up @@ -284,14 +284,14 @@ pub(crate) mod tests {
.unwrap();

let entry_0 = test_ref_1.next().await.unwrap().unwrap();
assert_eq!(entry_0.get().vstring, "hello");
assert_eq!(entry_0.get().vu32, Some(12));
assert_eq!(entry_0.get().vbool, None);
assert_eq!(entry_0.get().unwrap().vstring, "hello");
assert_eq!(entry_0.get().unwrap().vu32, Some(12));
assert_eq!(entry_0.get().unwrap().vbool, None);

let entry_1 = test_ref_1.next().await.unwrap().unwrap();
assert_eq!(entry_1.get().vstring, "world");
assert_eq!(entry_1.get().vu32, Some(12));
assert_eq!(entry_1.get().vbool, None);
assert_eq!(entry_1.get().unwrap().vstring, "world");
assert_eq!(entry_1.get().unwrap().vu32, Some(12));
assert_eq!(entry_1.get().unwrap().vbool, None);
}
{
let mut test_ref_2 = open_sstable::<Test, TokioExecutor>(&table_path)
Expand All @@ -309,14 +309,14 @@ pub(crate) mod tests {
.unwrap();

let entry_0 = test_ref_2.next().await.unwrap().unwrap();
assert_eq!(entry_0.get().vstring, "hello");
assert_eq!(entry_0.get().vu32, None);
assert_eq!(entry_0.get().vbool, Some(true));
assert_eq!(entry_0.get().unwrap().vstring, "hello");
assert_eq!(entry_0.get().unwrap().vu32, None);
assert_eq!(entry_0.get().unwrap().vbool, Some(true));

let entry_1 = test_ref_2.next().await.unwrap().unwrap();
assert_eq!(entry_1.get().vstring, "world");
assert_eq!(entry_1.get().vu32, None);
assert_eq!(entry_1.get().vbool, None);
assert_eq!(entry_1.get().unwrap().vstring, "world");
assert_eq!(entry_1.get().unwrap().vu32, None);
assert_eq!(entry_1.get().unwrap().vbool, None);
}
{
let mut test_ref_3 = open_sstable::<Test, TokioExecutor>(&table_path)
Expand All @@ -334,14 +334,14 @@ pub(crate) mod tests {
.unwrap();

let entry_0 = test_ref_3.next().await.unwrap().unwrap();
assert_eq!(entry_0.get().vstring, "hello");
assert_eq!(entry_0.get().vu32, None);
assert_eq!(entry_0.get().vbool, None);
assert_eq!(entry_0.get().unwrap().vstring, "hello");
assert_eq!(entry_0.get().unwrap().vu32, None);
assert_eq!(entry_0.get().unwrap().vbool, None);

let entry_1 = test_ref_3.next().await.unwrap().unwrap();
assert_eq!(entry_1.get().vstring, "world");
assert_eq!(entry_1.get().vu32, None);
assert_eq!(entry_1.get().vbool, None);
assert_eq!(entry_1.get().unwrap().vstring, "world");
assert_eq!(entry_1.get().unwrap().vu32, None);
assert_eq!(entry_1.get().unwrap().vbool, None);
}
}
}
Loading
Loading