Skip to content

Commit

Permalink
fix: SsTableScan returns only the first element
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Jul 18, 2024
1 parent 4eb9c44 commit 24f052b
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 18 deletions.
8 changes: 5 additions & 3 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,20 +218,21 @@ where

streams.push(ScanStream::SsTable {
inner: SsTable::open(file)
.scan((Bound::Unbounded, Bound::Unbounded), u32::MAX.into())
.scan((Bound::Unbounded, Bound::Unbounded), u32::MAX.into(), None)
.await
.map_err(CompactionError::Parquet)?,
});
}
} else {
let (lower, upper) = Self::full_scope(&mut meet_scopes_l)?;
let (lower, upper) = Self::full_scope(&meet_scopes_l)?;
let level_scan_l = LevelStream::new(
version,
level,
start_l,
end_l,
(Bound::Included(lower), Bound::Included(upper)),
u32::MAX.into(),
None,
)
.ok_or(CompactionError::EmptyLevel)?;

Expand All @@ -240,14 +241,15 @@ where
});
}
// Next Level
let (lower, upper) = Self::full_scope(&mut meet_scopes_ll)?;
let (lower, upper) = Self::full_scope(&meet_scopes_ll)?;
let level_scan_ll = LevelStream::new(
version,
level + 1,
start_ll,
end_ll,
(Bound::Included(lower), Bound::Included(upper)),
u32::MAX.into(),
None,
)
.ok_or(CompactionError::EmptyLevel)?;

Expand Down
28 changes: 18 additions & 10 deletions src/ondisk/sstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,33 +75,41 @@ where

async fn into_parquet_builder(
self,
limit: usize,
limit: Option<usize>,
) -> parquet::errors::Result<ArrowReaderBuilder<AsyncReader<Compat<E::File>>>> {
Ok(ParquetRecordBatchStreamBuilder::new_with_options(
let mut builder = ParquetRecordBatchStreamBuilder::new_with_options(
self.file.compat(),
ArrowReaderOptions::default().with_page_index(true),
)
.await?
.with_limit(limit))
.await?;
if let Some(limit) = limit {
builder = builder.with_limit(limit);
}
Ok(builder)
}

pub(crate) async fn get(
self,
key: &TimestampedRef<R::Key>,
) -> parquet::errors::Result<Option<RecordBatchEntry<R>>> {
self.scan((Bound::Included(key.value()), Bound::Unbounded), key.ts())
.await?
.next()
.await
.transpose()
self.scan(
(Bound::Included(key.value()), Bound::Unbounded),
key.ts(),
Some(1),
)
.await?
.next()
.await
.transpose()
}

pub(crate) async fn scan<'scan>(
self,
range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>),
ts: Timestamp,
limit: Option<usize>,
) -> Result<SsTableScan<R, E>, parquet::errors::ParquetError> {
let builder = self.into_parquet_builder(1).await?;
let builder = self.into_parquet_builder(limit).await?;

let schema_descriptor = builder.metadata().file_metadata().schema_descr();
let filter = unsafe { get_range_filter::<R>(schema_descriptor, range, ts) };
Expand Down
19 changes: 15 additions & 4 deletions src/stream/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ where
ts: Timestamp,
option: Arc<DbOption>,
gens: VecDeque<FileId>,
limit: Option<usize>,
status: FutureStatus<'level, R, E>,
}

Expand All @@ -59,6 +60,7 @@ where
end: usize,
range: (Bound<&'level R::Key>, Bound<&'level R::Key>),
ts: Timestamp,
limit: Option<usize>,
) -> Option<Self> {
let (lower, upper) = range;
let mut gens: VecDeque<FileId> = version.level_slice[level][start..end + 1]
Expand All @@ -74,6 +76,7 @@ where
ts,
option: version.option().clone(),
gens,
limit,
status,
})
}
Expand Down Expand Up @@ -105,13 +108,21 @@ where
continue;
}
},
poll => poll,
Poll::Ready(Some(result)) => {
if let Some(limit) = &mut self.limit {
*limit -= 1;
}
Poll::Ready(Some(result))
}
Poll::Pending => Poll::Pending,
},
FutureStatus::OpenFile(file_future) => match Pin::new(file_future).poll(cx) {
Poll::Ready(Ok(file)) => {
self.status = FutureStatus::LoadStream(Box::pin(
SsTable::open(file).scan((self.lower, self.upper), self.ts),
));
self.status = FutureStatus::LoadStream(Box::pin(SsTable::open(file).scan(
(self.lower, self.upper),
self.ts,
self.limit,
)));
continue;
}
Poll::Ready(Err(err)) => Poll::Ready(Some(Err(ParquetError::from(err)))),
Expand Down
6 changes: 5 additions & 1 deletion src/version/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ where
iters: &mut Vec<ScanStream<'iters, R, E>>,
range: (Bound<&'iters R::Key>, Bound<&'iters R::Key>),
ts: Timestamp,
limit: Option<usize>,
) -> Result<(), VersionError<R>> {
for scope in self.level_slice[0].iter() {
let file = E::open(self.option.table_path(&scope.gen))
Expand All @@ -148,7 +149,10 @@ where
let table = SsTable::open(file);

iters.push(ScanStream::SsTable {
inner: table.scan(range, ts).await.map_err(VersionError::Parquet)?,
inner: table
.scan(range, ts, limit)
.await
.map_err(VersionError::Parquet)?,
})
}
for scopes in self.level_slice[1..].iter() {
Expand Down

0 comments on commit 24f052b

Please sign in to comment.