diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index f414ca38f99a..0c68ca70999c 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -146,16 +146,24 @@ impl DataBuffer { } } - /// Freezes `DataBuffer` to bytes. Use `pk_weights` to sort rows and replace pk_index to pk_weights. + /// Freezes `DataBuffer` to bytes. + /// If `pk_weights` is present, it will be used to sort rows. + /// /// `freeze` clears the buffers of builders. - pub fn freeze(&mut self, pk_weights: &[u16]) -> Result { - let encoder = DataPartEncoder::new(&self.metadata, pk_weights, None); + pub fn freeze( + &mut self, + pk_weights: Option<&[u16]>, + replace_pk_index: bool, + ) -> Result { + let encoder = DataPartEncoder::new(&self.metadata, pk_weights, None, replace_pk_index); let parts = encoder.write(self)?; Ok(parts) } /// Reads batches from data buffer without resetting builder's buffers. - pub fn read(&mut self, pk_weights: &[u16]) -> Result { + /// If pk_weights is present, yielded rows are sorted according to weights, + /// otherwise rows are sorted by "pk_weights" values as they are actually weights. + pub fn read(&mut self, pk_weights: Option<&[u16]>) -> Result { // todo(hl): control whether to dedup while invoking `read`. let batch = data_buffer_to_record_batches( self.data_part_schema.clone(), @@ -163,7 +171,10 @@ impl DataBuffer { pk_weights, true, true, - true, + // replace_pk_index is always set to false since: + // - for DataBuffer in ShardBuilder, pk dict is not frozen + // - for DataBuffer in Shard, values in pk_index column has already been replaced during `freeze`. + false, )?; DataBufferReader::new(batch) } @@ -208,7 +219,7 @@ impl LazyMutableVectorBuilder { fn data_buffer_to_record_batches( schema: SchemaRef, buffer: &mut DataBuffer, - pk_weights: &[u16], + pk_weights: Option<&[u16]>, keep_data: bool, dedup: bool, replace_pk_index: bool, @@ -408,7 +419,7 @@ impl Ord for InnerKey { } fn build_rows_to_sort( - pk_weights: &[u16], + pk_weights: Option<&[u16]>, pk_index: &UInt16Vector, ts: &VectorRef, sequence: &UInt64Vector, @@ -453,11 +464,16 @@ fn build_rows_to_sort( .zip(sequence_values.iter()) .enumerate() .map(|(idx, ((timestamp, pk_index), sequence))| { + let pk_weight = if let Some(weights) = pk_weights { + weights[*pk_index as usize] // if pk_weights is present, sort according to weight. + } else { + *pk_index // otherwise pk_index has already been replaced by weights. + }; ( idx, InnerKey { timestamp: *timestamp, - pk_weight: pk_weights[*pk_index as usize], + pk_weight, sequence: *sequence, }, ) @@ -493,21 +509,24 @@ fn memtable_schema_to_encoded_schema(schema: &RegionMetadataRef) -> SchemaRef { struct DataPartEncoder<'a> { schema: SchemaRef, - pk_weights: &'a [u16], + pk_weights: Option<&'a [u16]>, row_group_size: Option, + replace_pk_index: bool, } impl<'a> DataPartEncoder<'a> { pub fn new( metadata: &RegionMetadataRef, - pk_weights: &'a [u16], + pk_weights: Option<&'a [u16]>, row_group_size: Option, + replace_pk_index: bool, ) -> DataPartEncoder<'a> { let schema = memtable_schema_to_encoded_schema(metadata); Self { schema, pk_weights, row_group_size, + replace_pk_index, } } @@ -528,7 +547,7 @@ impl<'a> DataPartEncoder<'a> { self.pk_weights, false, true, - true, + self.replace_pk_index, )?; writer.write(&rb).context(error::EncodeMemtableSnafu)?; let _metadata = writer.close().context(error::EncodeMemtableSnafu)?; @@ -689,19 +708,20 @@ impl DataParts { } /// Freezes the active data buffer into frozen data parts. - pub fn freeze(&mut self, pk_weights: &[u16]) -> Result<()> { - self.frozen.push(self.active.freeze(pk_weights)?); + pub fn freeze(&mut self) -> Result<()> { + self.frozen.push(self.active.freeze(None, false)?); Ok(()) } /// Reads data from all parts including active and frozen parts. /// The returned iterator yields a record batch of one primary key at a time. /// The order of yielding primary keys is determined by provided weights. - /// todo(hl): read may not take any pk weights if is read by `Shard`. - pub fn read(&mut self, pk_weights: &[u16]) -> Result { + pub fn read(&mut self) -> Result { let mut nodes = Vec::with_capacity(self.frozen.len() + 1); nodes.push(DataNode::new(DataSource::Buffer( - self.active.read(pk_weights)?, + // `DataPars::read` ensures that all pk_index inside `DataBuffer` are replaced by weights. + // then we pass None to sort rows directly according to pk_index. + self.active.read(None)?, ))); for p in &self.frozen { nodes.push(DataNode::new(DataSource::Part(p.read()?))); @@ -742,6 +762,7 @@ mod tests { use parquet::data_type::AsBytes; use super::*; + use crate::memtable::merge_tree::merger::timestamp_array_to_i64_slice; use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test}; #[test] @@ -773,9 +794,15 @@ mod tests { write_rows_to_buffer(&mut buffer, &meta, 0, vec![2], vec![Some(1.1)], 3); assert_eq!(5, buffer.num_rows()); let schema = memtable_schema_to_encoded_schema(&meta); - let batch = - data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], keep_data, true, true) - .unwrap(); + let batch = data_buffer_to_record_batches( + schema, + &mut buffer, + Some(&[3, 1]), + keep_data, + true, + true, + ) + .unwrap(); assert_eq!( vec![1, 2, 1, 2], @@ -839,7 +866,8 @@ mod tests { assert_eq!(4, buffer.num_rows()); let schema = memtable_schema_to_encoded_schema(&meta); let batch = - data_buffer_to_record_batches(schema, &mut buffer, &[0, 1], true, true, true).unwrap(); + data_buffer_to_record_batches(schema, &mut buffer, Some(&[0, 1]), true, true, true) + .unwrap(); assert_eq!(3, batch.num_rows()); assert_eq!( @@ -893,7 +921,8 @@ mod tests { assert_eq!(5, buffer.num_rows()); let schema = memtable_schema_to_encoded_schema(&meta); let batch = - data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], true, false, true).unwrap(); + data_buffer_to_record_batches(schema, &mut buffer, Some(&[3, 1]), true, false, true) + .unwrap(); assert_eq!( vec![1, 1, 3, 3, 3], @@ -944,6 +973,80 @@ mod tests { } } + fn check_data_buffer_freeze( + pk_weights: Option<&[u16]>, + replace_pk_weights: bool, + expected: &[(u16, Vec<(i64, u64)>)], + ) { + let meta = metadata_for_test(); + let mut buffer = DataBuffer::with_capacity(meta.clone(), 10); + + // write rows with null values. + write_rows_to_buffer( + &mut buffer, + &meta, + 0, + vec![0, 1, 2], + vec![Some(1.0), None, Some(3.0)], + 0, + ); + write_rows_to_buffer(&mut buffer, &meta, 1, vec![1], vec![Some(2.0)], 3); + + let mut res = Vec::with_capacity(3); + let mut reader = buffer + .freeze(pk_weights, replace_pk_weights) + .unwrap() + .read() + .unwrap(); + while reader.is_valid() { + let batch = reader.current_data_batch(); + let rb = batch.slice_record_batch(); + let ts = timestamp_array_to_i64_slice(rb.column(1)); + let sequence = rb + .column(2) + .as_any() + .downcast_ref::() + .unwrap() + .values(); + let ts_and_seq = ts + .iter() + .zip(sequence.iter()) + .map(|(ts, seq)| (*ts, *seq)) + .collect::>(); + res.push((batch.pk_index, ts_and_seq)); + + reader.next().unwrap(); + } + assert_eq!(expected, res); + } + + #[test] + fn test_data_buffer_freeze() { + check_data_buffer_freeze( + None, + false, + &[(0, vec![(0, 0), (1, 1), (2, 2)]), (1, vec![(1, 3)])], + ); + + check_data_buffer_freeze( + Some(&[1, 2]), + false, + &[(0, vec![(0, 0), (1, 1), (2, 2)]), (1, vec![(1, 3)])], + ); + + check_data_buffer_freeze( + Some(&[3, 2]), + true, + &[(2, vec![(1, 3)]), (3, vec![(0, 0), (1, 1), (2, 2)])], + ); + + check_data_buffer_freeze( + Some(&[3, 2]), + false, + &[(1, vec![(1, 3)]), (0, vec![(0, 0), (1, 1), (2, 2)])], + ); + } + #[test] fn test_encode_data_buffer() { let meta = metadata_for_test(); @@ -965,7 +1068,7 @@ mod tests { assert_eq!(4, buffer.num_rows()); - let encoder = DataPartEncoder::new(&meta, &[0, 1, 2], None); + let encoder = DataPartEncoder::new(&meta, Some(&[0, 1, 2]), None, true); let encoded = match encoder.write(&mut buffer).unwrap() { DataPart::Parquet(data) => data.data, }; @@ -1010,8 +1113,7 @@ mod tests { assert_eq!(None, search_next_pk_range(&a, 6)); } - #[test] - fn test_iter_data_buffer() { + fn check_iter_data_buffer(pk_weights: Option<&[u16]>, expected: &[Vec]) { let meta = metadata_for_test(); let mut buffer = DataBuffer::with_capacity(meta.clone(), 10); @@ -1033,15 +1135,28 @@ mod tests { 2, ); - let mut iter = buffer.read(&[0, 1, 3, 2]).unwrap(); - check_buffer_values_equal(&mut iter, &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]]); + let mut iter = buffer.read(pk_weights).unwrap(); + check_buffer_values_equal(&mut iter, expected); + } + + #[test] + fn test_iter_data_buffer() { + check_iter_data_buffer(None, &[vec![1.0, 2.0, 3.0], vec![1.1, 2.1, 3.1]]); + check_iter_data_buffer( + Some(&[0, 1, 2, 3]), + &[vec![1.0, 2.0, 3.0], vec![1.1, 2.1, 3.1]], + ); + check_iter_data_buffer( + Some(&[3, 2, 1, 0]), + &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]], + ); } #[test] fn test_iter_empty_data_buffer() { let meta = metadata_for_test(); let mut buffer = DataBuffer::with_capacity(meta.clone(), 10); - let mut iter = buffer.read(&[0, 1, 3, 2]).unwrap(); + let mut iter = buffer.read(Some(&[0, 1, 3, 2])).unwrap(); check_buffer_values_equal(&mut iter, &[]); } @@ -1095,7 +1210,7 @@ mod tests { 4, ); - let encoder = DataPartEncoder::new(&meta, weights, Some(4)); + let encoder = DataPartEncoder::new(&meta, Some(weights), Some(4), true); let encoded = encoder.write(&mut buffer).unwrap(); let mut iter = encoded.read().unwrap(); diff --git a/src/mito2/src/memtable/merge_tree/merger.rs b/src/mito2/src/memtable/merge_tree/merger.rs index c758d3ecd909..3ed59da19513 100644 --- a/src/mito2/src/memtable/merge_tree/merger.rs +++ b/src/mito2/src/memtable/merge_tree/merger.rs @@ -63,8 +63,8 @@ pub struct Merger { } impl Merger -where - T: Node, + where + T: Node, { pub(crate) fn try_new(nodes: Vec) -> Result { let mut heap = BinaryHeap::with_capacity(nodes.len()); @@ -372,7 +372,7 @@ impl Node for DataNode { } } -fn timestamp_array_to_i64_slice(arr: &ArrayRef) -> &[i64] { +pub(crate) fn timestamp_array_to_i64_slice(arr: &ArrayRef) -> &[i64] { match arr.data_type() { DataType::Timestamp(t, _) => match t { TimeUnit::Second => arr @@ -470,12 +470,16 @@ mod tests { let mut seq = 0; write_rows_to_buffer(&mut buffer1, &metadata, 1, vec![2, 3], &mut seq); write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2], &mut seq); - let node1 = DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap())); + let node1 = DataNode::new(DataSource::Part( + buffer1.freeze(Some(weight), true).unwrap().read().unwrap(), + )); let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10); write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![3], &mut seq); write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![1], &mut seq); - let node2 = DataNode::new(DataSource::Buffer(buffer2.read(weight).unwrap())); + let node2 = DataNode::new(DataSource::Part( + buffer2.freeze(Some(weight), true).unwrap().read().unwrap(), + )); check_merger_read( vec![node1, node2], @@ -497,15 +501,21 @@ mod tests { let mut seq = 0; write_rows_to_buffer(&mut buffer1, &metadata, 1, vec![2, 3], &mut seq); write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2], &mut seq); - let node1 = DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap())); + let node1 = DataNode::new(DataSource::Part( + buffer1.freeze(Some(weight), true).unwrap().read().unwrap(), + )); let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10); write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![3], &mut seq); - let node2 = DataNode::new(DataSource::Buffer(buffer2.read(weight).unwrap())); + let node2 = DataNode::new(DataSource::Part( + buffer2.freeze(Some(weight), true).unwrap().read().unwrap(), + )); let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10); write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq); - let node3 = DataNode::new(DataSource::Buffer(buffer3.read(weight).unwrap())); + let node3 = DataNode::new(DataSource::Part( + buffer3.freeze(Some(weight), true).unwrap().read().unwrap(), + )); check_merger_read( vec![node1, node3, node2], @@ -528,15 +538,21 @@ mod tests { let weight = &[0, 1, 2]; let mut seq = 0; write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq); - let node1 = DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap())); + let node1 = DataNode::new(DataSource::Part( + buffer1.freeze(Some(weight), true).unwrap().read().unwrap(), + )); let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10); write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq); - let node2 = DataNode::new(DataSource::Buffer(buffer2.read(weight).unwrap())); + let node2 = DataNode::new(DataSource::Part( + buffer2.freeze(Some(weight), true).unwrap().read().unwrap(), + )); let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10); write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq); - let node3 = DataNode::new(DataSource::Buffer(buffer3.read(weight).unwrap())); + let node3 = DataNode::new(DataSource::Part( + buffer3.freeze(Some(weight), true).unwrap().read().unwrap(), + )); check_merger_read( vec![node1, node3, node2], @@ -558,18 +574,18 @@ mod tests { let weight = &[0, 1, 2]; let mut seq = 0; write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq); - let node1 = DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap())); + let node1 = DataNode::new(DataSource::Buffer(buffer1.read(Some(weight)).unwrap())); let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10); write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq); let node2 = DataNode::new(DataSource::Part( - buffer2.freeze(weight).unwrap().read().unwrap(), + buffer2.freeze(Some(weight), true).unwrap().read().unwrap(), )); let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10); write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq); let node3 = DataNode::new(DataSource::Part( - buffer3.freeze(weight).unwrap().read().unwrap(), + buffer3.freeze(Some(weight), true).unwrap().read().unwrap(), )); check_merger_read( @@ -592,18 +608,24 @@ mod tests { let weight = &[0, 1, 2]; let mut seq = 0; write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 2], &mut seq); - let node1 = DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap())); + let node1 = DataNode::new(DataSource::Part( + buffer1.freeze(Some(weight), true).unwrap().read().unwrap(), + )); + + let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10); + write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![2], &mut seq); + let node2 = DataNode::new(DataSource::Part( + buffer2.freeze(Some(weight), true).unwrap().read().unwrap(), + )); let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10); write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2], &mut seq); - let node3 = DataNode::new(DataSource::Buffer(buffer3.read(weight).unwrap())); - - let mut buffer4 = DataBuffer::with_capacity(metadata.clone(), 10); - write_rows_to_buffer(&mut buffer4, &metadata, 0, vec![2], &mut seq); - let node4 = DataNode::new(DataSource::Buffer(buffer4.read(weight).unwrap())); + let node3 = DataNode::new(DataSource::Part( + buffer3.freeze(Some(weight), true).unwrap().read().unwrap(), + )); check_merger_read( - vec![node1, node3, node4], + vec![node1, node2, node3], &[ (0, vec![(1, 0)]), (0, vec![(2, 4)]), @@ -620,11 +642,15 @@ mod tests { let weight = &[0, 1, 2]; let mut seq = 0; write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![0, 1], &mut seq); - let node1 = DataNode::new(DataSource::Buffer(buffer1.read(weight).unwrap())); + let node1 = DataNode::new(DataSource::Part( + buffer1.freeze(Some(weight), true).unwrap().read().unwrap(), + )); let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10); write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![1], &mut seq); - let node2 = DataNode::new(DataSource::Buffer(buffer2.read(weight).unwrap())); + let node2 = DataNode::new(DataSource::Part( + buffer2.freeze(Some(weight), true).unwrap().read().unwrap(), + )); check_merger_read( vec![node1, node2], diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index f9a32a17a563..96e33ce0698a 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -78,11 +78,11 @@ impl ShardBuilder { let data_part = match &key_dict { Some(dict) => { let pk_weights = dict.pk_weights_to_sort_data(); - self.data_buffer.freeze(&pk_weights)? + self.data_buffer.freeze(Some(&pk_weights), true)? } None => { let pk_weights = [0]; - self.data_buffer.freeze(&pk_weights)? + self.data_buffer.freeze(Some(&pk_weights), true)? } };