feat: stream writer takes arrow's types
evenyag committed Aug 14, 2023
1 parent fa97086 commit 92c62a4
Showing 2 changed files with 23 additions and 23 deletions.
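
In short, `BufferedWriter::try_new` now takes an Arrow `SchemaRef` directly instead of the crate's `datatypes::schema::SchemaRef`, and `BufferedWriter::write` takes an Arrow `RecordBatch` instead of a mito2 `Batch`, so the `Batch`-to-`RecordBatch` conversion moves into the caller. Below is a rough sketch of the resulting caller flow, reconstructed from the diff that follows; names such as `metadata`, `opts`, `writer_props`, and the surrounding `ParquetWriter` fields come from that context, and error handling is abbreviated.

```rust
// Sketch only: inside ParquetWriter's async write path in writer.rs.
let arrow_schema = metadata.schema.arrow_schema();
let mut buffered_writer = BufferedWriter::try_new(
    self.file_path.to_string(),
    self.object_store.clone(),
    arrow_schema.clone(), // Arrow schema instead of `&metadata.schema`.
    Some(writer_props),
    opts.write_buffer_size.as_bytes() as usize,
)
.await?;

while let Some(batch) = self.source.next_batch().await? {
    // The Batch -> RecordBatch conversion now happens in the caller.
    let arrow_batch = RecordBatch::try_new(
        arrow_schema.clone(),
        batch.columns.iter().map(|v| v.to_arrow_array()).collect::<Vec<_>>(),
    )
    .context(NewRecordBatchSnafu)?;
    buffered_writer.write(&arrow_batch).await?;
}
```
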
19 changes: 16 additions & 3 deletions src/mito2/src/sst/parquet/writer.rs
@@ -15,12 +15,14 @@
 //! Parquet writer.
 use common_telemetry::debug;
+use datatypes::arrow::record_batch::RecordBatch;
 use object_store::ObjectStore;
 use parquet::basic::{Compression, Encoding, ZstdLevel};
 use parquet::file::metadata::KeyValue;
 use parquet::file::properties::WriterProperties;
+use snafu::ResultExt;

-use crate::error::Result;
+use crate::error::{NewRecordBatchSnafu, Result};
 use crate::read::Source;
 use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
@@ -64,17 +66,28 @@ impl<'a> ParquetWriter<'a> {

         let writer_props = props_builder.build();

+        let arrow_schema = metadata.schema.arrow_schema();
         let mut buffered_writer = BufferedWriter::try_new(
             self.file_path.to_string(),
             self.object_store.clone(),
-            &metadata.schema,
+            arrow_schema.clone(),
             Some(writer_props),
             opts.write_buffer_size.as_bytes() as usize,
         )
         .await?;

         while let Some(batch) = self.source.next_batch().await? {
-            buffered_writer.write(&batch).await?;
+            let arrow_batch = RecordBatch::try_new(
+                arrow_schema.clone(),
+                batch
+                    .columns
+                    .iter()
+                    .map(|v| v.to_arrow_array())
+                    .collect::<Vec<_>>(),
+            )
+            .context(NewRecordBatchSnafu)?;
+
+            buffered_writer.write(&arrow_batch).await?;
         }
         // Get stats from the source.
         let stats = self.source.stats();
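
Since the same `Batch`-to-`RecordBatch` conversion may be needed by other callers, it could also be factored into a small helper. A hypothetical sketch (the helper `to_arrow_record_batch` is not part of this commit), assuming `Batch::columns` holds vectors exposing `to_arrow_array()` as used above:

```rust
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
use snafu::ResultExt;

use crate::error::{NewRecordBatchSnafu, Result};
use crate::read::Batch;

/// Converts a mito2 `Batch` into an Arrow `RecordBatch` with the given schema.
fn to_arrow_record_batch(batch: &Batch, arrow_schema: SchemaRef) -> Result<RecordBatch> {
    let columns = batch
        .columns
        .iter()
        .map(|v| v.to_arrow_array())
        .collect::<Vec<_>>();
    RecordBatch::try_new(arrow_schema, columns).context(NewRecordBatchSnafu)
}
```
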
27 changes: 7 additions & 20 deletions src/mito2/src/sst/stream_writer.rs
@@ -17,24 +17,22 @@ use std::pin::Pin;

 use common_datasource::buffered_writer::LazyBufferedWriter;
 use common_datasource::share_buffer::SharedBuffer;
-use datatypes::arrow;
+use datatypes::arrow::datatypes::SchemaRef;
 use datatypes::arrow::record_batch::RecordBatch;
-use datatypes::schema::SchemaRef;
 use object_store::ObjectStore;
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::WriterProperties;
 use parquet::format::FileMetaData;
 use snafu::ResultExt;

 use crate::error;
-use crate::error::{NewRecordBatchSnafu, WriteParquetSnafu};
-use crate::read::Batch;
+use crate::error::WriteParquetSnafu;

 /// Parquet writer that buffers row groups in memory and writes buffered data to an underlying
 /// storage by chunks to reduce memory consumption.
 pub struct BufferedWriter {
     inner: InnerBufferedWriter,
-    arrow_schema: arrow::datatypes::SchemaRef,
+    arrow_schema: SchemaRef,
 }

 type InnerBufferedWriter = LazyBufferedWriter<
@@ -56,11 +54,10 @@ impl BufferedWriter {
     pub async fn try_new(
         path: String,
         store: ObjectStore,
-        schema: &SchemaRef,
+        arrow_schema: SchemaRef,
         props: Option<WriterProperties>,
         buffer_threshold: usize,
     ) -> error::Result<Self> {
-        let arrow_schema = schema.arrow_schema();
         let buffer = SharedBuffer::with_capacity(buffer_threshold);

         let arrow_writer = ArrowWriter::try_new(buffer.clone(), arrow_schema.clone(), props)
@@ -82,24 +79,14 @@
                     })
                 }),
             ),
-            arrow_schema: arrow_schema.clone(),
+            arrow_schema,
         })
     }

     /// Write a record batch to stream writer.
-    pub async fn write(&mut self, batch: &Batch) -> error::Result<()> {
-        let arrow_batch = RecordBatch::try_new(
-            self.arrow_schema.clone(),
-            batch
-                .columns
-                .iter()
-                .map(|v| v.to_arrow_array())
-                .collect::<Vec<_>>(),
-        )
-        .context(NewRecordBatchSnafu)?;
-
+    pub async fn write(&mut self, arrow_batch: &RecordBatch) -> error::Result<()> {
         self.inner
-            .write(&arrow_batch)
+            .write(arrow_batch)
             .await
             .context(error::WriteBufferSnafu)?;
         self.inner
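
With `crate::read::Batch` out of the picture, the stream writer's public surface depends only on Arrow types, so it can in principle serve any producer of Arrow record batches. A hypothetical usage sketch, assuming an async context returning `error::Result<()>` with `path: String` and `object_store: ObjectStore` in scope; the column name, values, and buffer size are made up, and flushing/closing the writer is omitted:

```rust
use std::sync::Arc;

use datatypes::arrow::array::{ArrayRef, Int64Array};
use datatypes::arrow::datatypes::{DataType, Field, Schema};
use datatypes::arrow::record_batch::RecordBatch;
use snafu::ResultExt;

use crate::error::NewRecordBatchSnafu;
use crate::sst::stream_writer::BufferedWriter;

let arrow_schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, false)]));
let mut writer = BufferedWriter::try_new(
    path,
    object_store,
    arrow_schema.clone(),
    None,            // use default writer properties
    8 * 1024 * 1024, // 8 MiB buffer threshold
)
.await?;

let columns = vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef];
let arrow_batch = RecordBatch::try_new(arrow_schema, columns).context(NewRecordBatchSnafu)?;
writer.write(&arrow_batch).await?;
```
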
