From de5800444474f31fffde964fae04c62b867906a2 Mon Sep 17 00:00:00 2001 From: Kould Date: Wed, 31 Jul 2024 17:13:46 +0800 Subject: [PATCH] feat: impl `PackageStream` for packaging data into RecordBatch --- src/compaction/mod.rs | 2 +- src/inmem/immutable.rs | 13 +++- src/lib.rs | 42 ++++++++++- src/record/str.rs | 2 +- src/stream/mod.rs | 1 + src/stream/package.rs | 168 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 220 insertions(+), 8 deletions(-) create mode 100644 src/stream/package.rs diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 891886bb..1d85113b 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -389,7 +389,7 @@ where debug_assert!(max.is_some()); let gen = Ulid::new(); - let columns = builder.finish(); + let columns = builder.finish(None); let mut writer = AsyncArrowWriter::try_new( FP::open(option.table_path(&gen)).await?.compat(), R::arrow_schema().clone(), diff --git a/src/inmem/immutable.rs b/src/inmem/immutable.rs index 4fddf32f..50b7ce35 100644 --- a/src/inmem/immutable.rs +++ b/src/inmem/immutable.rs @@ -42,7 +42,7 @@ where fn written_size(&self) -> usize; - fn finish(&mut self) -> S; + fn finish(&mut self, indices: Option<&[usize]>) -> S; } pub(crate) struct Immutable @@ -70,7 +70,7 @@ where index.insert(key, offset as u32); } - let data = builder.finish(); + let data = builder.finish(None); Self { data, index } } @@ -294,13 +294,13 @@ pub(crate) mod tests { + mem::size_of_val(self.vobool.values_slice()) } - fn finish(&mut self) -> TestImmutableArrays { + fn finish(&mut self, indices: Option<&[usize]>) -> TestImmutableArrays { let vstring = Arc::new(self.vstring.finish()); let vu32 = Arc::new(self.vu32.finish()); let vbool = Arc::new(self.vobool.finish()); let _null = Arc::new(BooleanArray::new(self._null.finish(), None)); let _ts = Arc::new(self._ts.finish()); - let record_batch = RecordBatch::try_new( + let mut record_batch = RecordBatch::try_new( Arc::clone( <::Record as Record>::arrow_schema(), ), @@ -313,6 +313,11 @@ pub(crate) mod tests { ], ) .expect("create record batch must be successful"); + if let Some(indices) = indices { + record_batch = record_batch + .project(indices) + .expect("projection indices must be successful"); + } TestImmutableArrays { vstring, diff --git a/src/lib.rs b/src/lib.rs index eac63c69..48955e68 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,7 +46,7 @@ use crate::{ executor::Executor, fs::{FileId, FileType}, serdes::Decode, - stream::{merge::MergeStream, Entry, ScanStream}, + stream::{merge::MergeStream, package::PackageStream, Entry, ScanStream}, timestamp::Timestamped, version::{cleaner::Cleaner, set::VersionSet, Version, VersionError}, wal::{log::LogType, RecoverError, WalFile}, @@ -321,6 +321,7 @@ where streams: Vec>, limit: Option, + projection_indices: Option>, projection: ProjectionMask, } @@ -344,6 +345,7 @@ where version, streams, limit: None, + projection_indices: None, projection: ProjectionMask::all(), } } @@ -360,11 +362,12 @@ where projection.extend([0, 1, R::primary_key_index()]); let mask = ProjectionMask::roots( &arrow_to_parquet_schema(R::arrow_schema()).unwrap(), - projection, + projection.clone(), ); Self { projection: mask, + projection_indices: Some(projection), ..self } } @@ -397,6 +400,41 @@ where Ok(MergeStream::from_vec(self.streams).await?) } + + pub async fn package( + mut self, + batch_size: usize, + ) -> Result> + 'scan, WriteError> { + self.streams.push( + self.schema + .mutable + .scan((self.lower, self.upper), self.ts) + .into(), + ); + for immutable in &self.schema.immutables { + self.streams.push( + immutable + .scan((self.lower, self.upper), self.ts, self.projection.clone()) + .into(), + ); + } + self.version + .streams( + &mut self.streams, + (self.lower, self.upper), + self.ts, + self.limit, + self.projection, + ) + .await?; + let merge_stream = MergeStream::from_vec(self.streams).await?; + + Ok(PackageStream::new( + batch_size, + merge_stream, + self.projection_indices, + )) + } } #[derive(Debug, Error)] diff --git a/src/record/str.rs b/src/record/str.rs index 24c13fe4..f3faa2b5 100644 --- a/src/record/str.rs +++ b/src/record/str.rs @@ -157,7 +157,7 @@ impl Builder for StringColumnsBuilder { + self.string.values_slice().len() } - fn finish(&mut self) -> StringColumns { + fn finish(&mut self, _: Option<&[usize]>) -> StringColumns { let _null = Arc::new(BooleanArray::new(self._null.finish(), None)); let _ts = Arc::new(self._ts.finish()); let string = Arc::new(self.string.finish()); diff --git a/src/stream/mod.rs b/src/stream/mod.rs index b5ed8f2d..219534cd 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -1,5 +1,6 @@ pub(crate) mod level; pub(crate) mod merge; +pub(crate) mod package; pub(crate) mod record_batch; use std::{ diff --git a/src/stream/package.rs b/src/stream/package.rs new file mode 100644 index 00000000..79fa6540 --- /dev/null +++ b/src/stream/package.rs @@ -0,0 +1,168 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use futures_core::Stream; +use pin_project_lite::pin_project; + +use crate::{ + fs::FileProvider, + inmem::immutable::{ArrowArrays, Builder}, + record::Record, + stream::merge::MergeStream, +}; + +pin_project! { + pub struct PackageStream<'package, R, FP> + where + R: Record, + FP: FileProvider, + { + row_count: usize, + batch_size: usize, + inner: MergeStream<'package, R, FP>, + builder: ::Builder, + projection_indices: Option>, + } +} + +impl<'package, R, FP> PackageStream<'package, R, FP> +where + R: Record, + FP: FileProvider + 'package, +{ + pub(crate) fn new( + batch_size: usize, + merge: MergeStream<'package, R, FP>, + projection_indices: Option>, + ) -> Self { + Self { + row_count: 0, + batch_size, + inner: merge, + builder: R::Columns::builder(batch_size), + projection_indices, + } + } +} + +impl<'package, R, FP> Stream for PackageStream<'package, R, FP> +where + R: Record, + FP: FileProvider + 'package, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut project = self.project(); + + while project.row_count <= project.batch_size { + match Pin::new(&mut project.inner).poll_next(cx) { + Poll::Ready(Some(Ok(entry))) => { + if let Some(record) = entry.value() { + // filter null + project.builder.push(entry.key(), Some(record)); + *project.row_count += 1; + } + } + Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))), + Poll::Ready(None) => break, + Poll::Pending => return Poll::Pending, + } + } + Poll::Ready((*project.row_count != 0).then(|| { + *project.row_count = 0; + Ok(project + .builder + .finish(project.projection_indices.as_ref().map(Vec::as_slice))) + })) + } +} + +#[cfg(test)] +mod tests { + use std::collections::Bound; + + use futures_util::StreamExt; + + use crate::{ + executor::tokio::TokioExecutor, + inmem::{ + immutable::{tests::TestImmutableArrays, ArrowArrays}, + mutable::Mutable, + }, + stream::{merge::MergeStream, package::PackageStream}, + tests::Test, + }; + + #[tokio::test] + async fn iter() { + let m1 = Mutable::::new(); + m1.insert( + Test { + vstring: "a".into(), + vu32: 0, + vbool: Some(true), + }, + 0.into(), + ); + m1.insert( + Test { + vstring: "b".into(), + vu32: 1, + vbool: Some(true), + }, + 1.into(), + ); + m1.insert( + Test { + vstring: "c".into(), + vu32: 2, + vbool: Some(true), + }, + 2.into(), + ); + m1.insert( + Test { + vstring: "d".into(), + vu32: 3, + vbool: Some(true), + }, + 3.into(), + ); + m1.insert( + Test { + vstring: "e".into(), + vu32: 4, + vbool: Some(true), + }, + 4.into(), + ); + m1.insert( + Test { + vstring: "f".into(), + vu32: 5, + vbool: Some(true), + }, + 5.into(), + ); + + let merge = MergeStream::::from_vec(vec![m1 + .scan((Bound::Unbounded, Bound::Unbounded), 6.into()) + .into()]) + .await + .unwrap(); + let projection_indices = vec![0, 1, 2, 3]; + + let mut package = PackageStream { + row_count: 0, + batch_size: 8192, + inner: merge, + builder: TestImmutableArrays::builder(8192), + projection_indices: Some(projection_indices), + }; + + dbg!(package.next().await); + } +}