Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add get insert remove insert_batch and scan to DB #38

Merged
merged 1 commit into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn main() {
let mut txn = db.transaction().await;

// set with owned value
txn.set(User {
txn.insert(User {
name: "Alice".into(),
age: 22,
email: Some("[email protected]".into()),
Expand Down
6 changes: 3 additions & 3 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,13 +448,13 @@ pub(crate) mod tests {
timestamp::Timestamp,
version::{edit::VersionEdit, Version},
wal::log::LogType,
DataBaseError, DbOption,
DbError, DbOption,
};

async fn build_immutable<R, FP>(
option: &DbOption,
records: Vec<(LogType, R, Timestamp)>,
) -> Result<Immutable<R::Columns>, DataBaseError<R>>
) -> Result<Immutable<R::Columns>, DbError<R>>
where
R: Record + Send,
FP: FileProvider,
Expand All @@ -471,7 +471,7 @@ pub(crate) mod tests {
option: &DbOption,
gen: FileId,
records: Vec<(LogType, R, Timestamp)>,
) -> Result<(), DataBaseError<R>>
) -> Result<(), DbError<R>>
where
R: Record + Send,
FP: Executor,
Expand Down
8 changes: 4 additions & 4 deletions src/inmem/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
Timestamp, EPOCH,
},
wal::{log::LogType, WalFile},
DataBaseError, DbOption,
DbError, DbOption,
};

pub(crate) type MutableScan<'scan, R> = Range<
Expand Down Expand Up @@ -67,7 +67,7 @@ where
log_ty: LogType,
record: R,
ts: Timestamp,
) -> Result<usize, DataBaseError<R>> {
) -> Result<usize, DbError<R>> {
self.append(Some(log_ty), record.key().to_key(), ts, Some(record))
.await
}
Expand All @@ -77,7 +77,7 @@ where
log_ty: LogType,
key: R::Key,
ts: Timestamp,
) -> Result<usize, DataBaseError<R>> {
) -> Result<usize, DbError<R>> {
self.append(Some(log_ty), key, ts, None).await
}

