Skip to content

Commit

Permalink
refactor: rename Schema to DbStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Dec 17, 2024
1 parent 69f639b commit b450e8e
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 46 deletions.
6 changes: 3 additions & 3 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
version::{
edit::VersionEdit, set::VersionSet, TransactionTs, Version, VersionError, MAX_LEVEL,
},
DbOption, ParquetLru, Schema,
DbOption, DbStorage, ParquetLru,
};

#[derive(Debug)]
Expand All @@ -36,7 +36,7 @@ where
R: Record,
{
pub(crate) option: Arc<DbOption>,
pub(crate) schema: Arc<RwLock<Schema<R>>>,
pub(crate) schema: Arc<RwLock<DbStorage<R>>>,
pub(crate) version_set: VersionSet<R>,
pub(crate) manager: Arc<StoreManager>,
pub(crate) record_schema: Arc<R::Schema>,
Expand All @@ -47,7 +47,7 @@ where
R: Record,
{
pub(crate) fn new(
schema: Arc<RwLock<Schema<R>>>,
schema: Arc<RwLock<DbStorage<R>>>,
record_schema: Arc<R::Schema>,
option: Arc<DbOption>,
version_set: VersionSet<R>,
Expand Down
66 changes: 30 additions & 36 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ use crate::{
compaction::{CompactTask, CompactionError, Compactor},
executor::Executor,
fs::{manager::StoreManager, parse_file_id, FileType},
record::Schema as RecordSchema,
record::Schema,
serdes::Decode,
snapshot::Snapshot,
stream::{
Expand All @@ -187,9 +187,9 @@ where
R: Record,
E: Executor,
{
schema: Arc<RwLock<Schema<R>>>,
schema: Arc<RwLock<DbStorage<R>>>,
version_set: VersionSet<R>,
lock_map: LockMap<<R::Schema as record::Schema>::Key>,
lock_map: LockMap<<R::Schema as Schema>::Key>,
manager: Arc<StoreManager>,
parquet_lru: ParquetLru,
_p: PhantomData<E>,
Expand All @@ -198,7 +198,7 @@ where
impl<R, E> DB<R, E>
where
R: Record + Send + Sync,
<R::Schema as RecordSchema>::Columns: Send + Sync,
<R::Schema as Schema>::Columns: Send + Sync,
E: Executor + Send + Sync + 'static,
{
/// Open [`DB`] with a [`DbOption`]. This will create a new directory at the
Expand All @@ -220,7 +220,7 @@ where
impl<R, E> DB<R, E>
where
R: Record + Send + Sync,
<R::Schema as RecordSchema>::Columns: Send + Sync,
<R::Schema as Schema>::Columns: Send + Sync,
E: Executor + Send + Sync + 'static,
{
async fn build(
Expand Down Expand Up @@ -252,7 +252,7 @@ where

let version_set = VersionSet::new(clean_sender, option.clone(), manager.clone()).await?;
let schema = Arc::new(RwLock::new(
Schema::new(
DbStorage::new(
option.clone(),
task_tx,
&version_set,
Expand Down Expand Up @@ -341,10 +341,7 @@ where
}

/// delete the record with the primary key as the `key`
pub async fn remove(
&self,
key: <R::Schema as RecordSchema>::Key,
) -> Result<bool, CommitError<R>> {
pub async fn remove(&self, key: <R::Schema as Schema>::Key) -> Result<bool, CommitError<R>> {
Ok(self
.schema
.read()
Expand All @@ -368,7 +365,7 @@ where
/// get the record with `key` as the primary key and process it using closure `f`
pub async fn get<T>(
&self,
key: &<R::Schema as RecordSchema>::Key,
key: &<R::Schema as Schema>::Key,
mut f: impl FnMut(TransactionEntry<'_, R>) -> Option<T>,
) -> Result<Option<T>, CommitError<R>> {
Ok(self
Expand Down Expand Up @@ -397,8 +394,8 @@ where
pub async fn scan<'scan, T: 'scan>(
&'scan self,
range: (
Bound<&'scan <R::Schema as RecordSchema>::Key>,
Bound<&'scan <R::Schema as RecordSchema>::Key>,
Bound<&'scan <R::Schema as Schema>::Key>,
Bound<&'scan <R::Schema as Schema>::Key>,
),
mut f: impl FnMut(TransactionEntry<'_, R>) -> T + 'scan,
) -> impl Stream<Item = Result<T, CommitError<R>>> + 'scan {
Expand Down Expand Up @@ -467,22 +464,19 @@ where
}
}

pub(crate) struct Schema<R>
pub(crate) struct DbStorage<R>
where
R: Record,
{
pub mutable: Mutable<R>,
pub immutables: Vec<(
Option<FileId>,
Immutable<<R::Schema as RecordSchema>::Columns>,
)>,
pub immutables: Vec<(Option<FileId>, Immutable<<R::Schema as Schema>::Columns>)>,
compaction_tx: Sender<CompactTask>,
recover_wal_ids: Option<Vec<FileId>>,
trigger: Arc<Box<dyn Trigger<R> + Send + Sync>>,
record_schema: Arc<R::Schema>,
}

impl<R> Schema<R>
impl<R> DbStorage<R>
where
R: Record + Send,
{
Expand All @@ -494,7 +488,7 @@ where
manager: &StoreManager,
) -> Result<Self, DbError<R>> {
let trigger = Arc::new(TriggerFactory::create(option.trigger_type));
let mut schema = Schema {
let mut schema = DbStorage {
mutable: Mutable::new(
&option,
trigger.clone(),
Expand Down Expand Up @@ -586,15 +580,15 @@ where
async fn remove(
&self,
log_ty: LogType,
key: <R::Schema as RecordSchema>::Key,
key: <R::Schema as Schema>::Key,
ts: Timestamp,
) -> Result<bool, DbError<R>> {
self.mutable.remove(log_ty, key, ts).await
}

async fn recover_append(
&self,
key: <R::Schema as RecordSchema>::Key,
key: <R::Schema as Schema>::Key,
ts: Timestamp,
value: Option<R>,
) -> Result<bool, DbError<R>> {
Expand All @@ -605,7 +599,7 @@ where
&'get self,
version: &'get Version<R>,
manager: &StoreManager,
key: &'get <R::Schema as RecordSchema>::Key,
key: &'get <R::Schema as Schema>::Key,
ts: Timestamp,
projection: Projection,
parquet_lru: ParquetLru,
Expand Down Expand Up @@ -652,7 +646,7 @@ where
.map(|entry| Entry::RecordBatch(entry)))
}

fn check_conflict(&self, key: &<R::Schema as RecordSchema>::Key, ts: Timestamp) -> bool {
fn check_conflict(&self, key: &<R::Schema as Schema>::Key, ts: Timestamp) -> bool {
self.mutable.check_conflict(key, ts)
|| self
.immutables
Expand All @@ -673,10 +667,10 @@ where
R: Record,
'range: 'scan,
{
schema: &'scan Schema<R>,
schema: &'scan DbStorage<R>,
manager: &'scan StoreManager,
lower: Bound<&'range <R::Schema as RecordSchema>::Key>,
upper: Bound<&'range <R::Schema as RecordSchema>::Key>,
lower: Bound<&'range <R::Schema as Schema>::Key>,
upper: Bound<&'range <R::Schema as Schema>::Key>,
ts: Timestamp,

version: &'scan Version<R>,
Expand All @@ -695,11 +689,11 @@ where
R: Record + Send,
{
fn new(
schema: &'scan Schema<R>,
schema: &'scan DbStorage<R>,
manager: &'scan StoreManager,
(lower, upper): (
Bound<&'range <R::Schema as RecordSchema>::Key>,
Bound<&'range <R::Schema as RecordSchema>::Key>,
Bound<&'range <R::Schema as Schema>::Key>,
Bound<&'range <R::Schema as Schema>::Key>,
),
ts: Timestamp,
version: &'scan Version<R>,
Expand Down Expand Up @@ -811,7 +805,7 @@ where
self,
batch_size: usize,
) -> Result<
impl Stream<Item = Result<<R::Schema as RecordSchema>::Columns, ParquetError>> + 'scan,
impl Stream<Item = Result<<R::Schema as Schema>::Columns, ParquetError>> + 'scan,
DbError<R>,
> {
let mut streams = Vec::new();
Expand Down Expand Up @@ -1174,7 +1168,7 @@ pub(crate) mod tests {
pub(crate) async fn build_schema(
option: Arc<DbOption>,
fs: &Arc<dyn DynFs>,
) -> Result<(crate::Schema<Test>, Receiver<CompactTask>), fusio::Error> {
) -> Result<(crate::DbStorage<Test>, Receiver<CompactTask>), fusio::Error> {
let trigger = Arc::new(TriggerFactory::create(option.trigger_type));

let mutable = Mutable::new(&option, trigger.clone(), fs, Arc::new(TestSchema {})).await?;
Expand Down Expand Up @@ -1268,7 +1262,7 @@ pub(crate) mod tests {
let (compaction_tx, compaction_rx) = bounded(1);

Ok((
crate::Schema {
crate::DbStorage {
mutable,
immutables,
compaction_tx,
Expand All @@ -1284,7 +1278,7 @@ pub(crate) mod tests {
option: Arc<DbOption>,
compaction_rx: Receiver<CompactTask>,
executor: E,
schema: crate::Schema<R>,
schema: crate::DbStorage<R>,
record_schema: Arc<R::Schema>,
version: Version<R>,
manager: Arc<StoreManager>,
Expand Down Expand Up @@ -1655,7 +1649,7 @@ pub(crate) mod tests {
let (task_tx, _task_rx) = bounded(1);

let trigger = Arc::new(TriggerFactory::create(option.trigger_type));
let schema: crate::Schema<Test> = crate::Schema {
let schema: crate::DbStorage<Test> = crate::DbStorage {
mutable: Mutable::new(&option, trigger.clone(), &fs, Arc::new(TestSchema))
.await
.unwrap(),
Expand Down Expand Up @@ -1725,7 +1719,7 @@ pub(crate) mod tests {
let (task_tx, _task_rx) = bounded(1);

let trigger = Arc::new(TriggerFactory::create(option.trigger_type));
let schema: crate::Schema<DynRecord> = crate::Schema {
let schema: crate::DbStorage<DynRecord> = crate::DbStorage {
mutable: Mutable::new(
&option,
trigger.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/magic.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pub const TS: &str = "_ts";
pub const USER_COLUMN_OFFSET: usize = 2;
pub(crate) const USER_COLUMN_OFFSET: usize = 2;
8 changes: 4 additions & 4 deletions src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ use crate::{
stream::ScanStream,
timestamp::Timestamp,
version::{TransactionTs, VersionRef},
DbError, ParquetLru, Projection, Scan, Schema,
DbError, DbStorage, ParquetLru, Projection, Scan,
};

pub struct Snapshot<'s, R>
where
R: Record,
{
ts: Timestamp,
share: RwLockReadGuard<'s, Schema<R>>,
share: RwLockReadGuard<'s, DbStorage<R>>,
version: VersionRef<R>,
manager: Arc<StoreManager>,
parquet_lru: ParquetLru,
Expand Down Expand Up @@ -72,7 +72,7 @@ where
}

pub(crate) fn new(
share: RwLockReadGuard<'s, Schema<R>>,
share: RwLockReadGuard<'s, DbStorage<R>>,
version: VersionRef<R>,
manager: Arc<StoreManager>,
parquet_lru: ParquetLru,
Expand All @@ -94,7 +94,7 @@ where
self.version.increase_ts()
}

pub(crate) fn schema(&self) -> &Schema<R> {
pub(crate) fn schema(&self) -> &DbStorage<R> {
&self.share
}

Expand Down
4 changes: 2 additions & 2 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
stream::mem_projection::MemProjectionStream,
timestamp::{Timestamp, Timestamped},
wal::log::LogType,
DbError, LockMap, Projection, Record, Scan, Schema,
DbError, LockMap, Projection, Record, Scan, DbStorage,
};

pub(crate) struct TransactionScan<'scan, R: Record> {
Expand Down Expand Up @@ -186,7 +186,7 @@ where
}

async fn append(
schema: &Schema<R>,
schema: &DbStorage<R>,
log_ty: LogType,
key: <R::Schema as RecordSchema>::Key,
record: Option<R>,
Expand Down

0 comments on commit b450e8e

Please sign in to comment.