diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index 4b50a575342f..d53aee4ca3b4 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -15,12 +15,14 @@
 //! Parquet writer.
 
 use common_telemetry::debug;
+use datatypes::arrow::record_batch::RecordBatch;
 use object_store::ObjectStore;
 use parquet::basic::{Compression, Encoding, ZstdLevel};
 use parquet::file::metadata::KeyValue;
 use parquet::file::properties::WriterProperties;
+use snafu::ResultExt;
 
-use crate::error::Result;
+use crate::error::{NewRecordBatchSnafu, Result};
 use crate::read::Source;
 use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
 use crate::sst::stream_writer::BufferedWriter;
@@ -64,17 +66,28 @@ impl<'a> ParquetWriter<'a> {
 
         let writer_props = props_builder.build();
 
+        let arrow_schema = metadata.schema.arrow_schema();
         let mut buffered_writer = BufferedWriter::try_new(
             self.file_path.to_string(),
             self.object_store.clone(),
-            &metadata.schema,
+            arrow_schema.clone(),
             Some(writer_props),
             opts.write_buffer_size.as_bytes() as usize,
         )
         .await?;
 
         while let Some(batch) = self.source.next_batch().await? {
-            buffered_writer.write(&batch).await?;
+            let arrow_batch = RecordBatch::try_new(
+                arrow_schema.clone(),
+                batch
+                    .columns
+                    .iter()
+                    .map(|v| v.to_arrow_array())
+                    .collect::<Vec<_>>(),
+            )
+            .context(NewRecordBatchSnafu)?;
+
+            buffered_writer.write(&arrow_batch).await?;
         }
         // Get stats from the source.
         let stats = self.source.stats();
diff --git a/src/mito2/src/sst/stream_writer.rs b/src/mito2/src/sst/stream_writer.rs
index 62fa4df7e6e1..005b533443b6 100644
--- a/src/mito2/src/sst/stream_writer.rs
+++ b/src/mito2/src/sst/stream_writer.rs
@@ -17,9 +17,8 @@ use std::pin::Pin;
 
 use common_datasource::buffered_writer::LazyBufferedWriter;
 use common_datasource::share_buffer::SharedBuffer;
-use datatypes::arrow;
+use datatypes::arrow::datatypes::SchemaRef;
 use datatypes::arrow::record_batch::RecordBatch;
-use datatypes::schema::SchemaRef;
 use object_store::ObjectStore;
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::WriterProperties;
@@ -27,14 +26,13 @@ use parquet::format::FileMetaData;
 use snafu::ResultExt;
 
 use crate::error;
-use crate::error::{NewRecordBatchSnafu, WriteParquetSnafu};
-use crate::read::Batch;
+use crate::error::WriteParquetSnafu;
 
 /// Parquet writer that buffers row groups in memory and writes buffered data to an underlying
 /// storage by chunks to reduce memory consumption.
 pub struct BufferedWriter {
     inner: InnerBufferedWriter,
-    arrow_schema: arrow::datatypes::SchemaRef,
+    arrow_schema: SchemaRef,
 }
 
 type InnerBufferedWriter = LazyBufferedWriter<
@@ -56,11 +54,10 @@ impl BufferedWriter {
     pub async fn try_new(
         path: String,
         store: ObjectStore,
-        schema: &SchemaRef,
+        arrow_schema: SchemaRef,
         props: Option<WriterProperties>,
         buffer_threshold: usize,
     ) -> error::Result<Self> {
-        let arrow_schema = schema.arrow_schema();
         let buffer = SharedBuffer::with_capacity(buffer_threshold);
 
         let arrow_writer = ArrowWriter::try_new(buffer.clone(), arrow_schema.clone(), props)
@@ -82,24 +79,14 @@ impl BufferedWriter {
                     })
                 }),
             ),
-            arrow_schema: arrow_schema.clone(),
+            arrow_schema,
         })
     }
 
     /// Write a record batch to stream writer.
-    pub async fn write(&mut self, batch: &Batch) -> error::Result<()> {
-        let arrow_batch = RecordBatch::try_new(
-            self.arrow_schema.clone(),
-            batch
-                .columns
-                .iter()
-                .map(|v| v.to_arrow_array())
-                .collect::<Vec<_>>(),
-        )
-        .context(NewRecordBatchSnafu)?;
-
+    pub async fn write(&mut self, arrow_batch: &RecordBatch) -> error::Result<()> {
         self.inner
-            .write(&arrow_batch)
+            .write(arrow_batch)
             .await
             .context(error::WriteBufferSnafu)?;
         self.inner
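
The refactor moves the `Batch` to `RecordBatch` conversion out of `BufferedWriter` and into the caller, so the writer only deals in arrow types. Below is a minimal standalone sketch of that call-site pattern, assuming only the `arrow` crate; it is not GreptimeDB code, and `EngineBatch` and `column_to_arrow` are hypothetical stand-ins for mito2's `Batch` and `Vector::to_arrow_array`.

```rust
// Minimal sketch (not GreptimeDB code) of converting an engine-native batch
// into an arrow RecordBatch at the call site, so the writer stays arrow-only.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Hypothetical engine-native batch: one i64 column per entry.
struct EngineBatch {
    columns: Vec<Vec<i64>>,
}

/// Stand-in for `Vector::to_arrow_array`: lift a native column into arrow.
fn column_to_arrow(col: &[i64]) -> ArrayRef {
    Arc::new(Int64Array::from(col.to_vec()))
}

/// The conversion the caller now performs before handing data to the writer.
fn to_record_batch(schema: SchemaRef, batch: &EngineBatch) -> Result<RecordBatch, ArrowError> {
    RecordBatch::try_new(
        schema,
        batch.columns.iter().map(|c| column_to_arrow(c)).collect(),
    )
}

fn main() -> Result<(), ArrowError> {
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "ts",
        DataType::Int64,
        false,
    )]));
    let batch = EngineBatch {
        columns: vec![vec![1, 2, 3]],
    };
    let record_batch = to_record_batch(schema, &batch)?;
    assert_eq!(record_batch.num_rows(), 3);
    Ok(())
}
```

Keeping the conversion at the call site means `BufferedWriter` no longer depends on `crate::read::Batch`, which is also why the `NewRecordBatchSnafu` error handling moves from `stream_writer.rs` into `writer.rs`.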