Skip to content

Commit

Permalink
feat: complete example
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Jul 30, 2024
1 parent 73f8340 commit c661581
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 45 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
- name: Run cargo test
uses: actions-rs/cargo@v1
with:
command: test
command: test --all-features
# 2
fmt:
name: Rust fmt
Expand Down
69 changes: 56 additions & 13 deletions examples/declare.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,64 @@
use morseldb::morsel_record;
use std::ops::Bound;

use futures_util::stream::StreamExt;

use morseldb::executor::tokio::TokioExecutor;
use morseldb::{morsel_record, Projection, DB};

// Tips: must be public
#[morsel_record]
pub struct Post {
pub struct User {
#[primary_key]
name: String,
vu8: u8,
vu16: u16,
vu32: u32,
vu64: u64,
vi8: i8,
vi16: i16,
vi32: i32,
vi64: i64,
vbool: bool,
// email: Option<String>,
age: u8,
}

fn main() {
println!("Hello, world!");
#[tokio::main]
async fn main() {
let db = DB::<User, _>::new("./db_path/users".into(), TokioExecutor::default())
.await
.unwrap();

{
// morseldb supports transaction
let mut txn = db.transaction().await;

// set with owned value
txn.set(User {
name: "Alice".into(),
// email: None,
age: 22,
});

// get from primary key
let name = "Alice".into();
let user = txn.get(&name, Projection::All).await.unwrap();
assert!(user.is_some());
assert_eq!(user.unwrap().get().age, Some(22));

let upper = "Blob".into();
let mut scan = txn
.scan((Bound::Included(&name), Bound::Excluded(&upper)))
.await
.projection(vec![1])
.take()
.await
.unwrap();
loop {
let user = scan.next().await.transpose().unwrap();
match user {
Some(entry) => {
assert_eq!(
entry.value(),
Some(UserRef {
name: "Alice",
age: Some(22),
})
);
}
None => break,
}
}
}
}
5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ where
R: Record + Send,
E: Executor,
{
pub async fn new(option: Arc<DbOption>, executor: E) -> Result<Self, WriteError<R>> {
pub async fn new(option: DbOption, executor: E) -> Result<Self, WriteError<R>> {
let option = Arc::new(option);
E::create_dir_all(&option.path).await?;

let schema = Arc::new(RwLock::new(Schema::default()));
Expand Down Expand Up @@ -507,7 +508,7 @@ pub(crate) mod tests {
}

pub(crate) async fn get_test_record_batch<E: Executor>(
option: Arc<DbOption>,
option: DbOption,
executor: E,
) -> RecordBatch {
let db: DB<Test, E> = DB::new(option, executor).await.unwrap();
Expand Down
6 changes: 3 additions & 3 deletions src/ondisk/sstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ pub(crate) mod tests {
async fn write_sstable() {
let temp_dir = tempfile::tempdir().unwrap();
let record_batch = get_test_record_batch::<TokioExecutor>(
Arc::new(DbOption::from(temp_dir.path())),
DbOption::from(temp_dir.path()),
TokioExecutor::new(),
)
.await;
Expand All @@ -184,7 +184,7 @@ pub(crate) mod tests {
async fn projection_query() {
let temp_dir = tempfile::tempdir().unwrap();
let record_batch = get_test_record_batch::<TokioExecutor>(
Arc::new(DbOption::from(temp_dir.path())),
DbOption::from(temp_dir.path()),
TokioExecutor::new(),
)
.await;
Expand Down Expand Up @@ -255,7 +255,7 @@ pub(crate) mod tests {
async fn projection_scan() {
let temp_dir = tempfile::tempdir().unwrap();
let record_batch = get_test_record_batch::<TokioExecutor>(
Arc::new(DbOption::from(temp_dir.path())),
DbOption::from(temp_dir.path()),
TokioExecutor::new(),
)
.await;
Expand Down
2 changes: 1 addition & 1 deletion src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ where
}
}

pub(crate) fn value(&self) -> Option<R::Ref<'_>> {
pub fn value(&self) -> Option<R::Ref<'_>> {
match self {
Entry::Transaction((_, value)) => value.as_ref().map(R::as_record_ref),
Entry::Mutable(entry) => entry.value().as_ref().map(R::as_record_ref),
Expand Down
44 changes: 19 additions & 25 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ where
if entry.value().is_none() {
None
} else {
TransactionEntry::Stream(entry).into()
Some(TransactionEntry::Stream(entry))
}
}),
})
Expand Down Expand Up @@ -159,12 +159,12 @@ impl<'entry, R> TransactionEntry<'entry, R>
where
R: Record,
{
pub fn get(&self) -> Option<R::Ref<'_>> {
pub fn get(&self) -> R::Ref<'_> {
match self {
TransactionEntry::Stream(entry) => entry.value(),
TransactionEntry::Stream(entry) => entry.value().unwrap(),
TransactionEntry::Local(value) => {
// Safety: shorter lifetime must be safe
Some(unsafe { transmute::<R::Ref<'entry>, R::Ref<'_>>(*value) })
unsafe { transmute::<R::Ref<'entry>, R::Ref<'_>>(*value) }
}
}
}
Expand Down Expand Up @@ -202,12 +202,10 @@ mod tests {
async fn transaction_read_write() {
let temp_dir = TempDir::new().unwrap();

let db = DB::<String, TokioExecutor>::new(
Arc::new(DbOption::from(temp_dir.path())),
TokioExecutor::new(),
)
.await
.unwrap();
let db =
DB::<String, TokioExecutor>::new(DbOption::from(temp_dir.path()), TokioExecutor::new())
.await
.unwrap();
{
let mut txn1 = db.transaction().await;
txn1.set("foo".to_string());
Expand Down Expand Up @@ -238,12 +236,10 @@ mod tests {
async fn write_conflicts() {
let temp_dir = TempDir::new().unwrap();

let db = DB::<String, TokioExecutor>::new(
Arc::new(DbOption::from(temp_dir.path())),
TokioExecutor::new(),
)
.await
.unwrap();
let db =
DB::<String, TokioExecutor>::new(DbOption::from(temp_dir.path()), TokioExecutor::new())
.await
.unwrap();

let mut txn = db.transaction().await;
txn.set(0.to_string());
Expand Down Expand Up @@ -273,12 +269,10 @@ mod tests {
async fn transaction_projection() {
let temp_dir = TempDir::new().unwrap();

let db = DB::<Test, TokioExecutor>::new(
Arc::new(DbOption::from(temp_dir.path())),
TokioExecutor::new(),
)
.await
.unwrap();
let db =
DB::<Test, TokioExecutor>::new(DbOption::from(temp_dir.path()), TokioExecutor::new())
.await
.unwrap();

let mut txn1 = db.transaction().await;
txn1.set(Test {
Expand All @@ -290,9 +284,9 @@ mod tests {
let key = 0.to_string();
let entry = txn1.get(&key, Projection::All).await.unwrap().unwrap();

assert_eq!(entry.get().unwrap().vstring, 0.to_string());
assert_eq!(entry.get().unwrap().vu32, Some(0));
assert_eq!(entry.get().unwrap().vbool, Some(true));
assert_eq!(entry.get().vstring, 0.to_string());
assert_eq!(entry.get().vu32, Some(0));
assert_eq!(entry.get().vbool, Some(true));
drop(entry);

txn1.commit().await.unwrap();
Expand Down

0 comments on commit c661581

Please sign in to comment.