Skip to content

Commit

Permalink
refactor: transaction_ts -> Version::transaction & `VersionSet::t…
Browse files Browse the repository at this point in the history
…ransaction`
  • Loading branch information
KKould committed Aug 1, 2024
1 parent eae7796 commit cefc22c
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 77 deletions.
25 changes: 14 additions & 11 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ use crate::{
record::{KeyRef, Record},
scope::Scope,
stream::{level::LevelStream, merge::MergeStream, ScanStream},
version::{
edit::VersionEdit,
set::{transaction_ts, VersionSet},
Version, VersionError, MAX_LEVEL,
},
version::{edit::VersionEdit, set::VersionSet, Version, VersionError, MAX_LEVEL},
DbOption, Schema,
};

Expand Down Expand Up @@ -104,7 +100,7 @@ where
}
version_edits.insert(0, VersionEdit::Add { level: 0, scope });
version_edits.push(VersionEdit::LatestTimeStamp {
ts: transaction_ts(),
ts: version_ref.transaction_ts(),
});

self.version_set
Expand Down Expand Up @@ -431,7 +427,10 @@ where

#[cfg(test)]
pub(crate) mod tests {
use std::{collections::VecDeque, sync::Arc};
use std::{
collections::VecDeque,
sync::{atomic::AtomicU32, Arc},
};

use flume::bounded;
use parquet::{arrow::AsyncArrowWriter, errors::ParquetError};
Expand All @@ -449,13 +448,13 @@ pub(crate) mod tests {
timestamp::Timestamp,
version::{edit::VersionEdit, Version},
wal::log::LogType,
DbOption, WriteError,
DataBaseError, DbOption,
};

async fn build_immutable<R, FP>(
option: &DbOption,
records: Vec<(LogType, R, Timestamp)>,
) -> Result<Immutable<R::Columns>, WriteError<R>>
) -> Result<Immutable<R::Columns>, DataBaseError<R>>
where
R: Record + Send,
FP: FileProvider,
Expand All @@ -472,7 +471,7 @@ pub(crate) mod tests {
option: &DbOption,
gen: FileId,
records: Vec<(LogType, R, Timestamp)>,
) -> Result<(), WriteError<R>>
) -> Result<(), DataBaseError<R>>
where
R: Record + Send,
FP: Executor,
Expand Down Expand Up @@ -830,7 +829,11 @@ pub(crate) mod tests {
.unwrap();

let (sender, _) = bounded(1);
let mut version = Version::<Test, TokioExecutor>::new(option.clone(), sender);
let mut version = Version::<Test, TokioExecutor>::new(
option.clone(),
sender,
Arc::new(AtomicU32::default()),
);
version.level_slice[0].push(Scope {
min: 1.to_string(),
max: 3.to_string(),
Expand Down
8 changes: 4 additions & 4 deletions src/inmem/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
Timestamp, EPOCH,
},
wal::{log::LogType, WalFile},
DbOption, WriteError,
DataBaseError, DbOption,
};

pub(crate) type MutableScan<'scan, R> = Range<
Expand Down Expand Up @@ -67,7 +67,7 @@ where
log_ty: LogType,
record: R,
ts: Timestamp,
) -> Result<usize, WriteError<R>> {
) -> Result<usize, DataBaseError<R>> {
self.append(Some(log_ty), record.key().to_key(), ts, Some(record))
.await
}
Expand All @@ -77,7 +77,7 @@ where
log_ty: LogType,
key: R::Key,
ts: Timestamp,
) -> Result<usize, WriteError<R>> {
) -> Result<usize, DataBaseError<R>> {
self.append(Some(log_ty), key, ts, None).await
}

