diff --git a/src/lib.rs b/src/lib.rs index 48955e68..4f6997d1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -404,14 +404,15 @@ where pub async fn package( mut self, batch_size: usize, - ) -> Result> + 'scan, WriteError> { + ) -> Result> + 'scan, DataBaseError> + { self.streams.push( self.schema .mutable .scan((self.lower, self.upper), self.ts) .into(), ); - for immutable in &self.schema.immutables { + for (_, immutable) in &self.schema.immutables { self.streams.push( immutable .scan((self.lower, self.upper), self.ts, self.projection.clone()) diff --git a/src/morseldb_marco/src/lib.rs b/src/morseldb_marco/src/lib.rs index 55e2c461..34cf5a2d 100644 --- a/src/morseldb_marco/src/lib.rs +++ b/src/morseldb_marco/src/lib.rs @@ -575,12 +575,12 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream { 0 #(#builder_size_fields)* } - fn finish(&mut self) -> #struct_arrays_name { + fn finish(&mut self, indices: Option<&[usize]>) -> #struct_arrays_name { #(#builder_finish_fields)* let _null = ::std::sync::Arc::new(::arrow::array::BooleanArray::new(self._null.finish(), None)); let _ts = ::std::sync::Arc::new(self._ts.finish()); - let record_batch = ::arrow::record_batch::RecordBatch::try_new( + let mut record_batch = ::arrow::record_batch::RecordBatch::try_new( ::std::sync::Arc::clone( <<#struct_arrays_name as ::morseldb::inmem::immutable::ArrowArrays>::Record as ::morseldb::record::Record>::arrow_schema(), ), @@ -592,6 +592,11 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream { ], ) .expect("create record batch must be successful"); + if let Some(indices) = indices { + record_batch = record_batch + .project(indices) + .expect("projection indices must be successful"); + } #struct_arrays_name { #(#field_names)* diff --git a/src/stream/package.rs b/src/stream/package.rs index 79fa6540..2f413a92 100644 --- a/src/stream/package.rs +++ b/src/stream/package.rs @@ -85,68 +85,96 @@ mod tests { use std::collections::Bound; use futures_util::StreamExt; + use tempfile::TempDir; use crate::{ executor::tokio::TokioExecutor, + fs::FileProvider, inmem::{ immutable::{tests::TestImmutableArrays, ArrowArrays}, mutable::Mutable, }, stream::{merge::MergeStream, package::PackageStream}, tests::Test, + wal::log::LogType, + DbOption, }; #[tokio::test] async fn iter() { - let m1 = Mutable::::new(); + let temp_dir = TempDir::new().unwrap(); + let option = DbOption::from(temp_dir.path()); + TokioExecutor::create_dir_all(option.wal_dir_path()) + .await + .unwrap(); + + let m1 = Mutable::::new(&option).await.unwrap(); m1.insert( + LogType::Full, Test { vstring: "a".into(), vu32: 0, vbool: Some(true), }, 0.into(), - ); + ) + .await + .unwrap(); m1.insert( + LogType::Full, Test { vstring: "b".into(), vu32: 1, vbool: Some(true), }, 1.into(), - ); + ) + .await + .unwrap(); m1.insert( + LogType::Full, Test { vstring: "c".into(), vu32: 2, vbool: Some(true), }, 2.into(), - ); + ) + .await + .unwrap(); m1.insert( + LogType::Full, Test { vstring: "d".into(), vu32: 3, vbool: Some(true), }, 3.into(), - ); + ) + .await + .unwrap(); m1.insert( + LogType::Full, Test { vstring: "e".into(), vu32: 4, vbool: Some(true), }, 4.into(), - ); + ) + .await + .unwrap(); m1.insert( + LogType::Full, Test { vstring: "f".into(), vu32: 5, vbool: Some(true), }, 5.into(), - ); + ) + .await + .unwrap(); let merge = MergeStream::::from_vec(vec![m1 .scan((Bound::Unbounded, Bound::Unbounded), 6.into())