From 24f052bdf0509c5773cccf3238009abe2984d754 Mon Sep 17 00:00:00 2001 From: Kould Date: Thu, 18 Jul 2024 17:35:35 +0800 Subject: [PATCH] fix: `SsTableScan` returns only the first element --- src/compaction/mod.rs | 8 +++++--- src/ondisk/sstable.rs | 28 ++++++++++++++++++---------- src/stream/level.rs | 19 +++++++++++++++---- src/version/mod.rs | 6 +++++- 4 files changed, 43 insertions(+), 18 deletions(-) diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 5fc4d437..7c58ddec 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -218,13 +218,13 @@ where streams.push(ScanStream::SsTable { inner: SsTable::open(file) - .scan((Bound::Unbounded, Bound::Unbounded), u32::MAX.into()) + .scan((Bound::Unbounded, Bound::Unbounded), u32::MAX.into(), None) .await .map_err(CompactionError::Parquet)?, }); } } else { - let (lower, upper) = Self::full_scope(&mut meet_scopes_l)?; + let (lower, upper) = Self::full_scope(&meet_scopes_l)?; let level_scan_l = LevelStream::new( version, level, @@ -232,6 +232,7 @@ where end_l, (Bound::Included(lower), Bound::Included(upper)), u32::MAX.into(), + None, ) .ok_or(CompactionError::EmptyLevel)?; @@ -240,7 +241,7 @@ where }); } // Next Level - let (lower, upper) = Self::full_scope(&mut meet_scopes_ll)?; + let (lower, upper) = Self::full_scope(&meet_scopes_ll)?; let level_scan_ll = LevelStream::new( version, level + 1, @@ -248,6 +249,7 @@ where end_ll, (Bound::Included(lower), Bound::Included(upper)), u32::MAX.into(), + None, ) .ok_or(CompactionError::EmptyLevel)?; diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 59b2cb83..78ffde81 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -75,33 +75,41 @@ where async fn into_parquet_builder( self, - limit: usize, + limit: Option, ) -> parquet::errors::Result>>> { - Ok(ParquetRecordBatchStreamBuilder::new_with_options( + let mut builder = ParquetRecordBatchStreamBuilder::new_with_options( self.file.compat(), ArrowReaderOptions::default().with_page_index(true), ) - .await? - .with_limit(limit)) + .await?; + if let Some(limit) = limit { + builder = builder.with_limit(limit); + } + Ok(builder) } pub(crate) async fn get( self, key: &TimestampedRef, ) -> parquet::errors::Result>> { - self.scan((Bound::Included(key.value()), Bound::Unbounded), key.ts()) - .await? - .next() - .await - .transpose() + self.scan( + (Bound::Included(key.value()), Bound::Unbounded), + key.ts(), + Some(1), + ) + .await? + .next() + .await + .transpose() } pub(crate) async fn scan<'scan>( self, range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>), ts: Timestamp, + limit: Option, ) -> Result, parquet::errors::ParquetError> { - let builder = self.into_parquet_builder(1).await?; + let builder = self.into_parquet_builder(limit).await?; let schema_descriptor = builder.metadata().file_metadata().schema_descr(); let filter = unsafe { get_range_filter::(schema_descriptor, range, ts) }; diff --git a/src/stream/level.rs b/src/stream/level.rs index 50de6284..a05ab981 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -43,6 +43,7 @@ where ts: Timestamp, option: Arc, gens: VecDeque, + limit: Option, status: FutureStatus<'level, R, E>, } @@ -59,6 +60,7 @@ where end: usize, range: (Bound<&'level R::Key>, Bound<&'level R::Key>), ts: Timestamp, + limit: Option, ) -> Option { let (lower, upper) = range; let mut gens: VecDeque = version.level_slice[level][start..end + 1] @@ -74,6 +76,7 @@ where ts, option: version.option().clone(), gens, + limit, status, }) } @@ -105,13 +108,21 @@ where continue; } }, - poll => poll, + Poll::Ready(Some(result)) => { + if let Some(limit) = &mut self.limit { + *limit -= 1; + } + Poll::Ready(Some(result)) + } + Poll::Pending => Poll::Pending, }, FutureStatus::OpenFile(file_future) => match Pin::new(file_future).poll(cx) { Poll::Ready(Ok(file)) => { - self.status = FutureStatus::LoadStream(Box::pin( - SsTable::open(file).scan((self.lower, self.upper), self.ts), - )); + self.status = FutureStatus::LoadStream(Box::pin(SsTable::open(file).scan( + (self.lower, self.upper), + self.ts, + self.limit, + ))); continue; } Poll::Ready(Err(err)) => Poll::Ready(Some(Err(ParquetError::from(err)))), diff --git a/src/version/mod.rs b/src/version/mod.rs index a22066c6..f4b9fc45 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -140,6 +140,7 @@ where iters: &mut Vec>, range: (Bound<&'iters R::Key>, Bound<&'iters R::Key>), ts: Timestamp, + limit: Option, ) -> Result<(), VersionError> { for scope in self.level_slice[0].iter() { let file = E::open(self.option.table_path(&scope.gen)) @@ -148,7 +149,10 @@ where let table = SsTable::open(file); iters.push(ScanStream::SsTable { - inner: table.scan(range, ts).await.map_err(VersionError::Parquet)?, + inner: table + .scan(range, ts, limit) + .await + .map_err(VersionError::Parquet)?, }) } for scopes in self.level_slice[1..].iter() {