Skip to content

Commit

Permalink
chore: Transaction::get supplementary projection parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould authored and ethe committed Jul 24, 2024
1 parent 68a9795 commit d10ffa5
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 13 deletions.
20 changes: 13 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ use crate::{

type LockMap<K> = Arc<LockableHashMap<K, ()>>;

pub enum Projection {
All,
Some(Vec<usize>),
}

#[derive(Debug)]
pub struct DbOption {
pub path: PathBuf,
Expand Down Expand Up @@ -205,13 +210,14 @@ where
&'get self,
key: &'get R::Key,
ts: Timestamp,
projection: Projection,
) -> Result<Option<Entry<'get, R>>, ParquetError> {
self.scan(Bound::Included(key), Bound::Unbounded, ts)
.take()
.await?
.next()
.await
.transpose()
let mut scan = self.scan(Bound::Included(key), Bound::Unbounded, ts);

if let Projection::Some(projection) = projection {
scan = scan.projection(projection)
}
scan.take().await?.next().await.transpose()
}

fn scan<'scan>(
Expand Down Expand Up @@ -271,7 +277,7 @@ where
}
}

pub fn project(self, mut projection: Vec<usize>) -> Self {
pub fn projection(self, mut projection: Vec<usize>) -> Self {
// skip two columns: _null and _ts
for p in &mut projection {
*p += 2;
Expand Down
22 changes: 16 additions & 6 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
stream,
timestamp::Timestamp,
version::{set::transaction_ts, VersionRef},
LockMap, Record, Schema,
LockMap, Projection, Record, Schema,
};

pub struct Transaction<'txn, R, FP>
Expand Down Expand Up @@ -52,12 +52,13 @@ where
pub async fn get<'get>(
&'get self,
key: &'get R::Key,
projection: Projection,
) -> Result<Option<TransactionEntry<'get, R>>, ParquetError> {
Ok(match self.local.get(key).and_then(|v| v.as_ref()) {
Some(v) => Some(TransactionEntry::Local(v.as_record_ref())),
None => self
.share
.get(key, self.ts)
.get(key, self.ts, projection)
.await?
.map(TransactionEntry::Stream),
})
Expand Down Expand Up @@ -150,7 +151,8 @@ mod tests {
use tempfile::TempDir;

use crate::{
executor::tokio::TokioExecutor, tests::Test, transaction::CommitError, DbOption, DB,
executor::tokio::TokioExecutor, tests::Test, transaction::CommitError, DbOption,
Projection, DB,
};

#[tokio::test]
Expand All @@ -168,15 +170,23 @@ mod tests {
txn1.set("foo".to_string());

let txn2 = db.transaction().await;
dbg!(txn2.get(&"foo".to_string()).await.unwrap().is_none());
dbg!(txn2
.get(&"foo".to_string(), Projection::All)
.await
.unwrap()
.is_none());

txn1.commit().await.unwrap();
txn2.commit().await.unwrap();
}

{
let txn3 = db.transaction().await;
dbg!(txn3.get(&"foo".to_string()).await.unwrap().is_none());
dbg!(txn3
.get(&"foo".to_string(), Projection::All)
.await
.unwrap()
.is_none());
txn3.commit().await.unwrap();
}
}
Expand Down Expand Up @@ -235,7 +245,7 @@ mod tests {
});

let key = 0.to_string();
let entry = txn1.get(&key).await.unwrap().unwrap();
let entry = txn1.get(&key, Projection::All).await.unwrap().unwrap();

assert_eq!(entry.get().vstring, 0.to_string());
assert_eq!(entry.get().vu32, Some(0));
Expand Down

0 comments on commit d10ffa5

Please sign in to comment.