diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs
index ac724bae2171..82bf5c8440d0 100644
--- a/src/mito2/src/memtable/merge_tree/data.rs
+++ b/src/mito2/src/memtable/merge_tree/data.rs
@@ -34,7 +34,9 @@ use datatypes::vectors::{
 };
 use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
 use parquet::arrow::ArrowWriter;
-use parquet::file::properties::WriterProperties;
+use parquet::basic::{Compression, Encoding, ZstdLevel};
+use parquet::file::properties::{EnabledStatistics, WriterProperties};
+use parquet::schema::types::ColumnPath;
 use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
@@ -236,10 +238,12 @@ impl DataBuffer {
         pk_weights: Option<&[u16]>,
         replace_pk_index: bool,
     ) -> Result<DataPart> {
+        let timestamp_col_name = self.metadata.time_index_column().column_schema.name.clone();
         let encoder = DataPartEncoder::new(
             &self.metadata,
             pk_weights,
             None,
+            timestamp_col_name,
             replace_pk_index,
             self.dedup,
         );
@@ -683,6 +687,7 @@ struct DataPartEncoder<'a> {
     schema: SchemaRef,
     pk_weights: Option<&'a [u16]>,
     row_group_size: Option<usize>,
+    timestamp_column_name: String,
     replace_pk_index: bool,
     dedup: bool,
 }
@@ -692,6 +697,7 @@ impl<'a> DataPartEncoder<'a> {
         metadata: &RegionMetadataRef,
         pk_weights: Option<&'a [u16]>,
         row_group_size: Option<usize>,
+        timestamp_column_name: String,
         replace_pk_index: bool,
         dedup: bool,
     ) -> DataPartEncoder<'a> {
@@ -700,22 +706,41 @@ impl<'a> DataPartEncoder<'a> {
             schema,
             pk_weights,
             row_group_size,
+            timestamp_column_name,
             replace_pk_index,
             dedup,
         }
     }
 
-    fn writer_props(&self) -> Option<WriterProperties> {
-        self.row_group_size.map(|size| {
-            WriterProperties::builder()
-                .set_max_row_group_size(size)
-                .build()
-        })
-    }
-
-    pub fn write(&self, source: &mut DataBuffer) -> Result<DataPart> {
+    // todo(hl): more customized config according to region options.
+    fn writer_props(self) -> WriterProperties {
+        let mut builder = WriterProperties::builder();
+        if let Some(row_group_size) = self.row_group_size {
+            builder = builder.set_max_row_group_size(row_group_size)
+        }
+
+        let ts_col = ColumnPath::new(vec![self.timestamp_column_name]);
+        let pk_index_col = ColumnPath::new(vec![PK_INDEX_COLUMN_NAME.to_string()]);
+        let sequence_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
+        let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);
+
+        builder = builder
+            .set_compression(Compression::ZSTD(ZstdLevel::default()))
+            .set_statistics_enabled(EnabledStatistics::None);
+        builder = builder
+            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
+            .set_column_dictionary_enabled(ts_col, false)
+            .set_column_encoding(pk_index_col.clone(), Encoding::DELTA_BINARY_PACKED)
+            .set_column_dictionary_enabled(pk_index_col, true)
+            .set_column_encoding(sequence_col.clone(), Encoding::DELTA_BINARY_PACKED)
+            .set_column_dictionary_enabled(sequence_col, false)
+            .set_column_encoding(op_type_col.clone(), Encoding::DELTA_BINARY_PACKED)
+            .set_column_dictionary_enabled(op_type_col, true);
+        builder.build()
+    }
+
+    pub fn write(self, source: &mut DataBuffer) -> Result<DataPart> {
         let mut bytes = Vec::with_capacity(1024);
-        let mut writer = ArrowWriter::try_new(&mut bytes, self.schema.clone(), self.writer_props())
-            .context(error::EncodeMemtableSnafu)?;
         let rb = drain_data_buffer_to_record_batches(
             self.schema.clone(),
             source,
@@ -723,6 +748,9 @@ impl<'a> DataPartEncoder<'a> {
             self.dedup,
             self.replace_pk_index,
         )?;
+        let mut writer =
+            ArrowWriter::try_new(&mut bytes, self.schema.clone(), Some(self.writer_props()))
+                .context(error::EncodeMemtableSnafu)?;
         writer.write(&rb).context(error::EncodeMemtableSnafu)?;
         let _metadata = writer.close().context(error::EncodeMemtableSnafu)?;
         Ok(DataPart::Parquet(ParquetPart {
@@ -1225,7 +1253,14 @@ mod tests {
         assert_eq!(4, buffer.num_rows());
 
-        let encoder = DataPartEncoder::new(&meta, Some(&[0, 1, 2]), None, true, true);
+        let encoder = DataPartEncoder::new(
+            &meta,
+            Some(&[0, 1, 2]),
+            None,
+            meta.time_index_column().column_schema.name.clone(),
+            true,
+            true,
+        );
         let encoded = match encoder.write(&mut buffer).unwrap() {
             DataPart::Parquet(data) => data.data,
         };
@@ -1367,7 +1402,14 @@ mod tests {
             4,
         );
 
-        let encoder = DataPartEncoder::new(&meta, Some(weights), Some(4), true, true);
+        let encoder = DataPartEncoder::new(
+            &meta,
+            Some(weights),
+            Some(4),
+            meta.time_index_column().column_schema.name.clone(),
+            true,
+            true,
+        );
         let encoded = encoder.write(&mut buffer).unwrap();
         let mut iter = encoded.read().unwrap();
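
Note for reviewers: below is a minimal, self-contained sketch of the per-column writer-properties technique this patch applies, isolated from the memtable code. The one-column schema and the "ts" column name are hypothetical stand-ins; the patch itself resolves the timestamp column from region metadata and additionally tunes the pk_index, sequence, and op_type columns. It assumes the arrow and parquet crates with parquet's "arrow" and "zstd" features enabled.

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::properties::{EnabledStatistics, WriterProperties};
    use parquet::schema::types::ColumnPath;

    fn main() -> parquet::errors::Result<()> {
        // Hypothetical one-column schema; the patch derives the real
        // timestamp column name from region metadata instead.
        let schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, false)]));
        let ts_col = ColumnPath::new(vec!["ts".to_string()]);

        // The same knobs the patch turns: ZSTD compression, statistics off,
        // and delta-binary-packed encoding with dictionary disabled for the
        // monotonically increasing timestamp column.
        let props = WriterProperties::builder()
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
            .build();

        // Encode a batch into an in-memory buffer, mirroring what
        // DataPartEncoder::write does with the drained record batch.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int64Array::from(vec![1, 2, 3, 5, 8])) as ArrayRef],
        )
        .expect("columns match schema");
        let mut bytes = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut bytes, schema, Some(props))?;
        writer.write(&batch)?;
        writer.close()?;
        println!("encoded {} bytes", bytes.len());
        Ok(())
    }

DELTA_BINARY_PACKED stores deltas between consecutive integers, which is compact for sorted or slowly varying columns like timestamps and sequence numbers, while dictionary encoding remains enabled only for the low-cardinality pk_index and op_type columns.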