feat: impl PackageStream for packaging data into RecordBatch
KKould committed Aug 1, 2024
1 parent cefc22c commit de58004
Showing 6 changed files with 220 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/compaction/mod.rs
@@ -389,7 +389,7 @@ where
debug_assert!(max.is_some());

let gen = Ulid::new();
let columns = builder.finish();
let columns = builder.finish(None);
let mut writer = AsyncArrowWriter::try_new(
FP::open(option.table_path(&gen)).await?.compat(),
R::arrow_schema().clone(),
13 changes: 9 additions & 4 deletions src/inmem/immutable.rs
@@ -42,7 +42,7 @@ where

fn written_size(&self) -> usize;

fn finish(&mut self) -> S;
fn finish(&mut self, indices: Option<&[usize]>) -> S;
}

pub(crate) struct Immutable<A>
@@ -70,7 +70,7 @@ where
index.insert(key, offset as u32);
}

let data = builder.finish();
let data = builder.finish(None);

Self { data, index }
}
@@ -294,13 +294,13 @@ pub(crate) mod tests {
+ mem::size_of_val(self.vobool.values_slice())
}

fn finish(&mut self) -> TestImmutableArrays {
fn finish(&mut self, indices: Option<&[usize]>) -> TestImmutableArrays {
let vstring = Arc::new(self.vstring.finish());
let vu32 = Arc::new(self.vu32.finish());
let vbool = Arc::new(self.vobool.finish());
let _null = Arc::new(BooleanArray::new(self._null.finish(), None));
let _ts = Arc::new(self._ts.finish());
let record_batch = RecordBatch::try_new(
let mut record_batch = RecordBatch::try_new(
Arc::clone(
<<TestImmutableArrays as ArrowArrays>::Record as Record>::arrow_schema(),
),
@@ -313,6 +313,11 @@ pub(crate) mod tests {
],
)
.expect("create record batch must be successful");
if let Some(indices) = indices {
record_batch = record_batch
.project(indices)
.expect("projection indices must be successful");
}

TestImmutableArrays {
vstring,
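The `indices` argument threaded through `Builder::finish` above is applied with Arrow's `RecordBatch::project`, which keeps only the listed columns, exactly as the test builder does. A minimal standalone sketch of that Arrow call (the schema, column names, and values here are made up for illustration, using the `arrow` crate directly):

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    // Build a three-column batch, then keep only columns 0 and 2,
    // mirroring what `finish(Some(&[0, 2]))` would do to the built batch.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("score", DataType::Int32, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef,
            Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef,
        ],
    )
    .unwrap();
    let projected = batch.project(&[0, 2]).unwrap();
    assert_eq!(projected.num_columns(), 2); // same rows, two columns
}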
42 changes: 40 additions & 2 deletions src/lib.rs
@@ -46,7 +46,7 @@ use crate::{
executor::Executor,
fs::{FileId, FileType},
serdes::Decode,
stream::{merge::MergeStream, Entry, ScanStream},
stream::{merge::MergeStream, package::PackageStream, Entry, ScanStream},
timestamp::Timestamped,
version::{cleaner::Cleaner, set::VersionSet, Version, VersionError},
wal::{log::LogType, RecoverError, WalFile},
@@ -321,6 +321,7 @@
streams: Vec<ScanStream<'scan, R, FP>>,

limit: Option<usize>,
projection_indices: Option<Vec<usize>>,
projection: ProjectionMask,
}

@@ -344,6 +345,7 @@
version,
streams,
limit: None,
projection_indices: None,
projection: ProjectionMask::all(),
}
}
@@ -360,11 +362,12 @@
projection.extend([0, 1, R::primary_key_index()]);
let mask = ProjectionMask::roots(
&arrow_to_parquet_schema(R::arrow_schema()).unwrap(),
projection,
projection.clone(),
);

Self {
projection: mask,
projection_indices: Some(projection),
..self
}
}
@@ -397,6 +400,41 @@

Ok(MergeStream::from_vec(self.streams).await?)
}

pub async fn package(
mut self,
batch_size: usize,
) -> Result<impl Stream<Item = Result<R::Columns, ParquetError>> + 'scan, WriteError<R>> {
self.streams.push(
self.schema
.mutable
.scan((self.lower, self.upper), self.ts)
.into(),
);
for immutable in &self.schema.immutables {
self.streams.push(
immutable
.scan((self.lower, self.upper), self.ts, self.projection.clone())
.into(),
);
}
self.version
.streams(
&mut self.streams,
(self.lower, self.upper),
self.ts,
self.limit,
self.projection,
)
.await?;
let merge_stream = MergeStream::from_vec(self.streams).await?;

Ok(PackageStream::new(
batch_size,
merge_stream,
self.projection_indices,
))
}
}

#[derive(Debug, Error)]
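For orientation, a sketch of how the new `package` method is meant to be consumed. This is not part of the commit: `scan` stands in for a `Scan` obtained from the surrounding crate's API, `projection` is assumed to take the column indices seen in the hunk above, and `StreamExt` comes from `futures_util`:

use futures_util::StreamExt;

// `scan` is assumed to already be a `Scan<'_, R, FP>` like the one above.
let mut batches = scan
    .projection(vec![1, 2]) // optional: narrow to the listed user columns
    .package(8192)          // target roughly 8192 rows per emitted batch
    .await?;
while let Some(columns) = batches.next().await {
    let columns = columns?; // R::Columns, an Arrow-backed column batch
    // hand `columns` to a writer, another operator, etc.
}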
2 changes: 1 addition & 1 deletion src/record/str.rs
@@ -157,7 +157,7 @@ impl Builder<StringColumns> for StringColumnsBuilder {
+ self.string.values_slice().len()
}

fn finish(&mut self) -> StringColumns {
fn finish(&mut self, _: Option<&[usize]>) -> StringColumns {
let _null = Arc::new(BooleanArray::new(self._null.finish(), None));
let _ts = Arc::new(self._ts.finish());
let string = Arc::new(self.string.finish());
1 change: 1 addition & 0 deletions src/stream/mod.rs
@@ -1,5 +1,6 @@
pub(crate) mod level;
pub(crate) mod merge;
pub(crate) mod package;
pub(crate) mod record_batch;

use std::{
168 changes: 168 additions & 0 deletions src/stream/package.rs
@@ -0,0 +1,168 @@
use std::{
pin::Pin,
task::{Context, Poll},
};

use futures_core::Stream;
use pin_project_lite::pin_project;

use crate::{
fs::FileProvider,
inmem::immutable::{ArrowArrays, Builder},
record::Record,
stream::merge::MergeStream,
};

pin_project! {
pub struct PackageStream<'package, R, FP>
where
R: Record,
FP: FileProvider,
{
row_count: usize,
batch_size: usize,
inner: MergeStream<'package, R, FP>,
builder: <R::Columns as ArrowArrays>::Builder,
projection_indices: Option<Vec<usize>>,
}
}

impl<'package, R, FP> PackageStream<'package, R, FP>
where
R: Record,
FP: FileProvider + 'package,
{
pub(crate) fn new(
batch_size: usize,
merge: MergeStream<'package, R, FP>,
projection_indices: Option<Vec<usize>>,
) -> Self {
Self {
row_count: 0,
batch_size,
inner: merge,
builder: R::Columns::builder(batch_size),
projection_indices,
}
}
}

impl<'package, R, FP> Stream for PackageStream<'package, R, FP>
where
R: Record,
FP: FileProvider + 'package,
{
type Item = Result<R::Columns, parquet::errors::ParquetError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut project = self.project();

while project.row_count <= project.batch_size {
match Pin::new(&mut project.inner).poll_next(cx) {
Poll::Ready(Some(Ok(entry))) => {
if let Some(record) = entry.value() {
                            // entries whose value is `None` are deletes; skip them
project.builder.push(entry.key(), Some(record));
*project.row_count += 1;
}
}
Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
Poll::Ready(None) => break,
Poll::Pending => return Poll::Pending,
}
}
Poll::Ready((*project.row_count != 0).then(|| {
*project.row_count = 0;
Ok(project
.builder
.finish(project.projection_indices.as_ref().map(Vec::as_slice)))
}))
}
}

#[cfg(test)]
mod tests {
use std::collections::Bound;

use futures_util::StreamExt;

use crate::{
executor::tokio::TokioExecutor,
inmem::{
immutable::{tests::TestImmutableArrays, ArrowArrays},
mutable::Mutable,
},
stream::{merge::MergeStream, package::PackageStream},
tests::Test,
};

#[tokio::test]
async fn iter() {
let m1 = Mutable::<Test>::new();
m1.insert(
Test {
vstring: "a".into(),
vu32: 0,
vbool: Some(true),
},
0.into(),
);
m1.insert(
Test {
vstring: "b".into(),
vu32: 1,
vbool: Some(true),
},
1.into(),
);
m1.insert(
Test {
vstring: "c".into(),
vu32: 2,
vbool: Some(true),
},
2.into(),
);
m1.insert(
Test {
vstring: "d".into(),
vu32: 3,
vbool: Some(true),
},
3.into(),
);
m1.insert(
Test {
vstring: "e".into(),
vu32: 4,
vbool: Some(true),
},
4.into(),
);
m1.insert(
Test {
vstring: "f".into(),
vu32: 5,
vbool: Some(true),
},
5.into(),
);

let merge = MergeStream::<Test, TokioExecutor>::from_vec(vec![m1
.scan((Bound::Unbounded, Bound::Unbounded), 6.into())
.into()])
.await
.unwrap();
let projection_indices = vec![0, 1, 2, 3];

let mut package = PackageStream {
row_count: 0,
batch_size: 8192,
inner: merge,
builder: TestImmutableArrays::builder(8192),
projection_indices: Some(projection_indices),
};

dbg!(package.next().await);
}
}
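
As a follow-up to the test above, a hedged sketch of driving the stream to completion: once roughly `batch_size` rows have been pushed into the builder, `poll_next` yields one finished `R::Columns` and resets the row counter, so a small `batch_size` splits the six rows into more than one batch. Here `merge` is assumed to be rebuilt exactly as in the test:

let mut package = PackageStream {
    row_count: 0,
    batch_size: 2,
    inner: merge,
    builder: TestImmutableArrays::builder(2),
    projection_indices: None,
};
while let Some(batch) = package.next().await {
    // Each item is a fully built TestImmutableArrays; errors from the
    // underlying MergeStream are propagated through the Result.
    let _arrays = batch.unwrap();
}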
