Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into refactor/schema-trait
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Dec 16, 2024
2 parents ca90c0e + 1a48277 commit 14d6e5f
Show file tree
Hide file tree
Showing 15 changed files with 45 additions and 42 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct User {
#[tokio::main]
async fn main() {
// pluggable async runtime and I/O
let db = DB::new("./db_path/users".into(), TokioExecutor::default())
let db = DB::new("./db_path/users".into(), TokioExecutor::current())
.await
.unwrap();

Expand Down
10 changes: 8 additions & 2 deletions benches/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,11 @@ impl BenchDatabase for TonboS3BenchDataBase {
.unwrap()
.disable_wal();

TonboS3BenchDataBase::new(tonbo::DB::new(option, TokioExecutor::new()).await.unwrap())
TonboS3BenchDataBase::new(
tonbo::DB::new(option, TokioExecutor::current())
.await
.unwrap(),
)
}
}

Expand Down Expand Up @@ -320,7 +324,9 @@ impl BenchDatabase for TonboBenchDataBase {
DbOption::from(fusio::path::Path::from_filesystem_path(path.as_ref()).unwrap())
.disable_wal();

let db = tonbo::DB::new(option, TokioExecutor::new()).await.unwrap();
let db = tonbo::DB::new(option, TokioExecutor::current())
.await
.unwrap();
TonboBenchDataBase::new(db)
}
}
Expand Down
2 changes: 1 addition & 1 deletion benches/criterion/writes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn single_write(c: &mut Criterion) {
let option = DbOption::from(fusio::path::Path::from_filesystem_path("/tmp/tonbo").unwrap())
.disable_wal();
let db = runtime
.block_on(async { DB::new(option, TokioExecutor::default()).await })
.block_on(async { DB::new(option, TokioExecutor::current()).await })
.unwrap();

group.bench_with_input(BenchmarkId::new("Tonbo", batch), &batch, |b, batch| {
Expand Down
2 changes: 1 addition & 1 deletion bindings/python/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl TonboDB {
.block_on(async {
DB::with_schema(
option,
TokioExecutor::new(),
TokioExecutor::current(),
schema,
)
.await
Expand Down
2 changes: 1 addition & 1 deletion examples/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ async fn main() -> Result<()> {
&MusicSchema,
));

let db = DB::new(options, TokioExecutor::default(), MusicSchema)
let db = DB::new(options, TokioExecutor::current(), MusicSchema)
.await
.unwrap();
for (id, name, like) in [
Expand Down
2 changes: 1 addition & 1 deletion examples/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn main() {
&UserSchema,
));
// pluggable async runtime and I/O
let db = DB::new(options, TokioExecutor::default(), UserSchema)
let db = DB::new(options, TokioExecutor::current(), UserSchema)
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion examples/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn main() {
Path::from_filesystem_path("./db_path/users").unwrap(),
&schema,
));
let db = DB::with_schema(options, TokioExecutor::new(), schema)
let db = DB::with_schema(options, TokioExecutor::current(), schema)
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1286,7 +1286,7 @@ pub(crate) mod tests {
option.major_default_oldest_table_num = 1;
option.trigger_type = TriggerType::Length(5);

let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::new(), TestSchema)
let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::current(), TestSchema)
.await
.unwrap();

Expand Down
8 changes: 1 addition & 7 deletions src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,8 @@ pub mod tokio {
handle: Handle,
}

impl Default for TokioExecutor {
fn default() -> Self {
Self::new()
}
}

impl TokioExecutor {
pub fn new() -> Self {
pub fn current() -> Self {
Self {
handle: Handle::current(),
}
Expand Down
29 changes: 16 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
//! &UserSchema,
//! ));
//! // pluggable async runtime and I/O
//! let db = DB::new(options, TokioExecutor::default(), UserSchema)
//! let db = DB::new(options, TokioExecutor::current(), UserSchema)
//! .await
//! .unwrap();
//! // insert with owned value
Expand Down Expand Up @@ -1606,7 +1606,7 @@ pub(crate) mod tests {
option.major_default_oldest_table_num = 1;
option.trigger_type = TriggerType::Length(/* max_mutable_len */ 5);

let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::new(), TestSchema)
let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::current(), TestSchema)
.await
.unwrap();

Expand Down Expand Up @@ -1647,7 +1647,7 @@ pub(crate) mod tests {
option.major_default_oldest_table_num = 1;
option.trigger_type = TriggerType::Length(/* max_mutable_len */ 50);

let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::new(), TestSchema)
let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::current(), TestSchema)
.await
.unwrap();

Expand Down Expand Up @@ -1701,10 +1701,13 @@ pub(crate) mod tests {
schema.flush_wal().await.unwrap();
drop(schema);

let db: DB<Test, TokioExecutor> =
DB::new(option.as_ref().to_owned(), TokioExecutor::new(), TestSchema)
.await
.unwrap();
let db: DB<Test, TokioExecutor> = DB::new(
option.as_ref().to_owned(),
TokioExecutor::current(),
TestSchema,
)
.await
.unwrap();

let mut sort_items = BTreeMap::new();
for item in test_items() {
Expand Down Expand Up @@ -1781,7 +1784,7 @@ pub(crate) mod tests {
);
let dyn_schema = test_dyn_item_schema();
let db: DB<DynRecord, TokioExecutor> =
DB::with_schema(option, TokioExecutor::new(), dyn_schema)
DB::with_schema(option, TokioExecutor::current(), dyn_schema)
.await
.unwrap();

Expand Down Expand Up @@ -1823,7 +1826,7 @@ pub(crate) mod tests {
option.major_threshold_with_sst_size = 3;
option.major_default_oldest_table_num = 1;
option.trigger_type = TriggerType::Length(5);
let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::new(), TestSchema)
let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::current(), TestSchema)
.await
.unwrap();

Expand Down Expand Up @@ -1868,7 +1871,7 @@ pub(crate) mod tests {
option.trigger_type = TriggerType::Length(5);

let db: DB<DynRecord, TokioExecutor> =
DB::with_schema(option, TokioExecutor::new(), dyn_schema)
DB::with_schema(option, TokioExecutor::current(), dyn_schema)
.await
.unwrap();

Expand Down Expand Up @@ -2098,15 +2101,15 @@ pub(crate) mod tests {
option3.trigger_type = TriggerType::Length(5);

let db1: DB<DynRecord, TokioExecutor> =
DB::with_schema(option, TokioExecutor::new(), test_dyn_item_schema())
DB::with_schema(option, TokioExecutor::current(), test_dyn_item_schema())
.await
.unwrap();
let db2: DB<DynRecord, TokioExecutor> =
DB::with_schema(option2, TokioExecutor::new(), test_dyn_item_schema())
DB::with_schema(option2, TokioExecutor::current(), test_dyn_item_schema())
.await
.unwrap();
let db3: DB<DynRecord, TokioExecutor> =
DB::with_schema(option3, TokioExecutor::new(), test_dyn_item_schema())
DB::with_schema(option3, TokioExecutor::current(), test_dyn_item_schema())
.await
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions src/ondisk/sstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ pub(crate) mod tests {
Path::from_filesystem_path(temp_dir.path()).unwrap(),
&TestSchema,
)),
TokioExecutor::new(),
TokioExecutor::current(),
)
.await;
let table_path = temp_dir.path().join("projection_query_test.parquet");
Expand Down Expand Up @@ -275,7 +275,7 @@ pub(crate) mod tests {
Path::from_filesystem_path(temp_dir.path()).unwrap(),
&TestSchema,
)),
TokioExecutor::new(),
TokioExecutor::current(),
)
.await;
let table_path = temp_dir.path().join("projection_scan_test.parquet");
Expand Down
2 changes: 1 addition & 1 deletion src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ mod tests {
let db = build_db(
option,
compaction_rx,
TokioExecutor::new(),
TokioExecutor::current(),
schema,
Arc::new(TestSchema),
version,
Expand Down
16 changes: 8 additions & 8 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ mod tests {
Path::from_filesystem_path(temp_dir.path()).unwrap(),
&StringSchema,
)),
TokioExecutor::new(),
TokioExecutor::current(),
StringSchema,
)
.await
Expand Down Expand Up @@ -333,7 +333,7 @@ mod tests {
let db = build_db(
option,
compaction_rx,
TokioExecutor::new(),
TokioExecutor::current(),
schema,
Arc::new(TestSchema),
version,
Expand Down Expand Up @@ -407,7 +407,7 @@ mod tests {
&StringSchema,
));

let db = DB::<String, TokioExecutor>::new(option, TokioExecutor::new(), StringSchema)
let db = DB::<String, TokioExecutor>::new(option, TokioExecutor::current(), StringSchema)
.await
.unwrap();

Expand Down Expand Up @@ -443,7 +443,7 @@ mod tests {
&TestSchema,
));

let db = DB::<Test, TokioExecutor>::new(option, TokioExecutor::new(), TestSchema)
let db = DB::<Test, TokioExecutor>::new(option, TokioExecutor::current(), TestSchema)
.await
.unwrap();

Expand Down Expand Up @@ -502,7 +502,7 @@ mod tests {
let db = build_db(
option,
compaction_rx,
TokioExecutor::new(),
TokioExecutor::current(),
schema,
Arc::new(TestSchema),
version,
Expand Down Expand Up @@ -599,7 +599,7 @@ mod tests {
let db = build_db(
option,
compaction_rx,
TokioExecutor::new(),
TokioExecutor::current(),
schema,
Arc::new(TestSchema),
version,
Expand Down Expand Up @@ -777,7 +777,7 @@ mod tests {
let db = build_db(
option,
compaction_rx,
TokioExecutor::new(),
TokioExecutor::current(),
schema,
Arc::new(TestSchema),
version,
Expand Down Expand Up @@ -829,7 +829,7 @@ mod tests {
"age".to_string(),
0,
);
let db = DB::with_schema(option, TokioExecutor::default(), test_dyn_item_schema())
let db = DB::with_schema(option, TokioExecutor::current(), test_dyn_item_schema())
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion src/version/cleaner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ pub(crate) mod tests {

let (mut cleaner, tx) = Cleaner::<Test>::new(option.clone(), manager.clone());

let executor = TokioExecutor::new();
let executor = TokioExecutor::current();

executor.spawn(async move {
if let Err(err) = cleaner.listen().await {
Expand Down
2 changes: 1 addition & 1 deletion tests/data_integrity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ mod tests {
&CustomerSchema,
));

let db: DB<Customer, TokioExecutor> = DB::new(option, TokioExecutor::new(), CustomerSchema)
let db: DB<Customer, TokioExecutor> = DB::new(option, TokioExecutor::current(), CustomerSchema)
.await
.unwrap();

Expand Down

0 comments on commit 14d6e5f

Please sign in to comment.