Expand All @@ -87,7 +87,7 @@ where
key: R::Key,
ts: Timestamp,
value: Option<R>,
) -> Result<usize, DataBaseError<R>> {
) -> Result<usize, DbError<R>> {
let timestamped_key = Timestamped::new(key, ts);

if let Some(log_ty) = log_ty {
Expand Down
100 changes: 80 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::{
};

use async_lock::{RwLock, RwLockReadGuard};
use async_stream::stream;
use flume::{bounded, Sender};
use fs::FileProvider;
use futures_core::Stream;
Expand All @@ -38,7 +39,7 @@ use record::Record;
use thiserror::Error;
use timestamp::Timestamp;
use tracing::error;
use transaction::Transaction;
use transaction::{CommitError, Transaction, TransactionEntry};

pub use crate::option::*;
use crate::{
Expand Down Expand Up @@ -69,7 +70,7 @@ where
R::Columns: Send + Sync,
E: Executor + Send + Sync + 'static,
{
pub async fn new(option: DbOption, executor: E) -> Result<Self, DataBaseError<R>> {
pub async fn new(option: DbOption, executor: E) -> Result<Self, DbError<R>> {
let option = Arc::new(option);
E::create_dir_all(&option.path).await?;
E::create_dir_all(&option.wal_dir_path()).await?;
Expand Down Expand Up @@ -120,7 +121,72 @@ where
)
}

pub(crate) async fn write(&self, record: R, ts: Timestamp) -> Result<(), DataBaseError<R>> {
pub async fn insert(&self, record: R) -> Result<(), CommitError<R>> {
Ok(self
.write(record, self.version_set.transaction_ts())
.await?)
}

pub async fn insert_batch(
&self,
records: impl ExactSizeIterator<Item = R>,
) -> Result<(), CommitError<R>> {
Ok(self
.write_batch(records, self.version_set.transaction_ts())
.await?)
}

pub async fn remove(&self, key: R::Key) -> Result<bool, CommitError<R>> {
Ok(self
.schema
.read()
.await
.remove(LogType::Full, key, self.version_set.transaction_ts())
.await?)
}

pub async fn get<T>(
&self,
key: &R::Key,
mut f: impl FnMut(TransactionEntry<'_, R>) -> T,
) -> Result<Option<T>, CommitError<R>> {
Ok(self
.schema
.read()
.await
.get(
&*self.version_set.current().await,
key,
self.version_set.transaction_ts(),
Projection::All,
)
.await?
.map(|e| f(TransactionEntry::Stream(e))))
}

pub async fn scan<'scan, T: 'scan>(
&'scan self,
range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>),
mut f: impl FnMut(TransactionEntry<'_, R>) -> T + 'scan,
) -> impl Stream<Item = Result<T, CommitError<R>>> + 'scan {
stream! {
let schema = self.schema.read().await;
let current = self.version_set.current().await;
let mut scan = Scan::new(
&schema,
range,
self.version_set.transaction_ts(),
&*current,
Vec::new(),
).take().await?;

while let Some(record) = scan.next().await {
yield Ok(f(TransactionEntry::Stream(record?)))
}
}
}

pub(crate) async fn write(&self, record: R, ts: Timestamp) -> Result<(), DbError<R>> {
let schema = self.schema.read().await;

if schema.write(LogType::Full, record, ts).await? {
Expand All @@ -134,7 +200,7 @@ where
&self,
mut records: impl ExactSizeIterator<Item = R>,
ts: Timestamp,
) -> Result<(), DataBaseError<R>> {
) -> Result<(), DbError<R>> {
let schema = self.schema.read().await;

if let Some(first) = records.next() {
Expand Down Expand Up @@ -186,7 +252,7 @@ where
option: Arc<DbOption>,
compaction_tx: Sender<CompactTask>,
version_set: &VersionSet<R, FP>,
) -> Result<Self, DataBaseError<R>> {
) -> Result<Self, DbError<R>> {
let mut schema = Schema {
mutable: Mutable::new(&option).await?,
immutables: Default::default(),
Expand Down Expand Up @@ -247,12 +313,7 @@ where
Ok(schema)
}

async fn write(
&self,
log_ty: LogType,
record: R,
ts: Timestamp,
) -> Result<bool, DataBaseError<R>> {
async fn write(&self, log_ty: LogType, record: R, ts: Timestamp) -> Result<bool, DbError<R>> {
Ok(self.mutable.insert(log_ty, record, ts).await? > self.option.max_mem_table_size)
}

Expand All @@ -261,7 +322,7 @@ where
log_ty: LogType,
key: R::Key,
ts: Timestamp,
) -> Result<bool, DataBaseError<R>> {
) -> Result<bool, DbError<R>> {
Ok(self.mutable.remove(log_ty, key, ts).await? > self.option.max_mem_table_size)
}

Expand All @@ -270,7 +331,7 @@ where
key: R::Key,
ts: Timestamp,
value: Option<R>,
) -> Result<bool, DataBaseError<R>> {
) -> Result<bool, DbError<R>> {
Ok(self.mutable.append(None, key, ts, value).await? > self.option.max_mem_table_size)
}

Expand All @@ -280,7 +341,7 @@ where
key: &'get R::Key,
ts: Timestamp,
projection: Projection,
) -> Result<Option<Entry<'get, R>>, DataBaseError<R>>
) -> Result<Option<Entry<'get, R>>, DbError<R>>
where
FP: FileProvider,
{
Expand Down Expand Up @@ -374,7 +435,7 @@ where

pub async fn take(
mut self,
) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, DataBaseError<R>> {
) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, DbError<R>> {
self.streams.push(
self.schema
.mutable
Expand Down Expand Up @@ -404,8 +465,7 @@ where
pub async fn package(
mut self,
batch_size: usize,
) -> Result<impl Stream<Item = Result<R::Columns, ParquetError>> + 'scan, DataBaseError<R>>
{
) -> Result<impl Stream<Item = Result<R::Columns, ParquetError>> + 'scan, DbError<R>> {
self.streams.push(
self.schema
.mutable
Expand Down Expand Up @@ -439,7 +499,7 @@ where
}

#[derive(Debug, Error)]
pub enum DataBaseError<R>
pub enum DbError<R>
where
R: Record,
{
Expand Down Expand Up @@ -491,7 +551,7 @@ pub(crate) mod tests {
serdes::{Decode, Encode},
version::{cleaner::Cleaner, set::tests::build_version_set, Version},
wal::log::LogType,
DataBaseError, DbOption, Immutable, Projection, Record, DB,
DbError, DbOption, Immutable, Projection, Record, DB,
};

#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -821,7 +881,7 @@ pub(crate) mod tests {
executor: E,
schema: crate::Schema<R, E>,
version: Version<R, E>,
) -> Result<DB<R, E>, DataBaseError<R>>
) -> Result<DB<R, E>, DbError<R>>
where
R: Record + Send + Sync,
R::Columns: Send + Sync,
Expand Down
26 changes: 13 additions & 13 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
timestamp::{Timestamp, Timestamped},
version::VersionRef,
wal::log::LogType,
DataBaseError, LockMap, Projection, Record, Scan, Schema,
DbError, LockMap, Projection, Record, Scan, Schema,
};

pub(crate) struct TransactionScan<'scan, R: Record> {
Expand Down Expand Up @@ -76,7 +76,7 @@ where
&'get self,
key: &'get R::Key,
projection: Projection,
) -> Result<Option<TransactionEntry<'get, R>>, DataBaseError<R>> {
) -> Result<Option<TransactionEntry<'get, R>>, DbError<R>> {
Ok(match self.local.get(key).and_then(|v| v.as_ref()) {
Some(v) => Some(TransactionEntry::Local(v.as_record_ref())),
None => self
Expand Down Expand Up @@ -105,7 +105,7 @@ where
Scan::new(&self.share, range, self.ts, &self.version, streams)
}

pub fn set(&mut self, value: R) {
pub fn insert(&mut self, value: R) {
self.entry(value.key().to_key(), Some(value))
}

Expand Down Expand Up @@ -215,7 +215,7 @@ where
#[error("transaction parquet error {:?}", .0)]
Parquet(#[from] ParquetError),
#[error("transaction database error {:?}", .0)]
DataBase(#[from] DataBaseError<R>),
Database(#[from] DbError<R>),
#[error("transaction write conflict: {:?}", .0)]
WriteConflict(R::Key),
}
Expand Down Expand Up @@ -245,7 +245,7 @@ mod tests {
.unwrap();
{
let mut txn1 = db.transaction().await;
txn1.set("foo".to_string());
txn1.insert("foo".to_string());

let txn2 = db.transaction().await;
dbg!(txn2
Expand Down Expand Up @@ -279,18 +279,18 @@ mod tests {
.unwrap();

let mut txn = db.transaction().await;
txn.set(0.to_string());
txn.set(1.to_string());
txn.insert(0.to_string());
txn.insert(1.to_string());
txn.commit().await.unwrap();

let mut txn_0 = db.transaction().await;
let mut txn_1 = db.transaction().await;
let mut txn_2 = db.transaction().await;

txn_0.set(1.to_string());
txn_1.set(1.to_string());
txn_1.set(2.to_string());
txn_2.set(2.to_string());
txn_0.insert(1.to_string());
txn_1.insert(1.to_string());
txn_1.insert(2.to_string());
txn_2.insert(2.to_string());

txn_0.commit().await.unwrap();

Expand All @@ -312,7 +312,7 @@ mod tests {
.unwrap();

let mut txn1 = db.transaction().await;
txn1.set(Test {
txn1.insert(Test {
vstring: 0.to_string(),
vu32: 0,
vbool: Some(true),
Expand Down Expand Up @@ -341,7 +341,7 @@ mod tests {
.unwrap();

let mut txn = db.transaction().await;
txn.set(Test {
txn.insert(Test {
vstring: "king".to_string(),
vu32: 8,
vbool: Some(true),
Expand Down
2 changes: 1 addition & 1 deletion src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ where
}

#[derive(Debug, Error)]
pub(crate) enum RecoverError<E: std::error::Error> {
pub enum RecoverError<E: std::error::Error> {
#[error("wal recover decode error: {0}")]
Decode(E),
#[error("wal recover checksum error")]
Expand Down
Loading