Skip to content

Commit

Permalink
refactor: remove R type arg on DbOption
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Dec 16, 2024
1 parent 14d6e5f commit fdbc3e5
Show file tree
Hide file tree
Showing 27 changed files with 247 additions and 466 deletions.
4 changes: 2 additions & 2 deletions benches/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl BenchDatabase for TonboS3BenchDataBase {
.disable_wal();

TonboS3BenchDataBase::new(
tonbo::DB::new(option, TokioExecutor::current())
tonbo::DB::new(option, TokioExecutor::current(), &CustomerSchema)
.await
.unwrap(),
)
Expand Down Expand Up @@ -324,7 +324,7 @@ impl BenchDatabase for TonboBenchDataBase {
DbOption::from(fusio::path::Path::from_filesystem_path(path.as_ref()).unwrap())
.disable_wal();

let db = tonbo::DB::new(option, TokioExecutor::current())
let db = tonbo::DB::new(option, TokioExecutor::current(), &CustomerSchema)
.await
.unwrap();
TonboBenchDataBase::new(db)
Expand Down
17 changes: 4 additions & 13 deletions bindings/python/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use pyo3::{
use pyo3_asyncio::tokio::{future_into_py, get_runtime};
use tonbo::{
executor::tokio::TokioExecutor,
record::DynRecord,
record::{DynRecord, DynSchema, Value, ValueDesc},
DB,
};
use tonbo::record::{DynSchema, Value, ValueDesc};

use crate::{
column::Column,
error::{CommitError, DbError},
Expand Down Expand Up @@ -40,7 +40,6 @@ impl TonboDB {
let mut desc = vec![];
let mut cols = vec![];
let mut primary_key_index = None;
let mut primary_key_name = None;

for i in 0..values.len()? {
let value = values.get_item(i)?;
Expand All @@ -51,23 +50,15 @@ impl TonboDB {
panic!("Multiple primary keys is not allowed!")
}
primary_key_index = Some(desc.len());
primary_key_name = Some(col.name.clone());
}
cols.push(col.clone());
desc.push(ValueDesc::from(col));
}
}
let schema = DynSchema::new(desc, primary_key_index.unwrap());
let option = option.into_option(primary_key_index.unwrap(), primary_key_name.unwrap());
let option = option.into_option(&schema);
let db = get_runtime()
.block_on(async {
DB::with_schema(
option,
TokioExecutor::current(),
schema,
)
.await
})
.block_on(async { DB::new(option, TokioExecutor::current(), schema).await })
.unwrap();
Ok(Self {
db: Arc::new(db),
Expand Down
27 changes: 11 additions & 16 deletions bindings/python/src/options.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use fusio::path::Path;
use pyo3::{pyclass, pymethods, PyResult};
use tonbo::record::DynRecord;
use tonbo::record::Schema;

use crate::{ExceedsMaxLevelError, FsOptions};

Expand Down Expand Up @@ -72,21 +72,16 @@ impl DbOption {
}

impl DbOption {
pub(crate) fn into_option(
self,
primary_key_index: usize,
primary_key_name: String,
) -> tonbo::DbOption<DynRecord> {
let mut opt =
tonbo::DbOption::with_path(Path::from(self.path), primary_key_name, primary_key_index)
.clean_channel_buffer(self.clean_channel_buffer)
.immutable_chunk_num(self.immutable_chunk_num)
.level_sst_magnification(self.level_sst_magnification)
.major_default_oldest_table_num(self.major_default_oldest_table_num)
.major_threshold_with_sst_size(self.major_threshold_with_sst_size)
.max_sst_file_size(self.max_sst_file_size)
.version_log_snapshot_threshold(self.version_log_snapshot_threshold)
.base_fs(fusio_dispatch::FsOptions::from(self.base_fs));
pub(crate) fn into_option<S: Schema>(self, schema: &S) -> tonbo::DbOption {
let mut opt = tonbo::DbOption::new(Path::from(self.path), schema)
.clean_channel_buffer(self.clean_channel_buffer)
.immutable_chunk_num(self.immutable_chunk_num)
.level_sst_magnification(self.level_sst_magnification)
.major_default_oldest_table_num(self.major_default_oldest_table_num)
.major_threshold_with_sst_size(self.major_threshold_with_sst_size)
.max_sst_file_size(self.max_sst_file_size)
.version_log_snapshot_threshold(self.version_log_snapshot_threshold)
.base_fs(fusio_dispatch::FsOptions::from(self.base_fs));
for (level, path) in self.level_paths.into_iter().enumerate() {
if let Some((path, fs_options)) = path {
opt = opt
Expand Down
1 change: 1 addition & 0 deletions bindings/python/src/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::ops;

use pyo3::{pyclass, FromPyObject, Py, PyAny, Python};
use tonbo::record::Value;

use crate::{utils::to_col, Column};

#[pyclass]
Expand Down
21 changes: 11 additions & 10 deletions bindings/python/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ use pyo3::{
Bound, IntoPy, Py, PyAny, PyResult, Python,
};
use pyo3_asyncio::tokio::future_into_py;
use tonbo::{record::DynRecord, transaction, Projection};
use tonbo::record::Value;
use tonbo::{
record::{DynRecord, Value},
transaction, Projection,
};

use crate::{
column::Column,
error::{repeated_commit_err, CommitError, DbError},
Expand Down Expand Up @@ -181,16 +184,14 @@ impl Transaction {
future_into_py(py, async move {
let mut scan = txn.scan((
unsafe {
transmute::<
std::ops::Bound<&Value>,
std::ops::Bound<&'static Value>,
>(lower.as_ref())
transmute::<std::ops::Bound<&Value>, std::ops::Bound<&'static Value>>(
lower.as_ref(),
)
},
unsafe {
transmute::<
std::ops::Bound<&Value>,
std::ops::Bound<&'static Value>,
>(high.as_ref())
transmute::<std::ops::Bound<&Value>, std::ops::Bound<&'static Value>>(
high.as_ref(),
)
},
));

Expand Down
11 changes: 2 additions & 9 deletions bindings/python/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@ use tonbo::record::{Datatype, Value};

use crate::{column::Column, datatype::DataType, range};

pub(crate) fn to_dict(
py: Python,
primary_key_index: usize,
record: Vec<Value>,
) -> Bound<PyDict> {
pub(crate) fn to_dict(py: Python, primary_key_index: usize, record: Vec<Value>) -> Bound<PyDict> {
let dict = PyDict::new_bound(py);
for (idx, col) in record.iter().enumerate() {
match &col.datatype {
Expand Down Expand Up @@ -199,10 +195,7 @@ pub(crate) fn to_bound(
col: &Column,
lower: Option<Py<range::Bound>>,
high: Option<Py<range::Bound>>,
) -> (
std::ops::Bound<Value>,
std::ops::Bound<Value>,
) {
) -> (std::ops::Bound<Value>, std::ops::Bound<Value>) {
let lower = match lower {
Some(bound) => bound.get().to_bound(py, col),
None => std::ops::Bound::Unbounded,
Expand Down
4 changes: 2 additions & 2 deletions examples/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,10 @@ async fn main() -> Result<()> {
// make sure the path exists
let _ = fs::create_dir_all("./db_path/music").await;

let options = DbOption::from((
let options = DbOption::new(
Path::from_filesystem_path("./db_path/music").unwrap(),
&MusicSchema,
));
);

let db = DB::new(options, TokioExecutor::current(), MusicSchema)
.await
Expand Down
4 changes: 2 additions & 2 deletions examples/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ async fn main() {
// make sure the path exists
let _ = fs::create_dir_all("./db_path/users").await;

let options = DbOption::from((
let options = DbOption::new(
Path::from_filesystem_path("./db_path/users").unwrap(),
&UserSchema,
));
);
// pluggable async runtime and I/O
let db = DB::new(options, TokioExecutor::current(), UserSchema)
.await
Expand Down
17 changes: 9 additions & 8 deletions examples/dynamic.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::fs;
use std::sync::Arc;
use std::{fs, sync::Arc};

use fusio::path::Path;
use tonbo::executor::tokio::TokioExecutor;
use tonbo::record::{Datatype, DynRecord, DynSchema, Value, ValueDesc};
use tonbo::{DbOption, DB};
use tonbo::{
executor::tokio::TokioExecutor,
record::{Datatype, DynRecord, DynSchema, Value, ValueDesc},
DbOption, DB,
};

#[tokio::main]
async fn main() {
Expand All @@ -18,11 +19,11 @@ async fn main() {
0,
);

let options = DbOption::from((
let options = DbOption::new(
Path::from_filesystem_path("./db_path/users").unwrap(),
&schema,
));
let db = DB::with_schema(options, TokioExecutor::current(), schema)
);
let db = DB::new(options, TokioExecutor::current(), schema)
.await
.unwrap();

Expand Down
48 changes: 24 additions & 24 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub(crate) struct Compactor<R>
where
R: Record,
{
pub(crate) option: Arc<DbOption<R>>,
pub(crate) option: Arc<DbOption>,
pub(crate) schema: Arc<RwLock<Schema<R>>>,
pub(crate) version_set: VersionSet<R>,
pub(crate) manager: Arc<StoreManager>,
Expand All @@ -49,7 +49,7 @@ where
pub(crate) fn new(
schema: Arc<RwLock<Schema<R>>>,
record_schema: Arc<R::Schema>,
option: Arc<DbOption<R>>,
option: Arc<DbOption>,
version_set: VersionSet<R>,
manager: Arc<StoreManager>,
) -> Self {
Expand Down Expand Up @@ -148,7 +148,7 @@ where
}

pub(crate) async fn minor_compaction(
option: &DbOption<R>,
option: &DbOption,
recover_wal_ids: Option<Vec<FileId>>,
batches: &[(
Option<FileId>,
Expand Down Expand Up @@ -211,7 +211,7 @@ where
#[allow(clippy::too_many_arguments)]
pub(crate) async fn major_compaction(
version: &Version<R>,
option: &DbOption<R>,
option: &DbOption,
mut min: &<R::Schema as RecordSchema>::Key,
mut max: &<R::Schema as RecordSchema>::Key,
version_edits: &mut Vec<VersionEdit<<R::Schema as RecordSchema>::Key>>,
Expand Down Expand Up @@ -419,7 +419,7 @@ where
}

async fn build_tables<'scan>(
option: &DbOption<R>,
option: &DbOption,
version_edits: &mut Vec<VersionEdit<<R::Schema as RecordSchema>::Key>>,
level: usize,
streams: Vec<ScanStream<'scan, R>>,
Expand Down Expand Up @@ -490,7 +490,7 @@ where

#[allow(clippy::too_many_arguments)]
async fn build_table(
option: &DbOption<R>,
option: &DbOption,
version_edits: &mut Vec<VersionEdit<<R::Schema as RecordSchema>::Key>>,
level: usize,
builder: &mut <<R::Schema as RecordSchema>::Columns as ArrowArrays>::Builder,
Expand Down Expand Up @@ -582,7 +582,7 @@ pub(crate) mod tests {
};

async fn build_immutable<R>(
option: &DbOption<R>,
option: &DbOption,
records: Vec<(LogType, R, Timestamp)>,
schema: &Arc<R::Schema>,
fs: &Arc<dyn DynFs>,
Expand All @@ -601,7 +601,7 @@ pub(crate) mod tests {
}

pub(crate) async fn build_parquet_table<R>(
option: &DbOption<R>,
option: &DbOption,
gen: FileId,
records: Vec<(LogType, R, Timestamp)>,
schema: &Arc<R::Schema>,
Expand Down Expand Up @@ -634,10 +634,10 @@ pub(crate) mod tests {
let temp_dir = tempfile::tempdir().unwrap();
let temp_dir_l0 = tempfile::tempdir().unwrap();

let option = DbOption::from((
let option = DbOption::new(
Path::from_filesystem_path(temp_dir.path()).unwrap(),
&TestSchema,
))
)
.level_path(
0,
Path::from_filesystem_path(temp_dir_l0.path()).unwrap(),
Expand Down Expand Up @@ -747,21 +747,21 @@ pub(crate) mod tests {
async fn dyn_minor_compaction() {
let temp_dir = tempfile::tempdir().unwrap();
let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap();
let option = DbOption::with_path(
Path::from_filesystem_path(temp_dir.path()).unwrap(),
"id".to_string(),
let schema = DynSchema::new(
vec![ValueDesc::new("id".to_owned(), Datatype::Int32, false)],
0,
);
let option = DbOption::new(
Path::from_filesystem_path(temp_dir.path()).unwrap(),
&schema,
);
manager
.base_fs()
.create_dir_all(&option.wal_dir_path())
.await
.unwrap();

let instance = Arc::new(DynSchema::new(
vec![ValueDesc::new("id".to_owned(), Datatype::Int32, false)],
0,
));
let instance = Arc::new(schema);

let mut batch1_data = vec![];
let mut batch2_data = vec![];
Expand Down Expand Up @@ -818,10 +818,10 @@ pub(crate) mod tests {
let temp_dir_l0 = TempDir::new().unwrap();
let temp_dir_l1 = TempDir::new().unwrap();

let mut option = DbOption::from((
let mut option = DbOption::new(
Path::from_filesystem_path(temp_dir.path()).unwrap(),
&TestSchema,
))
)
.level_path(
0,
Path::from_filesystem_path(temp_dir_l0.path()).unwrap(),
Expand Down Expand Up @@ -900,7 +900,7 @@ pub(crate) mod tests {
}

pub(crate) async fn build_version(
option: &Arc<DbOption<Test>>,
option: &Arc<DbOption>,
manager: &StoreManager,
schema: &Arc<TestSchema>,
) -> ((FileId, FileId, FileId, FileId, FileId), Version<Test>) {
Expand Down Expand Up @@ -1162,10 +1162,10 @@ pub(crate) mod tests {
pub(crate) async fn major_panic() {
let temp_dir = TempDir::new().unwrap();

let mut option = DbOption::from((
let mut option = DbOption::new(
Path::from_filesystem_path(temp_dir.path()).unwrap(),
&TestSchema,
));
);
option.major_threshold_with_sst_size = 1;
option.level_sst_magnification = 1;
let manager =
Expand Down Expand Up @@ -1273,10 +1273,10 @@ pub(crate) mod tests {
async fn test_flush_major_level_sort() {
let temp_dir = TempDir::new().unwrap();

let mut option = DbOption::from((
let mut option = DbOption::new(
Path::from_filesystem_path(temp_dir.path()).unwrap(),
&TestSchema,
));
);
option.immutable_chunk_num = 1;
option.immutable_chunk_max_num = 0;
option.major_threshold_with_sst_size = 2;
Expand Down
Loading

0 comments on commit fdbc3e5

Please sign in to comment.