Skip to content

Commit

Permalink
chore: Transaction::get supplementary projection parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Jul 24, 2024
1 parent b149cee commit 22862e5
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
24 changes: 15 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,10 @@ where
&'get self,
key: &'get R::Key,
ts: Timestamp,
projection_mask: Option<Vec<usize>>,
) -> Result<Option<Entry<'get, R>>, ParquetError> {
self.scan(Bound::Included(key), Bound::Unbounded, ts)
.project(projection_mask)
.take()
.await?
.next()
Expand Down Expand Up @@ -271,16 +273,20 @@ where
}
}

pub fn project(self, mut projection: Vec<usize>) -> Self {
// skip two columns: _null and _ts
for p in &mut projection {
*p += 2;
}
pub fn project(self, projection: Option<Vec<usize>>) -> Self {
let mask = projection
.map(|mut projection| {
// skip two columns: _null and _ts
for p in &mut projection {
*p += 2;
}

let mask = ProjectionMask::roots(
&arrow_to_parquet_schema(R::arrow_schema()).unwrap(),
projection,
);
ProjectionMask::roots(
&arrow_to_parquet_schema(R::arrow_schema()).unwrap(),
projection,
)
})
.unwrap_or_else(ProjectionMask::all);

Self {
projection: mask,
Expand Down
9 changes: 5 additions & 4 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ where
pub async fn get<'get>(
&'get self,
key: &'get R::Key,
projection_mask: Option<Vec<usize>>,
) -> Result<Option<TransactionEntry<'get, R>>, ParquetError> {
Ok(match self.local.get(key).and_then(|v| v.as_ref()) {
Some(v) => Some(TransactionEntry::Local(v.as_record_ref())),
None => self
.share
.get(key, self.ts)
.get(key, self.ts, projection_mask)
.await?
.map(TransactionEntry::Stream),
})
Expand Down Expand Up @@ -168,15 +169,15 @@ mod tests {
txn1.set("foo".to_string());

let txn2 = db.transaction().await;
dbg!(txn2.get(&"foo".to_string()).await.unwrap().is_none());
dbg!(txn2.get(&"foo".to_string(), None).await.unwrap().is_none());

txn1.commit().await.unwrap();
txn2.commit().await.unwrap();
}

{
let txn3 = db.transaction().await;
dbg!(txn3.get(&"foo".to_string()).await.unwrap().is_none());
dbg!(txn3.get(&"foo".to_string(), None).await.unwrap().is_none());
txn3.commit().await.unwrap();
}
}
Expand Down Expand Up @@ -235,7 +236,7 @@ mod tests {
});

let key = 0.to_string();
let entry = txn1.get(&key).await.unwrap().unwrap();
let entry = txn1.get(&key, None).await.unwrap().unwrap();

assert_eq!(entry.get().vstring, 0.to_string());
assert_eq!(entry.get().vu32, Some(0));
Expand Down

0 comments on commit 22862e5

Please sign in to comment.