Skip to content

Commit

Permalink
feat: enable zstd compression and encodings in merge tree data part (#…
Browse files Browse the repository at this point in the history
…3380)

* feat: enable zstd compression in merge tree data part to save memory

* feat: also enable customized column encoding in DataPartEncoder
  • Loading branch information
v0y4g3r authored Feb 27, 2024
1 parent 206666b commit 492a009
Showing 1 changed file with 55 additions and 13 deletions.
68 changes: 55 additions & 13 deletions src/mito2/src/memtable/merge_tree/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ use datatypes::vectors::{
};
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use parquet::schema::types::ColumnPath;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
Expand Down Expand Up @@ -236,10 +238,12 @@ impl DataBuffer {
pk_weights: Option<&[u16]>,
replace_pk_index: bool,
) -> Result<DataPart> {
let timestamp_col_name = self.metadata.time_index_column().column_schema.name.clone();
let encoder = DataPartEncoder::new(
&self.metadata,
pk_weights,
None,
timestamp_col_name,
replace_pk_index,
self.dedup,
);
Expand Down Expand Up @@ -683,6 +687,7 @@ struct DataPartEncoder<'a> {
schema: SchemaRef,
pk_weights: Option<&'a [u16]>,
row_group_size: Option<usize>,
timestamp_column_name: String,
replace_pk_index: bool,
dedup: bool,
}
Expand All @@ -692,6 +697,7 @@ impl<'a> DataPartEncoder<'a> {
metadata: &RegionMetadataRef,
pk_weights: Option<&'a [u16]>,
row_group_size: Option<usize>,
timestamp_column_name: String,
replace_pk_index: bool,
dedup: bool,
) -> DataPartEncoder<'a> {
Expand All @@ -700,29 +706,51 @@ impl<'a> DataPartEncoder<'a> {
schema,
pk_weights,
row_group_size,
timestamp_column_name,
replace_pk_index,
dedup,
}
}

fn writer_props(&self) -> Option<WriterProperties> {
self.row_group_size.map(|size| {
WriterProperties::builder()
.set_max_row_group_size(size)
.build()
})
}
pub fn write(&self, source: &mut DataBuffer) -> Result<DataPart> {
// todo(hl): more customized config according to region options.
fn writer_props(self) -> WriterProperties {
let mut builder = WriterProperties::builder();
if let Some(row_group_size) = self.row_group_size {
builder = builder.set_max_row_group_size(row_group_size)
}

let ts_col = ColumnPath::new(vec![self.timestamp_column_name]);
let pk_index_col = ColumnPath::new(vec![PK_INDEX_COLUMN_NAME.to_string()]);
let sequence_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);

builder = builder
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_statistics_enabled(EnabledStatistics::None);
builder = builder
.set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
.set_column_dictionary_enabled(ts_col, false)
.set_column_encoding(pk_index_col.clone(), Encoding::DELTA_BINARY_PACKED)
.set_column_dictionary_enabled(pk_index_col, true)
.set_column_encoding(sequence_col.clone(), Encoding::DELTA_BINARY_PACKED)
.set_column_dictionary_enabled(sequence_col, false)
.set_column_encoding(op_type_col.clone(), Encoding::DELTA_BINARY_PACKED)
.set_column_dictionary_enabled(op_type_col, true);
builder.build()
}

pub fn write(self, source: &mut DataBuffer) -> Result<DataPart> {
let mut bytes = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut bytes, self.schema.clone(), self.writer_props())
.context(error::EncodeMemtableSnafu)?;
let rb = drain_data_buffer_to_record_batches(
self.schema.clone(),
source,
self.pk_weights,
self.dedup,
self.replace_pk_index,
)?;
let mut writer =
ArrowWriter::try_new(&mut bytes, self.schema.clone(), Some(self.writer_props()))
.context(error::EncodeMemtableSnafu)?;
writer.write(&rb).context(error::EncodeMemtableSnafu)?;
let _metadata = writer.close().context(error::EncodeMemtableSnafu)?;
Ok(DataPart::Parquet(ParquetPart {
Expand Down Expand Up @@ -1225,7 +1253,14 @@ mod tests {

assert_eq!(4, buffer.num_rows());

let encoder = DataPartEncoder::new(&meta, Some(&[0, 1, 2]), None, true, true);
let encoder = DataPartEncoder::new(
&meta,
Some(&[0, 1, 2]),
None,
meta.time_index_column().column_schema.name.clone(),
true,
true,
);
let encoded = match encoder.write(&mut buffer).unwrap() {
DataPart::Parquet(data) => data.data,
};
Expand Down Expand Up @@ -1367,7 +1402,14 @@ mod tests {
4,
);

let encoder = DataPartEncoder::new(&meta, Some(weights), Some(4), true, true);
let encoder = DataPartEncoder::new(
&meta,
Some(weights),
Some(4),
meta.time_index_column().column_schema.name.clone(),
true,
true,
);
let encoded = encoder.write(&mut buffer).unwrap();

let mut iter = encoded.read().unwrap();
Expand Down

0 comments on commit 492a009

Please sign in to comment.