Skip to content

Commit

Permalink
feat: add get insert remove insert_batch and scan to DB
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Aug 1, 2024
1 parent 5176194 commit 8d860cf
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 42 deletions.
2 changes: 1 addition & 1 deletion examples/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn main() {
let mut txn = db.transaction().await;

// set with owned value
txn.set(User {
txn.insert(User {
name: "Alice".into(),
age: 22,
email: Some("[email protected]".into()),
Expand Down
6 changes: 3 additions & 3 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,13 +448,13 @@ pub(crate) mod tests {
timestamp::Timestamp,
version::{edit::VersionEdit, Version},
wal::log::LogType,
DataBaseError, DbOption,
DbError, DbOption,
};

async fn build_immutable<R, FP>(
option: &DbOption,
records: Vec<(LogType, R, Timestamp)>,
) -> Result<Immutable<R::Columns>, DataBaseError<R>>
) -> Result<Immutable<R::Columns>, DbError<R>>
where
R: Record + Send,
FP: FileProvider,
Expand All @@ -471,7 +471,7 @@ pub(crate) mod tests {
option: &DbOption,
gen: FileId,
records: Vec<(LogType, R, Timestamp)>,
) -> Result<(), DataBaseError<R>>
) -> Result<(), DbError<R>>
where
R: Record + Send,
FP: Executor,
Expand Down
8 changes: 4 additions & 4 deletions src/inmem/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
Timestamp, EPOCH,
},
wal::{log::LogType, WalFile},
DataBaseError, DbOption,
DbError, DbOption,
};

pub(crate) type MutableScan<'scan, R> = Range<
Expand Down Expand Up @@ -67,7 +67,7 @@ where
log_ty: LogType,
record: R,
ts: Timestamp,
) -> Result<usize, DataBaseError<R>> {
) -> Result<usize, DbError<R>> {
self.append(Some(log_ty), record.key().to_key(), ts, Some(record))
.await
}
Expand All @@ -77,7 +77,7 @@ where
log_ty: LogType,
key: R::Key,
ts: Timestamp,
) -> Result<usize, DataBaseError<R>> {
) -> Result<usize, DbError<R>> {
self.append(Some(log_ty), key, ts, None).await
}

