From 9e80dba241c3f52fee3a860fbb4e1725dd051dd6 Mon Sep 17 00:00:00 2001 From: Gwo Tzu-Hsing Date: Mon, 16 Dec 2024 22:26:25 +0800 Subject: [PATCH] refactor: remove R type arg on `DbOption` --- benches/common.rs | 4 +- bindings/python/src/db.rs | 17 +---- bindings/python/src/options.rs | 27 +++---- bindings/python/src/range.rs | 1 + bindings/python/src/transaction.rs | 21 +++--- bindings/python/src/utils.rs | 11 +-- examples/datafusion.rs | 4 +- examples/declare.rs | 4 +- examples/dynamic.rs | 17 +++-- src/compaction/mod.rs | 48 ++++++------ src/inmem/mutable.rs | 30 ++++---- src/lib.rs | 116 ++++++++++------------------- src/ondisk/sstable.rs | 8 +- src/option.rs | 98 +++++------------------- src/record/runtime/record.rs | 66 +--------------- src/snapshot.rs | 4 +- src/stream/level.rs | 15 ++-- src/stream/mem_projection.rs | 6 +- src/stream/merge.rs | 20 ++--- src/stream/package.rs | 6 +- src/transaction.rs | 43 +++++------ src/version/cleaner.rs | 28 +++---- src/version/mod.rs | 6 +- src/version/set.rs | 18 ++--- tests/data_integrity.rs | 11 +-- tests/macros_correctness.rs | 1 - tests/wasm.rs | 89 ++++++++-------------- 27 files changed, 248 insertions(+), 471 deletions(-) diff --git a/benches/common.rs b/benches/common.rs index c0249dc..52480ea 100644 --- a/benches/common.rs +++ b/benches/common.rs @@ -273,7 +273,7 @@ impl BenchDatabase for TonboS3BenchDataBase { .disable_wal(); TonboS3BenchDataBase::new( - tonbo::DB::new(option, TokioExecutor::current()) + tonbo::DB::new(option, TokioExecutor::current(), &CustomerSchema) .await .unwrap(), ) @@ -324,7 +324,7 @@ impl BenchDatabase for TonboBenchDataBase { DbOption::from(fusio::path::Path::from_filesystem_path(path.as_ref()).unwrap()) .disable_wal(); - let db = tonbo::DB::new(option, TokioExecutor::current()) + let db = tonbo::DB::new(option, TokioExecutor::current(), &CustomerSchema) .await .unwrap(); TonboBenchDataBase::new(db) diff --git a/bindings/python/src/db.rs b/bindings/python/src/db.rs index 3b69d9f..db587e7 100644 --- a/bindings/python/src/db.rs +++ b/bindings/python/src/db.rs @@ -9,10 +9,10 @@ use pyo3::{ use pyo3_asyncio::tokio::{future_into_py, get_runtime}; use tonbo::{ executor::tokio::TokioExecutor, - record::DynRecord, + record::{DynRecord, DynSchema, Value, ValueDesc}, DB, }; -use tonbo::record::{DynSchema, Value, ValueDesc}; + use crate::{ column::Column, error::{CommitError, DbError}, @@ -40,7 +40,6 @@ impl TonboDB { let mut desc = vec![]; let mut cols = vec![]; let mut primary_key_index = None; - let mut primary_key_name = None; for i in 0..values.len()? { let value = values.get_item(i)?; @@ -51,23 +50,15 @@ impl TonboDB { panic!("Multiple primary keys is not allowed!") } primary_key_index = Some(desc.len()); - primary_key_name = Some(col.name.clone()); } cols.push(col.clone()); desc.push(ValueDesc::from(col)); } } let schema = DynSchema::new(desc, primary_key_index.unwrap()); - let option = option.into_option(primary_key_index.unwrap(), primary_key_name.unwrap()); + let option = option.into_option(&schema); let db = get_runtime() - .block_on(async { - DB::with_schema( - option, - TokioExecutor::current(), - schema, - ) - .await - }) + .block_on(async { DB::new(option, TokioExecutor::current(), schema).await }) .unwrap(); Ok(Self { db: Arc::new(db), diff --git a/bindings/python/src/options.rs b/bindings/python/src/options.rs index 3e994c6..5fa98ca 100644 --- a/bindings/python/src/options.rs +++ b/bindings/python/src/options.rs @@ -1,6 +1,6 @@ use fusio::path::Path; use pyo3::{pyclass, pymethods, PyResult}; -use tonbo::record::DynRecord; +use tonbo::record::Schema; use crate::{ExceedsMaxLevelError, FsOptions}; @@ -72,21 +72,16 @@ impl DbOption { } impl DbOption { - pub(crate) fn into_option( - self, - primary_key_index: usize, - primary_key_name: String, - ) -> tonbo::DbOption { - let mut opt = - tonbo::DbOption::with_path(Path::from(self.path), primary_key_name, primary_key_index) - .clean_channel_buffer(self.clean_channel_buffer) - .immutable_chunk_num(self.immutable_chunk_num) - .level_sst_magnification(self.level_sst_magnification) - .major_default_oldest_table_num(self.major_default_oldest_table_num) - .major_threshold_with_sst_size(self.major_threshold_with_sst_size) - .max_sst_file_size(self.max_sst_file_size) - .version_log_snapshot_threshold(self.version_log_snapshot_threshold) - .base_fs(fusio_dispatch::FsOptions::from(self.base_fs)); + pub(crate) fn into_option(self, schema: &S) -> tonbo::DbOption { + let mut opt = tonbo::DbOption::new(Path::from(self.path), schema) + .clean_channel_buffer(self.clean_channel_buffer) + .immutable_chunk_num(self.immutable_chunk_num) + .level_sst_magnification(self.level_sst_magnification) + .major_default_oldest_table_num(self.major_default_oldest_table_num) + .major_threshold_with_sst_size(self.major_threshold_with_sst_size) + .max_sst_file_size(self.max_sst_file_size) + .version_log_snapshot_threshold(self.version_log_snapshot_threshold) + .base_fs(fusio_dispatch::FsOptions::from(self.base_fs)); for (level, path) in self.level_paths.into_iter().enumerate() { if let Some((path, fs_options)) = path { opt = opt diff --git a/bindings/python/src/range.rs b/bindings/python/src/range.rs index 84dcd58..297c1ac 100644 --- a/bindings/python/src/range.rs +++ b/bindings/python/src/range.rs @@ -2,6 +2,7 @@ use std::ops; use pyo3::{pyclass, FromPyObject, Py, PyAny, Python}; use tonbo::record::Value; + use crate::{utils::to_col, Column}; #[pyclass] diff --git a/bindings/python/src/transaction.rs b/bindings/python/src/transaction.rs index cfc816f..f9f5212 100644 --- a/bindings/python/src/transaction.rs +++ b/bindings/python/src/transaction.rs @@ -6,8 +6,11 @@ use pyo3::{ Bound, IntoPy, Py, PyAny, PyResult, Python, }; use pyo3_asyncio::tokio::future_into_py; -use tonbo::{record::DynRecord, transaction, Projection}; -use tonbo::record::Value; +use tonbo::{ + record::{DynRecord, Value}, + transaction, Projection, +}; + use crate::{ column::Column, error::{repeated_commit_err, CommitError, DbError}, @@ -181,16 +184,14 @@ impl Transaction { future_into_py(py, async move { let mut scan = txn.scan(( unsafe { - transmute::< - std::ops::Bound<&Value>, - std::ops::Bound<&'static Value>, - >(lower.as_ref()) + transmute::, std::ops::Bound<&'static Value>>( + lower.as_ref(), + ) }, unsafe { - transmute::< - std::ops::Bound<&Value>, - std::ops::Bound<&'static Value>, - >(high.as_ref()) + transmute::, std::ops::Bound<&'static Value>>( + high.as_ref(), + ) }, )); diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs index 1d231c8..0dfb64f 100644 --- a/bindings/python/src/utils.rs +++ b/bindings/python/src/utils.rs @@ -8,11 +8,7 @@ use tonbo::record::{Datatype, Value}; use crate::{column::Column, datatype::DataType, range}; -pub(crate) fn to_dict( - py: Python, - primary_key_index: usize, - record: Vec, -) -> Bound { +pub(crate) fn to_dict(py: Python, primary_key_index: usize, record: Vec) -> Bound { let dict = PyDict::new_bound(py); for (idx, col) in record.iter().enumerate() { match &col.datatype { @@ -199,10 +195,7 @@ pub(crate) fn to_bound( col: &Column, lower: Option>, high: Option>, -) -> ( - std::ops::Bound, - std::ops::Bound, -) { +) -> (std::ops::Bound, std::ops::Bound) { let lower = match lower { Some(bound) => bound.get().to_bound(py, col), None => std::ops::Bound::Unbounded, diff --git a/examples/datafusion.rs b/examples/datafusion.rs index b9564c9..34a9c45 100644 --- a/examples/datafusion.rs +++ b/examples/datafusion.rs @@ -221,10 +221,10 @@ async fn main() -> Result<()> { // make sure the path exists let _ = fs::create_dir_all("./db_path/music").await; - let options = DbOption::from(( + let options = DbOption::new( Path::from_filesystem_path("./db_path/music").unwrap(), &MusicSchema, - )); + ); let db = DB::new(options, TokioExecutor::current(), MusicSchema) .await diff --git a/examples/declare.rs b/examples/declare.rs index 685255a..a4c7d78 100644 --- a/examples/declare.rs +++ b/examples/declare.rs @@ -22,10 +22,10 @@ async fn main() { // make sure the path exists let _ = fs::create_dir_all("./db_path/users").await; - let options = DbOption::from(( + let options = DbOption::new( Path::from_filesystem_path("./db_path/users").unwrap(), &UserSchema, - )); + ); // pluggable async runtime and I/O let db = DB::new(options, TokioExecutor::current(), UserSchema) .await diff --git a/examples/dynamic.rs b/examples/dynamic.rs index cc5c8da..e982bf2 100644 --- a/examples/dynamic.rs +++ b/examples/dynamic.rs @@ -1,10 +1,11 @@ -use std::fs; -use std::sync::Arc; +use std::{fs, sync::Arc}; use fusio::path::Path; -use tonbo::executor::tokio::TokioExecutor; -use tonbo::record::{Datatype, DynRecord, DynSchema, Value, ValueDesc}; -use tonbo::{DbOption, DB}; +use tonbo::{ + executor::tokio::TokioExecutor, + record::{Datatype, DynRecord, DynSchema, Value, ValueDesc}, + DbOption, DB, +}; #[tokio::main] async fn main() { @@ -18,11 +19,11 @@ async fn main() { 0, ); - let options = DbOption::from(( + let options = DbOption::new( Path::from_filesystem_path("./db_path/users").unwrap(), &schema, - )); - let db = DB::with_schema(options, TokioExecutor::current(), schema) + ); + let db = DB::new(options, TokioExecutor::current(), schema) .await .unwrap(); diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index f4d7da2..cc9262c 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -35,7 +35,7 @@ pub(crate) struct Compactor where R: Record, { - pub(crate) option: Arc>, + pub(crate) option: Arc, pub(crate) schema: Arc>>, pub(crate) version_set: VersionSet, pub(crate) manager: Arc, @@ -49,7 +49,7 @@ where pub(crate) fn new( schema: Arc>>, record_schema: Arc, - option: Arc>, + option: Arc, version_set: VersionSet, manager: Arc, ) -> Self { @@ -148,7 +148,7 @@ where } pub(crate) async fn minor_compaction( - option: &DbOption, + option: &DbOption, recover_wal_ids: Option>, batches: &[( Option, @@ -211,7 +211,7 @@ where #[allow(clippy::too_many_arguments)] pub(crate) async fn major_compaction( version: &Version, - option: &DbOption, + option: &DbOption, mut min: &::Key, mut max: &::Key, version_edits: &mut Vec::Key>>, @@ -419,7 +419,7 @@ where } async fn build_tables<'scan>( - option: &DbOption, + option: &DbOption, version_edits: &mut Vec::Key>>, level: usize, streams: Vec>, @@ -490,7 +490,7 @@ where #[allow(clippy::too_many_arguments)] async fn build_table( - option: &DbOption, + option: &DbOption, version_edits: &mut Vec::Key>>, level: usize, builder: &mut <::Columns as ArrowArrays>::Builder, @@ -582,7 +582,7 @@ pub(crate) mod tests { }; async fn build_immutable( - option: &DbOption, + option: &DbOption, records: Vec<(LogType, R, Timestamp)>, schema: &Arc, fs: &Arc, @@ -601,7 +601,7 @@ pub(crate) mod tests { } pub(crate) async fn build_parquet_table( - option: &DbOption, + option: &DbOption, gen: FileId, records: Vec<(LogType, R, Timestamp)>, schema: &Arc, @@ -634,10 +634,10 @@ pub(crate) mod tests { let temp_dir = tempfile::tempdir().unwrap(); let temp_dir_l0 = tempfile::tempdir().unwrap(); - let option = DbOption::from(( + let option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - )) + ) .level_path( 0, Path::from_filesystem_path(temp_dir_l0.path()).unwrap(), @@ -747,21 +747,21 @@ pub(crate) mod tests { async fn dyn_minor_compaction() { let temp_dir = tempfile::tempdir().unwrap(); let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); - let option = DbOption::with_path( - Path::from_filesystem_path(temp_dir.path()).unwrap(), - "id".to_string(), + let schema = DynSchema::new( + vec![ValueDesc::new("id".to_owned(), Datatype::Int32, false)], 0, ); + let option = DbOption::new( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &schema, + ); manager .base_fs() .create_dir_all(&option.wal_dir_path()) .await .unwrap(); - let instance = Arc::new(DynSchema::new( - vec![ValueDesc::new("id".to_owned(), Datatype::Int32, false)], - 0, - )); + let instance = Arc::new(schema); let mut batch1_data = vec![]; let mut batch2_data = vec![]; @@ -818,10 +818,10 @@ pub(crate) mod tests { let temp_dir_l0 = TempDir::new().unwrap(); let temp_dir_l1 = TempDir::new().unwrap(); - let mut option = DbOption::from(( + let mut option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - )) + ) .level_path( 0, Path::from_filesystem_path(temp_dir_l0.path()).unwrap(), @@ -900,7 +900,7 @@ pub(crate) mod tests { } pub(crate) async fn build_version( - option: &Arc>, + option: &Arc, manager: &StoreManager, schema: &Arc, ) -> ((FileId, FileId, FileId, FileId, FileId), Version) { @@ -1162,10 +1162,10 @@ pub(crate) mod tests { pub(crate) async fn major_panic() { let temp_dir = TempDir::new().unwrap(); - let mut option = DbOption::from(( + let mut option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - )); + ); option.major_threshold_with_sst_size = 1; option.level_sst_magnification = 1; let manager = @@ -1273,10 +1273,10 @@ pub(crate) mod tests { async fn test_flush_major_level_sort() { let temp_dir = TempDir::new().unwrap(); - let mut option = DbOption::from(( + let mut option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - )); + ); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 0; option.major_threshold_with_sst_size = 2; diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index 79b29b0..0212bf3 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -47,7 +47,7 @@ where R: Record, { pub async fn new( - option: &DbOption, + option: &DbOption, trigger: Arc + Send + Sync>>, fs: &Arc, schema: Arc, @@ -234,10 +234,10 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(( + let option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - )); + ); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); @@ -287,10 +287,10 @@ mod tests { async fn range() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(( + let option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &StringSchema, - )); + ); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); @@ -375,23 +375,23 @@ mod tests { #[tokio::test] async fn test_dyn_read() { let temp_dir = tempfile::tempdir().unwrap(); - let option = DbOption::with_path( - Path::from_filesystem_path(temp_dir.path()).unwrap(), - "age".to_string(), + let schema = DynSchema::new( + vec![ + ValueDesc::new("age".to_string(), Datatype::Int8, false), + ValueDesc::new("height".to_string(), Datatype::Int16, true), + ], 0, ); + let option = DbOption::new( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &schema, + ); let fs = Arc::new(TokioFs) as Arc; fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); - let schema = Arc::new(DynSchema::new( - vec![ - ValueDesc::new("age".to_string(), Datatype::Int8, false), - ValueDesc::new("height".to_string(), Datatype::Int16, true), - ], - 0, - )); + let schema = Arc::new(schema); let mutable = Mutable::::new(&option, trigger, &fs, schema) .await diff --git a/src/lib.rs b/src/lib.rs index 43c3c6a..e58db94 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,10 +54,10 @@ //! // make sure the path exists //! let _ = fs::create_dir_all("./db_path/users").await; //! -//! let options = DbOption::from(( +//! let options = DbOption::new( //! Path::from_filesystem_path("./db_path/users").unwrap(), //! &UserSchema, -//! )); +//! ); //! // pluggable async runtime and I/O //! let db = DB::new(options, TokioExecutor::current(), UserSchema) //! .await @@ -154,7 +154,7 @@ use parquet::{ errors::ParquetError, }; use parquet_lru::{DynLruCache, NoCache}; -use record::{DynRecord, Record}; +use record::Record; use thiserror::Error; use timestamp::{Timestamp, TimestampedRef}; use tokio::sync::oneshot; @@ -167,7 +167,7 @@ use crate::{ compaction::{CompactTask, CompactionError, Compactor}, executor::Executor, fs::{manager::StoreManager, parse_file_id, FileType}, - record::{DynSchema, Schema as RecordSchema}, + record::Schema as RecordSchema, serdes::Decode, snapshot::Snapshot, stream::{ @@ -193,22 +193,6 @@ where _p: PhantomData, } -impl DB -where - E: Executor + Send + Sync + 'static, -{ - /// Open [`DB`] with schema which determined by [`ColumnDesc`]. - pub async fn with_schema( - option: DbOption, - executor: E, - schema: DynSchema, - ) -> Result> { - let option = Arc::new(option); - - Self::build(option, executor, schema, Arc::new(NoCache::default())).await - } -} - impl DB where R: Record + Send + Sync, @@ -220,11 +204,7 @@ where /// according to the configuration of [`DbOption`]. /// /// For more configurable options, please refer to [`DbOption`]. - pub async fn new( - option: DbOption, - executor: E, - schema: R::Schema, - ) -> Result> { + pub async fn new(option: DbOption, executor: E, schema: R::Schema) -> Result> { Self::build( Arc::new(option), executor, @@ -242,7 +222,7 @@ where E: Executor + Send + Sync + 'static, { async fn build( - option: Arc>, + option: Arc, executor: E, schema: R::Schema, lru_cache: ParquetLru, @@ -266,7 +246,7 @@ where } let (task_tx, task_rx) = bounded(1); - let (mut cleaner, clean_sender) = Cleaner::::new(option.clone(), manager.clone()); + let (mut cleaner, clean_sender) = Cleaner::new(option.clone(), manager.clone()); let version_set = VersionSet::new(clean_sender, option.clone(), manager.clone()).await?; let schema = Arc::new(RwLock::new( @@ -505,7 +485,7 @@ where R: Record + Send, { async fn new( - option: Arc>, + option: Arc, compaction_tx: Sender, version_set: &VersionSet, record_schema: Arc, @@ -683,10 +663,6 @@ where self.mutable.flush_wal().await?; Ok(()) } - - pub(crate) fn record_schema(&self) -> &Arc { - &self.record_schema - } } /// scan configuration intermediate structure @@ -931,15 +907,14 @@ pub(crate) mod tests { use arrow::{ array::{Array, AsArray, RecordBatch}, - datatypes::{DataType, Field, Schema, UInt32Type}, + datatypes::{Schema, UInt32Type}, }; use async_lock::RwLock; use flume::{bounded, Receiver}; use fusio::{disk::TokioFs, path::Path, DynFs, SeqRead, Write}; use fusio_dispatch::FsOptions; use futures::StreamExt; - use once_cell::sync::Lazy; - use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath}; + use parquet::arrow::ProjectionMask; use parquet_lru::NoCache; use tempfile::TempDir; use tracing::error; @@ -947,15 +922,12 @@ pub(crate) mod tests { use crate::{ compaction::{CompactTask, CompactionError, Compactor}, executor::{tokio::TokioExecutor, Executor}, - fs::{generate_file_id, manager::StoreManager, FileId}, - inmem::{ - immutable::tests::{TestImmutableArrays, TestSchema}, - mutable::Mutable, - }, + fs::{generate_file_id, manager::StoreManager}, + inmem::{immutable::tests::TestSchema, mutable::Mutable}, record::{ internal::InternalRecordRef, runtime::test::{test_dyn_item_schema, test_dyn_items}, - Datatype, DynRecord, DynSchema, Key, RecordDecodeError, RecordEncodeError, RecordRef, + Datatype, DynRecord, Key, RecordDecodeError, RecordEncodeError, RecordRef, Schema as RecordSchema, Value, }, serdes::{Decode, Encode}, @@ -1150,7 +1122,7 @@ pub(crate) mod tests { } pub(crate) async fn get_test_record_batch( - option: DbOption, + option: DbOption, executor: E, ) -> RecordBatch { let db: DB = DB::new(option.clone(), executor, TestSchema {}) @@ -1198,7 +1170,7 @@ pub(crate) mod tests { } pub(crate) async fn build_schema( - option: Arc>, + option: Arc, fs: &Arc, ) -> Result<(crate::Schema, Receiver), fusio::Error> { let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); @@ -1307,7 +1279,7 @@ pub(crate) mod tests { } pub(crate) async fn build_db( - option: Arc>, + option: Arc, compaction_rx: Receiver, executor: E, schema: crate::Schema, @@ -1329,7 +1301,7 @@ pub(crate) mod tests { let schema = Arc::new(RwLock::new(schema)); - let (mut cleaner, clean_sender) = Cleaner::::new(option.clone(), manager.clone()); + let (mut cleaner, clean_sender) = Cleaner::new(option.clone(), manager.clone()); let version_set = build_version_set(version, clean_sender, option.clone(), manager.clone()).await?; let mut compactor = Compactor::::new( @@ -1595,7 +1567,7 @@ pub(crate) mod tests { let path = Path::from_filesystem_path(temp_dir.path()).unwrap(); let path_l0 = Path::from_filesystem_path(temp_dir_l0.path()).unwrap(); - let mut option = DbOption::from((path, &TestSchema)) + let mut option = DbOption::new(path, &TestSchema) .level_path(0, path_l0, FsOptions::Local) .unwrap(); option.immutable_chunk_num = 1; @@ -1635,10 +1607,10 @@ pub(crate) mod tests { async fn test_flush() { let temp_dir = TempDir::new().unwrap(); - let mut option = DbOption::from(( + let mut option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - )); + ); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; option.major_threshold_with_sst_size = 3; @@ -1672,10 +1644,10 @@ pub(crate) mod tests { let temp_dir = TempDir::new().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = Arc::new(DbOption::from(( + let option = Arc::new(DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - ))); + )); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let (task_tx, _task_rx) = bounded(1); @@ -1738,10 +1710,9 @@ pub(crate) mod tests { let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); let dyn_schema = Arc::new(test_dyn_item_schema()); - let option = Arc::new(DbOption::with_path( + let option = Arc::new(DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), - "id".to_owned(), - dyn_schema.primary_key_index(), + dyn_schema.as_ref(), )); manager .base_fs() @@ -1777,14 +1748,13 @@ pub(crate) mod tests { schema.flush_wal().await.unwrap(); drop(schema); - let option = DbOption::with_path( + let option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), - "id".to_owned(), - dyn_schema.primary_key_index(), + dyn_schema.as_ref(), ); let dyn_schema = test_dyn_item_schema(); let db: DB = - DB::with_schema(option, TokioExecutor::current(), dyn_schema) + DB::new(option, TokioExecutor::current(), dyn_schema) .await .unwrap(); @@ -1817,10 +1787,10 @@ pub(crate) mod tests { async fn test_get_removed() { let temp_dir = TempDir::new().unwrap(); - let mut option = DbOption::from(( + let mut option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - )); + ); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; option.major_threshold_with_sst_size = 3; @@ -1857,10 +1827,9 @@ pub(crate) mod tests { let temp_dir = TempDir::new().unwrap(); let dyn_schema = test_dyn_item_schema(); - let mut option = DbOption::with_path( + let mut option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), - "id".to_string(), - dyn_schema.primary_key_index(), + &dyn_schema, ); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; @@ -1871,7 +1840,7 @@ pub(crate) mod tests { option.trigger_type = TriggerType::Length(5); let db: DB = - DB::with_schema(option, TokioExecutor::current(), dyn_schema) + DB::new(option, TokioExecutor::current(), dyn_schema) .await .unwrap(); @@ -2065,10 +2034,9 @@ pub(crate) mod tests { let temp_dir1 = TempDir::with_prefix("db1").unwrap(); let dyn_schema = test_dyn_item_schema(); - let mut option = DbOption::with_path( + let mut option = DbOption::new( Path::from_filesystem_path(temp_dir1.path()).unwrap(), - "id".to_string(), - dyn_schema.primary_key_index(), + &dyn_schema, ); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; @@ -2077,10 +2045,9 @@ pub(crate) mod tests { option.trigger_type = TriggerType::Length(5); let temp_dir2 = TempDir::with_prefix("db2").unwrap(); - let mut option2 = DbOption::with_path( + let mut option2 = DbOption::new( Path::from_filesystem_path(temp_dir2.path()).unwrap(), - "id".to_string(), - dyn_schema.primary_key_index(), + &dyn_schema, ); option2.immutable_chunk_num = 1; option2.immutable_chunk_max_num = 1; @@ -2089,10 +2056,9 @@ pub(crate) mod tests { option2.trigger_type = TriggerType::Length(5); let temp_dir3 = TempDir::with_prefix("db3").unwrap(); - let mut option3 = DbOption::with_path( + let mut option3 = DbOption::new( Path::from_filesystem_path(temp_dir3.path()).unwrap(), - "id".to_string(), - dyn_schema.primary_key_index(), + &dyn_schema, ); option3.immutable_chunk_num = 1; option3.immutable_chunk_max_num = 1; @@ -2101,15 +2067,15 @@ pub(crate) mod tests { option3.trigger_type = TriggerType::Length(5); let db1: DB = - DB::with_schema(option, TokioExecutor::current(), test_dyn_item_schema()) + DB::new(option, TokioExecutor::current(), test_dyn_item_schema()) .await .unwrap(); let db2: DB = - DB::with_schema(option2, TokioExecutor::current(), test_dyn_item_schema()) + DB::new(option2, TokioExecutor::current(), test_dyn_item_schema()) .await .unwrap(); let db3: DB = - DB::with_schema(option3, TokioExecutor::current(), test_dyn_item_schema()) + DB::new(option3, TokioExecutor::current(), test_dyn_item_schema()) .await .unwrap(); diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 8d5501c..b92acd6 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -193,10 +193,10 @@ pub(crate) mod tests { let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); let base_fs = manager.base_fs(); let record_batch = get_test_record_batch::( - DbOption::from(( + DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - )), + ), TokioExecutor::current(), ) .await; @@ -271,10 +271,10 @@ pub(crate) mod tests { let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); let base_fs = manager.base_fs(); let record_batch = get_test_record_batch::( - DbOption::from(( + DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - )), + ), TokioExecutor::current(), ) .await; diff --git a/src/option.rs b/src/option.rs index 50d2394..3d69e76 100644 --- a/src/option.rs +++ b/src/option.rs @@ -1,30 +1,25 @@ -use std::{ - fmt::{Debug, Formatter}, - marker::PhantomData, -}; +use std::fmt::{Debug, Formatter}; use fusio::path::Path; use fusio_dispatch::FsOptions; use parquet::{ basic::Compression, file::properties::{EnabledStatistics, WriterProperties}, - format::SortingColumn, - schema::types::ColumnPath, }; +use thiserror::Error; use crate::{ fs::{FileId, FileType}, record::{Record, Schema}, trigger::TriggerType, version::{Version, MAX_LEVEL}, - DbError, }; const DEFAULT_WAL_BUFFER_SIZE: usize = 4 * 1024; /// configure the operating parameters of each component in the [`DB`](crate::DB) #[derive(Clone)] -pub struct DbOption { +pub struct DbOption { pub(crate) clean_channel_buffer: usize, pub(crate) base_path: Path, pub(crate) base_fs: FsOptions, @@ -41,17 +36,12 @@ pub struct DbOption { pub(crate) use_wal: bool, pub(crate) wal_buffer_size: usize, pub(crate) write_parquet_properties: WriterProperties, - _p: PhantomData, } -impl DbOption -where - R: Record, -{ +impl DbOption { /// build the default configured [`DbOption`] with base path and primary key - pub fn with_path(base_path: Path, primary_key_name: String, primary_key_index: usize) -> Self { - let (column_paths, sorting_columns) = - Self::primary_key_path(primary_key_name, primary_key_index); + pub fn new(base_path: Path, schema: &S) -> Self { + let (column_paths, sorting_columns) = schema.primary_key_path(); DbOption { immutable_chunk_num: 3, @@ -74,67 +64,14 @@ where major_default_oldest_table_num: 3, major_l_selection_table_max_num: 4, trigger_type: TriggerType::SizeOfMem(64 * 1024 * 1024), - _p: Default::default(), version_log_snapshot_threshold: 200, level_paths: vec![None; MAX_LEVEL], base_fs: FsOptions::Local, } } - - fn primary_key_path( - primary_key_name: String, - primary_key_index: usize, - ) -> (ColumnPath, Vec) { - ( - ColumnPath::new(vec!["_ts".to_string(), primary_key_name]), - vec![ - SortingColumn::new(1_i32, true, true), - SortingColumn::new(primary_key_index as i32, false, true), - ], - ) - } } -impl From<(Path, &R::Schema)> for DbOption -where - R: Record, -{ - /// build the default configured [`DbOption`] based on the passed path - fn from((base_path, schema): (Path, &R::Schema)) -> Self { - let (column_paths, sorting_columns) = schema.primary_key_path(); - DbOption { - immutable_chunk_num: 3, - immutable_chunk_max_num: 5, - major_threshold_with_sst_size: 4, - level_sst_magnification: 10, - max_sst_file_size: 256 * 1024 * 1024, - clean_channel_buffer: 10, - base_path, - base_fs: FsOptions::Local, - write_parquet_properties: WriterProperties::builder() - .set_compression(Compression::LZ4) - .set_column_statistics_enabled(column_paths.clone(), EnabledStatistics::Page) - .set_column_bloom_filter_enabled(column_paths.clone(), true) - .set_sorting_columns(Some(sorting_columns)) - .set_created_by(concat!("tonbo version ", env!("CARGO_PKG_VERSION")).to_owned()) - .build(), - - use_wal: true, - wal_buffer_size: DEFAULT_WAL_BUFFER_SIZE, - major_default_oldest_table_num: 3, - major_l_selection_table_max_num: 4, - trigger_type: TriggerType::SizeOfMem(64 * 1024 * 1024), - _p: Default::default(), - version_log_snapshot_threshold: 200, - level_paths: vec![None; MAX_LEVEL], - } - } -} - -impl DbOption -where - R: Record, -{ +impl DbOption { /// build the [`DB`](crate::DB) storage directory based on the passed path pub fn path(self, path: impl Into) -> Self { DbOption { @@ -232,9 +169,9 @@ where level: usize, path: Path, fs_options: FsOptions, - ) -> Result> { + ) -> Result { if level >= MAX_LEVEL { - Err(DbError::ExceedsMaxLevel)?; + return Err(ExceedsMaxLevel); } self.level_paths[level] = Some((path, fs_options)); Ok(self) @@ -246,10 +183,11 @@ where } } -impl DbOption -where - R: Record, -{ +#[derive(Debug, Error)] +#[error("exceeds max level, max level is {}", MAX_LEVEL)] +pub struct ExceedsMaxLevel; + +impl DbOption { pub(crate) fn table_path(&self, gen: FileId, level: usize) -> Path { self.level_paths[level] .as_ref() @@ -280,13 +218,17 @@ where self.level_paths[level].as_ref().map(|(path, _)| path) } - pub(crate) fn is_threshold_exceeded_major(&self, version: &Version, level: usize) -> bool { + pub(crate) fn is_threshold_exceeded_major( + &self, + version: &Version, + level: usize, + ) -> bool { Version::::tables_len(version, level) >= (self.major_threshold_with_sst_size * self.level_sst_magnification.pow(level as u32)) } } -impl Debug for DbOption { +impl Debug for DbOption { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("DbOption") .field("clean_channel_buffer", &self.clean_channel_buffer) diff --git a/src/record/runtime/record.rs b/src/record/runtime/record.rs index 6b92161..5d6df68 100644 --- a/src/record/runtime/record.rs +++ b/src/record/runtime/record.rs @@ -1,8 +1,8 @@ -use std::{any::Any, sync::Arc}; +use std::sync::Arc; use fusio::SeqRead; -use super::{Datatype, DynRecordRef, Value, ValueDesc}; +use super::{Datatype, DynRecordRef, Value}; use crate::{ record::{DynSchema, Record, RecordDecodeError}, serdes::{Decode, Encode}, @@ -24,68 +24,6 @@ impl DynRecord { } } -impl DynRecord { - pub(crate) fn empty_record(column_descs: Vec, primary_index: usize) -> DynRecord { - let mut columns = vec![]; - for desc in column_descs.iter() { - let value: Arc = match desc.datatype { - Datatype::UInt8 => match desc.is_nullable { - true => Arc::>::new(None), - false => Arc::new(u8::default()), - }, - Datatype::UInt16 => match desc.is_nullable { - true => Arc::>::new(None), - false => Arc::new(u16::default()), - }, - Datatype::UInt32 => match desc.is_nullable { - true => Arc::>::new(None), - false => Arc::new(u32::default()), - }, - Datatype::UInt64 => match desc.is_nullable { - true => Arc::>::new(None), - false => Arc::new(u64::default()), - }, - Datatype::Int8 => match desc.is_nullable { - true => Arc::>::new(None), - false => Arc::new(i8::default()), - }, - Datatype::Int16 => match desc.is_nullable { - true => Arc::>::new(None), - false => Arc::new(i16::default()), - }, - Datatype::Int32 => match desc.is_nullable { - true => Arc::>::new(None), - false => Arc::new(i32::default()), - }, - Datatype::Int64 => match desc.is_nullable { - true => Arc::>::new(None), - false => Arc::new(i64::default()), - }, - Datatype::String => match desc.is_nullable { - true => Arc::>::new(None), - false => Arc::new(String::default()), - }, - Datatype::Boolean => match desc.is_nullable { - true => Arc::>::new(None), - false => Arc::new(bool::default()), - }, - Datatype::Bytes => match desc.is_nullable { - true => Arc::>>::new(None), - false => Arc::new(Vec::::default()), - }, - }; - columns.push(Value::new( - desc.datatype, - desc.name.to_owned(), - value, - desc.is_nullable, - )); - } - - DynRecord::new(columns, primary_index) - } -} - impl Decode for DynRecord { type Error = RecordDecodeError; diff --git a/src/snapshot.rs b/src/snapshot.rs index 24ff2a0..c90f06e 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -143,10 +143,10 @@ mod tests { async fn snapshot_scan() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from(( + let option = Arc::new(DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - ))); + )); manager .base_fs() diff --git a/src/stream/level.rs b/src/stream/level.rs index 1fb580d..24f1c90 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -51,7 +51,7 @@ where upper: Bound<&'level ::Key>, ts: Timestamp, level: usize, - option: Arc>, + option: Arc, gens: VecDeque, limit: Option, projection_mask: ProjectionMask, @@ -229,23 +229,18 @@ mod tests { use tempfile::TempDir; use crate::{ - compaction::tests::build_version, - fs::manager::StoreManager, - inmem::immutable::tests::TestSchema, - record::{Record, Schema}, - stream::level::LevelStream, - tests::Test, - DbOption, + compaction::tests::build_version, fs::manager::StoreManager, + inmem::immutable::tests::TestSchema, record::Schema, stream::level::LevelStream, DbOption, }; #[tokio::test] async fn projection_scan() { let temp_dir = TempDir::new().unwrap(); let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); - let option = Arc::new(DbOption::from(( + let option = Arc::new(DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema {}, - ))); + )); manager .base_fs() diff --git a/src/stream/mem_projection.rs b/src/stream/mem_projection.rs index 334e356..c22de63 100644 --- a/src/stream/mem_projection.rs +++ b/src/stream/mem_projection.rs @@ -64,7 +64,7 @@ mod tests { use crate::{ inmem::{immutable::tests::TestSchema, mutable::Mutable}, - record::{Record, Schema}, + record::Schema, stream::mem_projection::MemProjectionStream, tests::Test, trigger::TriggerFactory, @@ -76,10 +76,10 @@ mod tests { async fn merge_mutable() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(( + let option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - )); + ); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); diff --git a/src/stream/merge.rs b/src/stream/merge.rs index 42ea60d..d4b3f5a 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -163,22 +163,18 @@ mod tests { use super::MergeStream; use crate::{ - inmem::{immutable::tests::TestSchema, mutable::Mutable}, - record::test::StringSchema, - stream::Entry, - trigger::TriggerFactory, - wal::log::LogType, - DbOption, + inmem::mutable::Mutable, record::test::StringSchema, stream::Entry, + trigger::TriggerFactory, wal::log::LogType, DbOption, }; #[tokio::test] async fn merge_mutable() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(( + let option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &StringSchema, - )); + ); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); @@ -278,10 +274,10 @@ mod tests { async fn merge_mutable_remove_duplicates() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(( + let option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &StringSchema, - )); + ); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); @@ -369,10 +365,10 @@ mod tests { async fn merge_mutable_limit() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(( + let option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &StringSchema, - )); + ); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); diff --git a/src/stream/package.rs b/src/stream/package.rs index 2dec0e0..39ba16f 100644 --- a/src/stream/package.rs +++ b/src/stream/package.rs @@ -96,7 +96,7 @@ mod tests { }, mutable::Mutable, }, - record::{Record, Schema}, + record::Schema, stream::{merge::MergeStream, package::PackageStream}, tests::Test, trigger::TriggerFactory, @@ -108,10 +108,10 @@ mod tests { async fn iter() { let temp_dir = TempDir::new().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(( + let option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - )); + ); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); diff --git a/src/transaction.rs b/src/transaction.rs index d6d219a..59257ed 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -258,7 +258,6 @@ mod tests { record::{ runtime::{test::test_dyn_item_schema, Datatype, DynRecord, Value}, test::StringSchema, - ValueDesc, }, tests::{build_db, build_schema, Test}, transaction::CommitError, @@ -271,10 +270,10 @@ mod tests { let temp_dir = TempDir::new().unwrap(); let db = DB::::new( - DbOption::from(( + DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &StringSchema, - )), + ), TokioExecutor::current(), StringSchema, ) @@ -310,10 +309,10 @@ mod tests { async fn transaction_get() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from(( + let option = Arc::new(DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - ))); + )); manager .base_fs() @@ -402,10 +401,10 @@ mod tests { #[tokio::test] async fn write_conflicts() { let temp_dir = TempDir::new().unwrap(); - let option = DbOption::from(( + let option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &StringSchema, - )); + ); let db = DB::::new(option, TokioExecutor::current(), StringSchema) .await @@ -438,10 +437,10 @@ mod tests { #[tokio::test] async fn transaction_projection() { let temp_dir = TempDir::new().unwrap(); - let option = DbOption::from(( + let option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - )); + ); let db = DB::::new(option, TokioExecutor::current(), TestSchema) .await @@ -479,10 +478,10 @@ mod tests { async fn transaction_scan() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from(( + let option = Arc::new(DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - ))); + )); manager .base_fs() @@ -576,10 +575,10 @@ mod tests { async fn test_transaction_scan_bound() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from(( + let option = Arc::new(DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - ))); + )); manager .base_fs() @@ -754,10 +753,10 @@ mod tests { async fn test_transaction_scan_limit() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from(( + let option = Arc::new(DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - ))); + )); manager .base_fs() @@ -817,19 +816,13 @@ mod tests { #[tokio::test] async fn test_dyn_record() { - let descs = vec![ - ValueDesc::new("age".to_string(), Datatype::Int8, false), - ValueDesc::new("height".to_string(), Datatype::Int16, true), - ValueDesc::new("weight".to_string(), Datatype::Int32, false), - ]; - let temp_dir = TempDir::new().unwrap(); - let option = DbOption::with_path( + let schema = test_dyn_item_schema(); + let option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), - "age".to_string(), - 0, + &schema, ); - let db = DB::with_schema(option, TokioExecutor::current(), test_dyn_item_schema()) + let db = DB::new(option, TokioExecutor::current(), schema) .await .unwrap(); diff --git a/src/version/cleaner.rs b/src/version/cleaner.rs index 118f0e5..9b1e9c6 100644 --- a/src/version/cleaner.rs +++ b/src/version/cleaner.rs @@ -4,9 +4,8 @@ use flume::{Receiver, Sender}; use crate::{ fs::{manager::StoreManager, FileId}, - record::Record, timestamp::Timestamp, - DbError, DbOption, + DbOption, }; pub enum CleanTag { @@ -23,22 +22,16 @@ pub enum CleanTag { }, } -pub(crate) struct Cleaner -where - R: Record, -{ +pub(crate) struct Cleaner { tag_recv: Receiver, gens_map: BTreeMap, bool)>, - option: Arc>, + option: Arc, manager: Arc, } -impl Cleaner -where - R: Record, -{ +impl Cleaner { pub(crate) fn new( - option: Arc>, + option: Arc, manager: Arc, ) -> (Self, Sender) { let (tag_send, tag_recv) = flume::bounded(option.clean_channel_buffer); @@ -54,7 +47,7 @@ where ) } - pub(crate) async fn listen(&mut self) -> Result<(), DbError> { + pub(crate) async fn listen(&mut self) -> Result<(), fusio::Error> { while let Ok(tag) = self.tag_recv.recv_async().await { match tag { CleanTag::Add { ts, gens } => { @@ -106,9 +99,8 @@ pub(crate) mod tests { use crate::{ executor::{tokio::TokioExecutor, Executor}, - fs::{generate_file_id, manager::StoreManager, FileId, FileType}, + fs::{generate_file_id, manager::StoreManager, FileType}, inmem::immutable::tests::TestSchema, - tests::Test, version::cleaner::{CleanTag, Cleaner}, DbOption, }; @@ -117,10 +109,10 @@ pub(crate) mod tests { async fn test_cleaner() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from(( + let option = Arc::new(DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, - ))); + )); let gen_0 = generate_file_id(); let gen_1 = generate_file_id(); @@ -157,7 +149,7 @@ pub(crate) mod tests { .unwrap(); } - let (mut cleaner, tx) = Cleaner::::new(option.clone(), manager.clone()); + let (mut cleaner, tx) = Cleaner::new(option.clone(), manager.clone()); let executor = TokioExecutor::current(); diff --git a/src/version/mod.rs b/src/version/mod.rs index 28d77cf..4947803 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -46,7 +46,7 @@ where ts: Timestamp, pub(crate) level_slice: [Vec::Key>>; MAX_LEVEL], clean_sender: Sender, - option: Arc>, + option: Arc, timestamp: Arc, log_length: u32, } @@ -58,7 +58,7 @@ where #[cfg(test)] #[allow(unused)] pub(crate) fn new( - option: Arc>, + option: Arc, clean_sender: Sender, timestamp: Arc, ) -> Self { @@ -72,7 +72,7 @@ where } } - pub(crate) fn option(&self) -> &Arc> { + pub(crate) fn option(&self) -> &Arc { &self.option } } diff --git a/src/version/set.rs b/src/version/set.rs index 72943aa..b7e9b25 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -60,7 +60,7 @@ where inner: Arc>>, clean_sender: Sender, timestamp: Arc, - option: Arc>, + option: Arc, manager: Arc, } @@ -98,7 +98,7 @@ where { pub(crate) async fn new( clean_sender: Sender, - option: Arc>, + option: Arc, manager: Arc, ) -> Result> { let fs = manager.base_fs(); @@ -311,7 +311,7 @@ pub(crate) mod tests { pub(crate) async fn build_version_set( version: Version, clean_sender: Sender, - option: Arc>, + option: Arc, manager: Arc, ) -> Result, VersionError> where @@ -344,10 +344,10 @@ pub(crate) mod tests { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); let (sender, _) = bounded(1); - let option = Arc::new(DbOption::from(( + let option = Arc::new(DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &StringSchema, - ))); + )); manager .base_fs() .create_dir_all(&option.version_log_dir_path()) @@ -382,10 +382,10 @@ pub(crate) mod tests { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); let (sender, _) = bounded(1); - let mut option = DbOption::from(( + let mut option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &StringSchema, - )); + ); option.version_log_snapshot_threshold = 4; let option = Arc::new(option); @@ -512,10 +512,10 @@ pub(crate) mod tests { async fn version_level_sort() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from(( + let option = Arc::new(DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &StringSchema, - ))); + )); let (sender, _) = bounded(1); manager diff --git a/tests/data_integrity.rs b/tests/data_integrity.rs index f109dd4..1b3e626 100644 --- a/tests/data_integrity.rs +++ b/tests/data_integrity.rs @@ -70,14 +70,15 @@ mod tests { let mut write_hasher = crc32fast::Hasher::new(); let temp_dir = TempDir::new().unwrap(); - let option = DbOption::from(( + let option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &CustomerSchema, - )); + ); - let db: DB = DB::new(option, TokioExecutor::current(), CustomerSchema) - .await - .unwrap(); + let db: DB = + DB::new(option, TokioExecutor::current(), CustomerSchema) + .await + .unwrap(); for _ in 0..WRITE_TIMES { let customer = gen_record(&mut rng, &mut primary_key_count); diff --git a/tests/macros_correctness.rs b/tests/macros_correctness.rs index 42b408f..a536343 100644 --- a/tests/macros_correctness.rs +++ b/tests/macros_correctness.rs @@ -1,4 +1,3 @@ -use tonbo::record::Schema; use tonbo_macros::Record; #[derive(Record, Debug, PartialEq)] diff --git a/tests/wasm.rs b/tests/wasm.rs index b2cbb8d..b931765 100644 --- a/tests/wasm.rs +++ b/tests/wasm.rs @@ -73,15 +73,10 @@ mod tests { let fs = fusio::disk::LocalFs {}; fs.create_dir_all(&path).await.unwrap(); - let option = DbOption::with_path( - Path::from_opfs_path("opfs_dir_rw").unwrap(), - "id".to_string(), - 0, - ); + let option = DbOption::new(Path::from_opfs_path("opfs_dir_rw").unwrap(), &schema); - let db: DB = DB::with_schema(option, OpfsExecutor::new(), schema) - .await - .unwrap(); + let db: DB = + DB::new(option, OpfsExecutor::new(), schema).await.unwrap(); for item in test_dyn_items().into_iter() { db.insert(item).await.unwrap(); @@ -158,15 +153,10 @@ mod tests { let path = Path::from_opfs_path("opfs_dir_txn").unwrap(); fs.create_dir_all(&path).await.unwrap(); - let option = DbOption::with_path( - Path::from_opfs_path("opfs_dir_txn").unwrap(), - "id".to_string(), - 0, - ); + let option = DbOption::new(Path::from_opfs_path("opfs_dir_txn").unwrap(), &schema); - let db: DB = DB::with_schema(option, OpfsExecutor::new(), schema) - .await - .unwrap(); + let db: DB = + DB::new(option, OpfsExecutor::new(), schema).await.unwrap(); { let mut txn = db.transaction().await; @@ -241,17 +231,11 @@ mod tests { let fs = fusio::disk::LocalFs {}; fs.create_dir_all(&path).await.unwrap(); - let option = DbOption::with_path( - Path::from_opfs_path("opfs_dir").unwrap(), - "id".to_string(), - 0, - ); + let option = DbOption::new(Path::from_opfs_path("opfs_dir").unwrap(), &schema); { let db: DB = - DB::with_schema(option, OpfsExecutor::new(), schema) - .await - .unwrap(); + DB::new(option, OpfsExecutor::new(), schema).await.unwrap(); for item in test_dyn_items().into_iter() { db.insert(item).await.unwrap(); @@ -261,14 +245,9 @@ mod tests { } let schema = test_dyn_item_schema(); - let option = DbOption::with_path( - Path::from_opfs_path("opfs_dir").unwrap(), - "id".to_string(), - 0, - ); - let db: DB = DB::with_schema(option, OpfsExecutor::new(), schema) - .await - .unwrap(); + let option = DbOption::new(Path::from_opfs_path("opfs_dir").unwrap(), &schema); + let db: DB = + DB::new(option, OpfsExecutor::new(), schema).await.unwrap(); let mut sort_items = BTreeMap::new(); for item in test_dyn_items() { @@ -311,7 +290,7 @@ mod tests { let key_id = option_env!("AWS_ACCESS_KEY_ID").unwrap().to_string(); let secret_key = option_env!("AWS_SECRET_ACCESS_KEY").unwrap().to_string(); - let (cols_desc, primary_key_index) = test_dyn_item_schema(); + let schema = test_dyn_item_schema(); let fs_option = FsOptions::S3 { bucket: "wasm-data".to_string(), @@ -326,33 +305,27 @@ mod tests { region: Some("ap-southeast-2".to_string()), }; - let option = DbOption::with_path( - Path::from_opfs_path("s3_rw").unwrap(), - "id".to_string(), - primary_key_index, - ) - .level_path( - 0, - Path::from_url_path("tonbo/l0").unwrap(), - fs_option.clone(), - ) - .unwrap() - .level_path( - 1, - Path::from_url_path("tonbo/l1").unwrap(), - fs_option.clone(), - ) - .unwrap() - .level_path(2, Path::from_url_path("tonbo/l2").unwrap(), fs_option) - .unwrap() - .major_threshold_with_sst_size(3) - .level_sst_magnification(1) - .max_sst_file_size(1 * 1024); + let option = DbOption::new(Path::from_opfs_path("s3_rw").unwrap(), &schema) + .level_path( + 0, + Path::from_url_path("tonbo/l0").unwrap(), + fs_option.clone(), + ) + .unwrap() + .level_path( + 1, + Path::from_url_path("tonbo/l1").unwrap(), + fs_option.clone(), + ) + .unwrap() + .level_path(2, Path::from_url_path("tonbo/l2").unwrap(), fs_option) + .unwrap() + .major_threshold_with_sst_size(3) + .level_sst_magnification(1) + .max_sst_file_size(1 * 1024); let db: DB = - DB::with_schema(option, OpfsExecutor::new(), cols_desc, primary_key_index) - .await - .unwrap(); + DB::new(option, OpfsExecutor::new(), schema).await.unwrap(); for (i, item) in test_dyn_items().into_iter().enumerate() { db.insert(item).await.unwrap();