Expand All @@ -87,7 +87,7 @@ where
key: R::Key,
ts: Timestamp,
value: Option<R>,
) -> Result<usize, WriteError<R>> {
) -> Result<usize, DataBaseError<R>> {
let timestamped_key = Timestamped::new(key, ts);

if let Some(log_ty) = log_ty {
Expand Down
73 changes: 38 additions & 35 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,7 @@ use crate::{
serdes::Decode,
stream::{merge::MergeStream, Entry, ScanStream},
timestamp::Timestamped,
version::{
cleaner::Cleaner,
set::{transaction_ts, VersionSet},
Version, VersionError,
},
version::{cleaner::Cleaner, set::VersionSet, Version, VersionError},
wal::{log::LogType, RecoverError, WalFile},
};

Expand All @@ -73,17 +69,19 @@ where
R::Columns: Send + Sync,
E: Executor + Send + Sync + 'static,
{
pub async fn new(option: DbOption, executor: E) -> Result<Self, WriteError<R>> {
pub async fn new(option: DbOption, executor: E) -> Result<Self, DataBaseError<R>> {
let option = Arc::new(option);
E::create_dir_all(&option.path).await?;
E::create_dir_all(&option.wal_dir_path()).await?;

let (task_tx, task_rx) = bounded(1);
let schema = Arc::new(RwLock::new(Schema::new(option.clone(), task_tx).await?));

let (mut cleaner, clean_sender) = Cleaner::<E>::new(option.clone());

let version_set = VersionSet::new(clean_sender, option.clone()).await?;
let schema = Arc::new(RwLock::new(
Schema::new(option.clone(), task_tx, &version_set).await?,
));
let mut compactor =
Compactor::<R, E>::new(schema.clone(), option.clone(), version_set.clone());

Expand Down Expand Up @@ -122,7 +120,7 @@ where
)
}

pub(crate) async fn write(&self, record: R, ts: Timestamp) -> Result<(), WriteError<R>> {
pub(crate) async fn write(&self, record: R, ts: Timestamp) -> Result<(), DataBaseError<R>> {
let schema = self.schema.read().await;

if schema.write(LogType::Full, record, ts).await? {
Expand All @@ -136,7 +134,7 @@ where
&self,
mut records: impl ExactSizeIterator<Item = R>,
ts: Timestamp,
) -> Result<(), WriteError<R>> {
) -> Result<(), DataBaseError<R>> {
let schema = self.schema.read().await;

if let Some(first) = records.next() {
Expand Down Expand Up @@ -187,7 +185,8 @@ where
async fn new(
option: Arc<DbOption>,
compaction_tx: Sender<CompactTask>,
) -> Result<Self, WriteError<R>> {
version_set: &VersionSet<R, FP>,
) -> Result<Self, DataBaseError<R>> {
let mut schema = Schema {
mutable: Mutable::new(&option).await?,
immutables: Default::default(),
Expand All @@ -212,7 +211,7 @@ where
let is_excess = match log_type {
LogType::Full => {
schema
.recover_append(key, transaction_ts(), value_option)
.recover_append(key, version_set.transaction_ts(), value_option)
.await?
}
LogType::First => {
Expand All @@ -231,7 +230,7 @@ where
let mut records = transaction_map.remove(&ts).unwrap();
records.push((key, value_option));

let ts = transaction_ts();
let ts = version_set.transaction_ts();
for (key, value_option) in records {
is_excess = schema.recover_append(key, ts, value_option).await?;
}
Expand All @@ -253,7 +252,7 @@ where
log_ty: LogType,
record: R,
ts: Timestamp,
) -> Result<bool, WriteError<R>> {
) -> Result<bool, DataBaseError<R>> {
Ok(self.mutable.insert(log_ty, record, ts).await? > self.option.max_mem_table_size)
}

Expand All @@ -262,7 +261,7 @@ where
log_ty: LogType,
key: R::Key,
ts: Timestamp,
) -> Result<bool, WriteError<R>> {
) -> Result<bool, DataBaseError<R>> {
Ok(self.mutable.remove(log_ty, key, ts).await? > self.option.max_mem_table_size)
}

Expand All @@ -271,7 +270,7 @@ where
key: R::Key,
ts: Timestamp,
value: Option<R>,
) -> Result<bool, WriteError<R>> {
) -> Result<bool, DataBaseError<R>> {
Ok(self.mutable.append(None, key, ts, value).await? > self.option.max_mem_table_size)
}

Expand All @@ -281,7 +280,7 @@ where
key: &'get R::Key,
ts: Timestamp,
projection: Projection,
) -> Result<Option<Entry<'get, R>>, WriteError<R>>
) -> Result<Option<Entry<'get, R>>, DataBaseError<R>>
where
FP: FileProvider,
{
Expand Down Expand Up @@ -372,7 +371,7 @@ where

pub async fn take(
mut self,
) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, WriteError<R>> {
) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, DataBaseError<R>> {
self.streams.push(
self.schema
.mutable
Expand Down Expand Up @@ -401,7 +400,7 @@ where
}

#[derive(Debug, Error)]
pub enum WriteError<R>
pub enum DataBaseError<R>
where
R: Record,
{
Expand Down Expand Up @@ -451,13 +450,9 @@ pub(crate) mod tests {
inmem::{immutable::tests::TestImmutableArrays, mutable::Mutable},
record::{internal::InternalRecordRef, RecordDecodeError, RecordEncodeError, RecordRef},
serdes::{Decode, Encode},
version::{
cleaner::Cleaner,
set::{tests::build_version_set, transaction_ts},
Version,
},
version::{cleaner::Cleaner, set::tests::build_version_set, Version},
wal::log::LogType,
DbOption, Immutable, Projection, Record, WriteError, DB,
DataBaseError, DbOption, Immutable, Projection, Record, DB,
};

#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -787,7 +782,7 @@ pub(crate) mod tests {
executor: E,
schema: crate::Schema<R, E>,
version: Version<R, E>,
) -> Result<DB<R, E>, WriteError<R>>
) -> Result<DB<R, E>, DataBaseError<R>>
where
R: Record + Send + Sync,
R::Columns: Send + Sync,
Expand Down Expand Up @@ -1068,24 +1063,32 @@ pub(crate) mod tests {
.await
.unwrap();

let (task_tx, task_rx) = bounded(1);
let (task_tx, _task_rx) = bounded(1);

let schema: crate::Schema<Test, TokioExecutor> =
crate::Schema::new(option.clone(), task_tx.clone())
.await
.unwrap();
let schema: crate::Schema<Test, TokioExecutor> = crate::Schema {
mutable: Mutable::new(&option).await.unwrap(),
immutables: Default::default(),
compaction_tx: task_tx.clone(),
option: option.clone(),
recover_wal_ids: None,
};

for item in test_items() {
for (i, item) in test_items().into_iter().enumerate() {
schema
.write(LogType::Full, item, transaction_ts())
.write(LogType::Full, item, (i as u32).into())
.await
.unwrap();
}
drop(schema);

let schema: crate::Schema<Test, TokioExecutor> =
crate::Schema::new(option.clone(), task_tx).await.unwrap();
let mut range = schema
let schema: crate::Schema<Test, TokioExecutor> = crate::Schema {
mutable: Mutable::new(&option).await.unwrap(),
immutables: Default::default(),
compaction_tx: task_tx,
option: option.clone(),
recover_wal_ids: None,
};
let range = schema
.mutable
.scan((Bound::Unbounded, Bound::Unbounded), u32::MAX.into());

Expand Down
16 changes: 8 additions & 8 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use crate::{
record::{Key, KeyRef},
stream,
timestamp::{Timestamp, Timestamped},
version::{set::transaction_ts, VersionRef},
version::VersionRef,
wal::log::LogType,
LockMap, Projection, Record, Scan, Schema, WriteError,
DataBaseError, LockMap, Projection, Record, Scan, Schema,
};

pub(crate) struct TransactionScan<'scan, R: Record> {
Expand Down Expand Up @@ -64,7 +64,7 @@ where
lock_map: LockMap<R::Key>,
) -> Self {
Self {
ts: transaction_ts(),
ts: version.transaction_ts(),
local: BTreeMap::new(),
share,
version,
Expand All @@ -76,7 +76,7 @@ where
&'get self,
key: &'get R::Key,
projection: Projection,
) -> Result<Option<TransactionEntry<'get, R>>, WriteError<R>> {
) -> Result<Option<TransactionEntry<'get, R>>, DataBaseError<R>> {
Ok(match self.local.get(key).and_then(|v| v.as_ref()) {
Some(v) => Some(TransactionEntry::Local(v.as_record_ref())),
None => self
Expand Down Expand Up @@ -143,12 +143,12 @@ where
let is_excess = match len {
0 => false,
1 => {
let new_ts = transaction_ts();
let new_ts = self.version.transaction_ts();
let (key, record) = self.local.pop_first().unwrap();
Self::append(&self.share, LogType::Full, key, record, new_ts).await?
}
_ => {
let new_ts = transaction_ts();
let new_ts = self.version.transaction_ts();
let mut iter = self.local.into_iter();

let (key, record) = iter.next().unwrap();
Expand Down Expand Up @@ -214,8 +214,8 @@ where
Io(#[from] io::Error),
#[error("transaction parquet error {:?}", .0)]
Parquet(#[from] ParquetError),
#[error("transaction write error {:?}", .0)]
Write(#[from] WriteError<R>),
#[error("transaction database error {:?}", .0)]
DataBase(#[from] DataBaseError<R>),
#[error("transaction write conflict: {:?}", .0)]
WriteConflict(R::Key),
}
Expand Down
Loading

0 comments on commit cefc22c

Please sign in to comment.