diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs
index 977b8002d730..dceb9d552ff0 100644
--- a/src/mito2/src/memtable/merge_tree/data.rs
+++ b/src/mito2/src/memtable/merge_tree/data.rs
@@ -23,7 +23,7 @@ use datatypes::arrow;
 use datatypes::arrow::array::{RecordBatch, UInt16Array, UInt32Array};
 use datatypes::arrow::datatypes::{Field, Schema, SchemaRef};
 use datatypes::data_type::DataType;
-use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, VectorRef};
+use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, Vector, VectorRef};
 use datatypes::schema::ColumnSchema;
 use datatypes::types::TimestampType;
 use datatypes::vectors::{
@@ -153,6 +153,7 @@ impl DataBuffer {
             pk_weights,
             true,
             true,
+            true,
         )?;
         DataBufferIter::new(batch)
     }
@@ -191,12 +192,16 @@ impl LazyMutableVectorBuilder {
 }
 
 /// Converts `DataBuffer` to record batches, with rows sorted according to pk_weights.
+/// `keep_data`: whether to keep the original data inside `DataBuffer`.
+/// `dedup`: whether to remove duplicated rows inside `DataBuffer`.
+/// `replace_pk_index`: whether to replace the pk_index values with the corresponding pk weights.
 fn data_buffer_to_record_batches(
     schema: SchemaRef,
     buffer: &mut DataBuffer,
     pk_weights: &[u16],
     keep_data: bool,
     dedup: bool,
+    replace_pk_index: bool,
 ) -> Result<RecordBatch> {
     let num_rows = buffer.ts_builder.len();
 
@@ -227,12 +232,17 @@ fn data_buffer_to_record_batches(
 
     let mut columns = Vec::with_capacity(4 + buffer.field_builders.len());
 
-    // replace pk index values with pk weights.
-    let weights_of_pks = Arc::new(UInt16Array::from_iter_values(
-        rows.into_iter().map(|(_, key)| key.pk_weight),
-    )) as Arc<_>;
+    let pk_array = if replace_pk_index {
+        // replace pk index values with pk weights.
+        Arc::new(UInt16Array::from_iter_values(
+            rows.into_iter().map(|(_, key)| key.pk_weight),
+        )) as Arc<_>
+    } else {
+        arrow::compute::take(&pk_index_v.to_arrow_array(), &indices_to_take, None)
+            .context(error::ComputeArrowSnafu)?
+    };
 
-    columns.push(weights_of_pks);
+    columns.push(pk_array);
 
     columns.push(
         arrow::compute::take(&ts_v.to_arrow_array(), &indices_to_take, None)
@@ -503,6 +513,7 @@ impl<'a> DataPartEncoder<'a> {
             self.pk_weights,
             false,
             true,
+            true,
         )?;
         writer.write(&rb).context(error::EncodeMemtableSnafu)?;
         let _file_meta = writer.close().context(error::EncodeMemtableSnafu)?;
@@ -566,7 +577,8 @@ mod tests {
         assert_eq!(5, buffer.num_rows());
         let schema = memtable_schema_to_encoded_schema(&meta);
         let batch =
-            data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], keep_data, true).unwrap();
+            data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], keep_data, true, true)
+                .unwrap();
 
         assert_eq!(
             vec![1, 2, 1, 2],
@@ -630,7 +642,7 @@ mod tests {
         assert_eq!(4, buffer.num_rows());
         let schema = memtable_schema_to_encoded_schema(&meta);
         let batch =
-            data_buffer_to_record_batches(schema, &mut buffer, &[0, 1], true, true).unwrap();
+            data_buffer_to_record_batches(schema, &mut buffer, &[0, 1], true, true, true).unwrap();
 
         assert_eq!(3, batch.num_rows());
         assert_eq!(
@@ -684,7 +696,7 @@ mod tests {
         assert_eq!(5, buffer.num_rows());
         let schema = memtable_schema_to_encoded_schema(&meta);
         let batch =
-            data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], true, false).unwrap();
+            data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], true, false, true).unwrap();
 
         assert_eq!(
             vec![1, 1, 3, 3, 3],
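
Reviewer note: below is a minimal standalone sketch of the new `replace_pk_index` semantics, written against the `arrow` crate directly rather than the mito2 types; the `SortedRow` struct and `build_pk_column` helper are illustrative names invented for this note, not identifiers in data.rs. When the flag is set, the output pk column carries one pk weight per sorted row; when it is unset, the original pk_index values are kept and merely reordered with arrow's `take` kernel. In this patch every call site passes `true`, so behavior is unchanged; the `false` path exists for callers that need the original pk_index column preserved.

use std::sync::Arc;

use arrow::array::{ArrayRef, UInt16Array, UInt32Array};
use arrow::compute;
use arrow::error::ArrowError;

/// One sorted row: the position to take from the unsorted arrays plus the
/// weight assigned to the row's primary key. Illustrative stand-in for the
/// (index, key) pairs that `data_buffer_to_record_batches` sorts.
struct SortedRow {
    take_index: u32,
    pk_weight: u16,
}

/// Builds the pk column of the output batch, mirroring the
/// `if replace_pk_index` branch added by this patch.
fn build_pk_column(
    pk_index: &UInt16Array,
    sorted: &[SortedRow],
    replace_pk_index: bool,
) -> Result<ArrayRef, ArrowError> {
    if replace_pk_index {
        // Emit one pk weight per sorted row, so the column reflects the
        // ordering the caller encoded in pk_weights.
        Ok(Arc::new(UInt16Array::from_iter_values(
            sorted.iter().map(|r| r.pk_weight),
        )) as ArrayRef)
    } else {
        // Keep the original pk_index values, reordered to the sort order.
        let indices = UInt32Array::from_iter_values(sorted.iter().map(|r| r.take_index));
        compute::take(pk_index, &indices, None)
    }
}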