Skip to content

Commit

Permalink
fix: Builder::finish on marco
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Aug 1, 2024
1 parent de58004 commit 5176194
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 11 deletions.
5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,14 +404,15 @@ where
pub async fn package(
mut self,
batch_size: usize,
) -> Result<impl Stream<Item = Result<R::Columns, ParquetError>> + 'scan, WriteError<R>> {
) -> Result<impl Stream<Item = Result<R::Columns, ParquetError>> + 'scan, DataBaseError<R>>
{
self.streams.push(
self.schema
.mutable
.scan((self.lower, self.upper), self.ts)
.into(),
);
for immutable in &self.schema.immutables {
for (_, immutable) in &self.schema.immutables {
self.streams.push(
immutable
.scan((self.lower, self.upper), self.ts, self.projection.clone())
Expand Down
9 changes: 7 additions & 2 deletions src/morseldb_marco/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,12 +575,12 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
0 #(#builder_size_fields)*
}

fn finish(&mut self) -> #struct_arrays_name {
fn finish(&mut self, indices: Option<&[usize]>) -> #struct_arrays_name {
#(#builder_finish_fields)*

let _null = ::std::sync::Arc::new(::arrow::array::BooleanArray::new(self._null.finish(), None));
let _ts = ::std::sync::Arc::new(self._ts.finish());
let record_batch = ::arrow::record_batch::RecordBatch::try_new(
let mut record_batch = ::arrow::record_batch::RecordBatch::try_new(
::std::sync::Arc::clone(
<<#struct_arrays_name as ::morseldb::inmem::immutable::ArrowArrays>::Record as ::morseldb::record::Record>::arrow_schema(),
),
Expand All @@ -592,6 +592,11 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream {
],
)
.expect("create record batch must be successful");
if let Some(indices) = indices {
record_batch = record_batch
.project(indices)
.expect("projection indices must be successful");
}

#struct_arrays_name {
#(#field_names)*
Expand Down
42 changes: 35 additions & 7 deletions src/stream/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,68 +85,96 @@ mod tests {
use std::collections::Bound;

use futures_util::StreamExt;
use tempfile::TempDir;

use crate::{
executor::tokio::TokioExecutor,
fs::FileProvider,
inmem::{
immutable::{tests::TestImmutableArrays, ArrowArrays},
mutable::Mutable,
},
stream::{merge::MergeStream, package::PackageStream},
tests::Test,
wal::log::LogType,
DbOption,
};

#[tokio::test]
async fn iter() {
let m1 = Mutable::<Test>::new();
let temp_dir = TempDir::new().unwrap();
let option = DbOption::from(temp_dir.path());
TokioExecutor::create_dir_all(option.wal_dir_path())
.await
.unwrap();

let m1 = Mutable::<Test, TokioExecutor>::new(&option).await.unwrap();
m1.insert(
LogType::Full,
Test {
vstring: "a".into(),
vu32: 0,
vbool: Some(true),
},
0.into(),
);
)
.await
.unwrap();
m1.insert(
LogType::Full,
Test {
vstring: "b".into(),
vu32: 1,
vbool: Some(true),
},
1.into(),
);
)
.await
.unwrap();
m1.insert(
LogType::Full,
Test {
vstring: "c".into(),
vu32: 2,
vbool: Some(true),
},
2.into(),
);
)
.await
.unwrap();
m1.insert(
LogType::Full,
Test {
vstring: "d".into(),
vu32: 3,
vbool: Some(true),
},
3.into(),
);
)
.await
.unwrap();
m1.insert(
LogType::Full,
Test {
vstring: "e".into(),
vu32: 4,
vbool: Some(true),
},
4.into(),
);
)
.await
.unwrap();
m1.insert(
LogType::Full,
Test {
vstring: "f".into(),
vu32: 5,
vbool: Some(true),
},
5.into(),
);
)
.await
.unwrap();

let merge = MergeStream::<Test, TokioExecutor>::from_vec(vec![m1
.scan((Bound::Unbounded, Bound::Unbounded), 6.into())
Expand Down

0 comments on commit 5176194

Please sign in to comment.