Expand All @@ -87,7 +87,7 @@ where
key: R::Key,
ts: Timestamp,
value: Option<R>,
) -> Result<usize, DataBaseError<R>> {
) -> Result<usize, DbError<R>> {
let timestamped_key = Timestamped::new(key, ts);

if let Some(log_ty) = log_ty {
Expand Down
97 changes: 77 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::{
};

use async_lock::{RwLock, RwLockReadGuard};
use async_stream::stream;
use flume::{bounded, Sender};
use fs::FileProvider;
use futures_core::Stream;
Expand All @@ -38,7 +39,7 @@ use record::Record;
use thiserror::Error;
use timestamp::Timestamp;
use tracing::error;
use transaction::Transaction;
use transaction::{CommitError, Transaction, TransactionEntry};

pub use crate::option::*;
use crate::{
Expand Down Expand Up @@ -69,7 +70,7 @@ where
R::Columns: Send + Sync,
E: Executor + Send + Sync + 'static,
{
pub async fn new(option: DbOption, executor: E) -> Result<Self, DataBaseError<R>> {
pub async fn new(option: DbOption, executor: E) -> Result<Self, DbError<R>> {
let option = Arc::new(option);
E::create_dir_all(&option.path).await?;
E::create_dir_all(&option.wal_dir_path()).await?;
Expand Down Expand Up @@ -120,7 +121,69 @@ where
)
}

pub(crate) async fn write(&self, record: R, ts: Timestamp) -> Result<(), DataBaseError<R>> {
pub async fn insert(&self, record: R) -> Result<(), CommitError<R>> {
Ok(self
.write(record, self.version_set.transaction_ts())
.await?)
}

pub async fn insert_batch(
&self,
records: impl ExactSizeIterator<Item = R>,
) -> Result<(), CommitError<R>> {
Ok(self
.write_batch(records, self.version_set.transaction_ts())
.await?)
}

pub async fn remove(&self, key: R::Key) -> Result<(), CommitError<R>> {
let mut txn = self.transaction().await;
txn.remove(key);
txn.commit().await
}

pub async fn get<T>(
&self,
key: &R::Key,
mut f: impl FnMut(TransactionEntry<'_, R>) -> T,
) -> Result<Option<T>, CommitError<R>> {
Ok(self
.schema
.read()
.await
.get(
&*self.version_set.current().await,
key,
self.version_set.transaction_ts(),
Projection::All,
)
.await?
.map(|e| f(TransactionEntry::Stream(e))))
}

pub async fn scan<'scan, T: 'scan>(
&'scan self,
range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>),
mut f: impl FnMut(TransactionEntry<'_, R>) -> T + 'scan,
) -> impl Stream<Item = Result<T, CommitError<R>>> + 'scan {
stream! {
let schema = self.schema.read().await;
let current = self.version_set.current().await;
let mut scan = Scan::new(
&schema,
range,
self.version_set.transaction_ts(),
&*current,
Vec::new(),
).take().await?;

while let Some(record) = scan.next().await {
yield Ok(f(TransactionEntry::Stream(record?)))
}
}
}

pub(crate) async fn write(&self, record: R, ts: Timestamp) -> Result<(), DbError<R>> {
let schema = self.schema.read().await;

if schema.write(LogType::Full, record, ts).await? {
Expand All @@ -134,7 +197,7 @@ where
&self,
mut records: impl ExactSizeIterator<Item = R>,
ts: Timestamp,
) -> Result<(), DataBaseError<R>> {
) -> Result<(), DbError<R>> {
let schema = self.schema.read().await;

if let Some(first) = records.next() {
Expand Down Expand Up @@ -186,7 +249,7 @@ where
option: Arc<DbOption>,
compaction_tx: Sender<CompactTask>,
version_set: &VersionSet<R, FP>,
) -> Result<Self, DataBaseError<R>> {
) -> Result<Self, DbError<R>> {
let mut schema = Schema {
mutable: Mutable::new(&option).await?,
immutables: Default::default(),
Expand Down Expand Up @@ -247,12 +310,7 @@ where
Ok(schema)
}

async fn write(
&self,
log_ty: LogType,
record: R,
ts: Timestamp,
) -> Result<bool, DataBaseError<R>> {
async fn write(&self, log_ty: LogType, record: R, ts: Timestamp) -> Result<bool, DbError<R>> {
Ok(self.mutable.insert(log_ty, record, ts).await? > self.option.max_mem_table_size)
}

Expand All @@ -261,7 +319,7 @@ where
log_ty: LogType,
key: R::Key,
ts: Timestamp,
) -> Result<bool, DataBaseError<R>> {
) -> Result<bool, DbError<R>> {
Ok(self.mutable.remove(log_ty, key, ts).await? > self.option.max_mem_table_size)
}

Expand All @@ -270,7 +328,7 @@ where
key: R::Key,
ts: Timestamp,
value: Option<R>,
) -> Result<bool, DataBaseError<R>> {
) -> Result<bool, DbError<R>> {
Ok(self.mutable.append(None, key, ts, value).await? > self.option.max_mem_table_size)
}

Expand All @@ -280,7 +338,7 @@ where
key: &'get R::Key,
ts: Timestamp,
projection: Projection,
) -> Result<Option<Entry<'get, R>>, DataBaseError<R>>
) -> Result<Option<Entry<'get, R>>, DbError<R>>
where
FP: FileProvider,
{
Expand Down Expand Up @@ -374,7 +432,7 @@ where

pub async fn take(
mut self,
) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, DataBaseError<R>> {
) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, DbError<R>> {
self.streams.push(
self.schema
.mutable
Expand Down Expand Up @@ -404,8 +462,7 @@ where
pub async fn package(
mut self,
batch_size: usize,
) -> Result<impl Stream<Item = Result<R::Columns, ParquetError>> + 'scan, DataBaseError<R>>
{
) -> Result<impl Stream<Item = Result<R::Columns, ParquetError>> + 'scan, DbError<R>> {
self.streams.push(
self.schema
.mutable
Expand Down Expand Up @@ -439,7 +496,7 @@ where
}

#[derive(Debug, Error)]
pub enum DataBaseError<R>
pub enum DbError<R>
where
R: Record,
{
Expand Down Expand Up @@ -491,7 +548,7 @@ pub(crate) mod tests {
serdes::{Decode, Encode},
version::{cleaner::Cleaner, set::tests::build_version_set, Version},
wal::log::LogType,
DataBaseError, DbOption, Immutable, Projection, Record, DB,
DbError, DbOption, Immutable, Projection, Record, DB,
};

#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -821,7 +878,7 @@ pub(crate) mod tests {
executor: E,
schema: crate::Schema<R, E>,
version: Version<R, E>,
) -> Result<DB<R, E>, DataBaseError<R>>
) -> Result<DB<R, E>, DbError<R>>
where
R: Record + Send + Sync,
R::Columns: Send + Sync,
Expand Down
26 changes: 13 additions & 13 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
timestamp::{Timestamp, Timestamped},
version::VersionRef,
wal::log::LogType,
DataBaseError, LockMap, Projection, Record, Scan, Schema,
DbError, LockMap, Projection, Record, Scan, Schema,
};

pub(crate) struct TransactionScan<'scan, R: Record> {
Expand Down Expand Up @@ -76,7 +76,7 @@ where
&'get self,
key: &'get R::Key,
projection: Projection,
) -> Result<Option<TransactionEntry<'get, R>>, DataBaseError<R>> {
) -> Result<Option<TransactionEntry<'get, R>>, DbError<R>> {
Ok(match self.local.get(key).and_then(|v| v.as_ref()) {
Some(v) => Some(TransactionEntry::Local(v.as_record_ref())),
None => self
Expand Down Expand Up @@ -105,7 +105,7 @@ where
Scan::new(&self.share, range, self.ts, &self.version, streams)
}

pub fn set(&mut self, value: R) {
pub fn insert(&mut self, value: R) {
self.entry(value.key().to_key(), Some(value))
}

Expand Down Expand Up @@ -215,7 +215,7 @@ where
#[error("transaction parquet error {:?}", .0)]
Parquet(#[from] ParquetError),
#[error("transaction database error {:?}", .0)]
DataBase(#[from] DataBaseError<R>),
Database(#[from] DbError<R>),
#[error("transaction write conflict: {:?}", .0)]
WriteConflict(R::Key),
}
Expand Down Expand Up @@ -245,7 +245,7 @@ mod tests {
.unwrap();
{
let mut txn1 = db.transaction().await;
txn1.set("foo".to_string());
txn1.insert("foo".to_string());

let txn2 = db.transaction().await;
dbg!(txn2
Expand Down Expand Up @@ -279,18 +279,18 @@ mod tests {
.unwrap();

let mut txn = db.transaction().await;
txn.set(0.to_string());
txn.set(1.to_string());
txn.insert(0.to_string());
txn.insert(1.to_string());
txn.commit().await.unwrap();

let mut txn_0 = db.transaction().await;
let mut txn_1 = db.transaction().await;
let mut txn_2 = db.transaction().await;

txn_0.set(1.to_string());
txn_1.set(1.to_string());
txn_1.set(2.to_string());
txn_2.set(2.to_string());
txn_0.insert(1.to_string());
txn_1.insert(1.to_string());
txn_1.insert(2.to_string());
txn_2.insert(2.to_string());

txn_0.commit().await.unwrap();

Expand All @@ -312,7 +312,7 @@ mod tests {
.unwrap();

let mut txn1 = db.transaction().await;
txn1.set(Test {
txn1.insert(Test {
vstring: 0.to_string(),
vu32: 0,
vbool: Some(true),
Expand Down Expand Up @@ -341,7 +341,7 @@ mod tests {
.unwrap();

let mut txn = db.transaction().await;
txn.set(Test {
txn.insert(Test {
vstring: "king".to_string(),
vu32: 8,
vbool: Some(true),
Expand Down
2 changes: 1 addition & 1 deletion src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ where
}

#[derive(Debug, Error)]
pub(crate) enum RecoverError<E: std::error::Error> {
pub enum RecoverError<E: std::error::Error> {
#[error("wal recover decode error: {0}")]
Decode(E),
#[error("wal recover checksum error")]
Expand Down

0 comments on commit 8d860cf

Please sign in to comment.