Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: distinguish between different read paths #3369

Merged
merged 2 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 143 additions & 28 deletions src/mito2/src/memtable/merge_tree/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,24 +146,35 @@ impl DataBuffer {
}
}

/// Freezes `DataBuffer` to bytes. Use `pk_weights` to sort rows and replace pk_index to pk_weights.
/// Freezes `DataBuffer` to bytes.
/// If `pk_weights` is present, it will be used to sort rows.
///
/// `freeze` clears the buffers of builders.
pub fn freeze(&mut self, pk_weights: &[u16]) -> Result<DataPart> {
let encoder = DataPartEncoder::new(&self.metadata, pk_weights, None);
pub fn freeze(
&mut self,
pk_weights: Option<&[u16]>,
replace_pk_index: bool,
) -> Result<DataPart> {
let encoder = DataPartEncoder::new(&self.metadata, pk_weights, None, replace_pk_index);
let parts = encoder.write(self)?;
Ok(parts)
}

/// Reads batches from data buffer without resetting builder's buffers.
pub fn read(&mut self, pk_weights: &[u16]) -> Result<DataBufferReader> {
/// If pk_weights is present, yielded rows are sorted according to weights,
/// otherwise rows are sorted by "pk_weights" values as they are actually weights.
pub fn read(&mut self, pk_weights: Option<&[u16]>) -> Result<DataBufferReader> {
// todo(hl): control whether to dedup while invoking `read`.
let batch = data_buffer_to_record_batches(
self.data_part_schema.clone(),
self,
pk_weights,
true,
true,
true,
// replace_pk_index is always set to false since:
// - for DataBuffer in ShardBuilder, pk dict is not frozen
// - for DataBuffer in Shard, values in pk_index column has already been replaced during `freeze`.
false,
)?;
DataBufferReader::new(batch)
}
Expand Down Expand Up @@ -208,7 +219,7 @@ impl LazyMutableVectorBuilder {
fn data_buffer_to_record_batches(
schema: SchemaRef,
buffer: &mut DataBuffer,
pk_weights: &[u16],
pk_weights: Option<&[u16]>,
keep_data: bool,
dedup: bool,
replace_pk_index: bool,
Expand Down Expand Up @@ -408,7 +419,7 @@ impl Ord for InnerKey {
}

fn build_rows_to_sort(
pk_weights: &[u16],
pk_weights: Option<&[u16]>,
pk_index: &UInt16Vector,
ts: &VectorRef,
sequence: &UInt64Vector,
Expand Down Expand Up @@ -453,11 +464,16 @@ fn build_rows_to_sort(
.zip(sequence_values.iter())
.enumerate()
.map(|(idx, ((timestamp, pk_index), sequence))| {
let pk_weight = if let Some(weights) = pk_weights {
weights[*pk_index as usize] // if pk_weights is present, sort according to weight.
} else {
*pk_index // otherwise pk_index has already been replaced by weights.
};
(
idx,
InnerKey {
timestamp: *timestamp,
pk_weight: pk_weights[*pk_index as usize],
pk_weight,
sequence: *sequence,
},
)
Expand Down Expand Up @@ -493,21 +509,24 @@ fn memtable_schema_to_encoded_schema(schema: &RegionMetadataRef) -> SchemaRef {

struct DataPartEncoder<'a> {
schema: SchemaRef,
pk_weights: &'a [u16],
pk_weights: Option<&'a [u16]>,
row_group_size: Option<usize>,
replace_pk_index: bool,
}

impl<'a> DataPartEncoder<'a> {
pub fn new(
metadata: &RegionMetadataRef,
pk_weights: &'a [u16],
pk_weights: Option<&'a [u16]>,
row_group_size: Option<usize>,
replace_pk_index: bool,
) -> DataPartEncoder<'a> {
let schema = memtable_schema_to_encoded_schema(metadata);
Self {
schema,
pk_weights,
row_group_size,
replace_pk_index,
}
}

Expand All @@ -528,7 +547,7 @@ impl<'a> DataPartEncoder<'a> {
self.pk_weights,
false,
true,
true,
self.replace_pk_index,
)?;
writer.write(&rb).context(error::EncodeMemtableSnafu)?;
let _metadata = writer.close().context(error::EncodeMemtableSnafu)?;
Expand Down Expand Up @@ -689,19 +708,20 @@ impl DataParts {
}

/// Freezes the active data buffer into frozen data parts.
pub fn freeze(&mut self, pk_weights: &[u16]) -> Result<()> {
self.frozen.push(self.active.freeze(pk_weights)?);
pub fn freeze(&mut self) -> Result<()> {
self.frozen.push(self.active.freeze(None, false)?);
Ok(())
}

/// Reads data from all parts including active and frozen parts.
/// The returned iterator yields a record batch of one primary key at a time.
/// The order of yielding primary keys is determined by provided weights.
/// todo(hl): read may not take any pk weights if is read by `Shard`.
pub fn read(&mut self, pk_weights: &[u16]) -> Result<DataPartsReader> {
pub fn read(&mut self) -> Result<DataPartsReader> {
let mut nodes = Vec::with_capacity(self.frozen.len() + 1);
nodes.push(DataNode::new(DataSource::Buffer(
self.active.read(pk_weights)?,
// `DataPars::read` ensures that all pk_index inside `DataBuffer` are replaced by weights.
// then we pass None to sort rows directly according to pk_index.
self.active.read(None)?,
)));
for p in &self.frozen {
nodes.push(DataNode::new(DataSource::Part(p.read()?)));
Expand Down Expand Up @@ -742,6 +762,7 @@ mod tests {
use parquet::data_type::AsBytes;

use super::*;
use crate::memtable::merge_tree::merger::timestamp_array_to_i64_slice;
use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

#[test]
Expand Down Expand Up @@ -773,9 +794,15 @@ mod tests {
write_rows_to_buffer(&mut buffer, &meta, 0, vec![2], vec![Some(1.1)], 3);
assert_eq!(5, buffer.num_rows());
let schema = memtable_schema_to_encoded_schema(&meta);
let batch =
data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], keep_data, true, true)
.unwrap();
let batch = data_buffer_to_record_batches(
schema,
&mut buffer,
Some(&[3, 1]),
keep_data,
true,
true,
)
.unwrap();

assert_eq!(
vec![1, 2, 1, 2],
Expand Down Expand Up @@ -839,7 +866,8 @@ mod tests {
assert_eq!(4, buffer.num_rows());
let schema = memtable_schema_to_encoded_schema(&meta);
let batch =
data_buffer_to_record_batches(schema, &mut buffer, &[0, 1], true, true, true).unwrap();
data_buffer_to_record_batches(schema, &mut buffer, Some(&[0, 1]), true, true, true)
.unwrap();

assert_eq!(3, batch.num_rows());
assert_eq!(
Expand Down Expand Up @@ -893,7 +921,8 @@ mod tests {
assert_eq!(5, buffer.num_rows());
let schema = memtable_schema_to_encoded_schema(&meta);
let batch =
data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], true, false, true).unwrap();
data_buffer_to_record_batches(schema, &mut buffer, Some(&[3, 1]), true, false, true)
.unwrap();

assert_eq!(
vec![1, 1, 3, 3, 3],
Expand Down Expand Up @@ -944,6 +973,80 @@ mod tests {
}
}

fn check_data_buffer_freeze(
pk_weights: Option<&[u16]>,
replace_pk_weights: bool,
expected: &[(u16, Vec<(i64, u64)>)],
) {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);

// write rows with null values.
write_rows_to_buffer(
&mut buffer,
&meta,
0,
vec![0, 1, 2],
vec![Some(1.0), None, Some(3.0)],
0,
);
write_rows_to_buffer(&mut buffer, &meta, 1, vec![1], vec![Some(2.0)], 3);

let mut res = Vec::with_capacity(3);
let mut reader = buffer
.freeze(pk_weights, replace_pk_weights)
.unwrap()
.read()
.unwrap();
while reader.is_valid() {
let batch = reader.current_data_batch();
let rb = batch.slice_record_batch();
let ts = timestamp_array_to_i64_slice(rb.column(1));
let sequence = rb
.column(2)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.values();
let ts_and_seq = ts
.iter()
.zip(sequence.iter())
.map(|(ts, seq)| (*ts, *seq))
.collect::<Vec<_>>();
res.push((batch.pk_index, ts_and_seq));

reader.next().unwrap();
}
assert_eq!(expected, res);
}

#[test]
fn test_data_buffer_freeze() {
check_data_buffer_freeze(
None,
false,
&[(0, vec![(0, 0), (1, 1), (2, 2)]), (1, vec![(1, 3)])],
);

check_data_buffer_freeze(
Some(&[1, 2]),
false,
&[(0, vec![(0, 0), (1, 1), (2, 2)]), (1, vec![(1, 3)])],
);

check_data_buffer_freeze(
Some(&[3, 2]),
true,
&[(2, vec![(1, 3)]), (3, vec![(0, 0), (1, 1), (2, 2)])],
);

check_data_buffer_freeze(
Some(&[3, 2]),
false,
&[(1, vec![(1, 3)]), (0, vec![(0, 0), (1, 1), (2, 2)])],
);
}

#[test]
fn test_encode_data_buffer() {
let meta = metadata_for_test();
Expand All @@ -965,7 +1068,7 @@ mod tests {

assert_eq!(4, buffer.num_rows());

let encoder = DataPartEncoder::new(&meta, &[0, 1, 2], None);
let encoder = DataPartEncoder::new(&meta, Some(&[0, 1, 2]), None, true);
let encoded = match encoder.write(&mut buffer).unwrap() {
DataPart::Parquet(data) => data.data,
};
Expand Down Expand Up @@ -1010,8 +1113,7 @@ mod tests {
assert_eq!(None, search_next_pk_range(&a, 6));
}

#[test]
fn test_iter_data_buffer() {
fn check_iter_data_buffer(pk_weights: Option<&[u16]>, expected: &[Vec<f64>]) {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);

Expand All @@ -1033,15 +1135,28 @@ mod tests {
2,
);

let mut iter = buffer.read(&[0, 1, 3, 2]).unwrap();
check_buffer_values_equal(&mut iter, &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]]);
let mut iter = buffer.read(pk_weights).unwrap();
check_buffer_values_equal(&mut iter, expected);
}

#[test]
fn test_iter_data_buffer() {
check_iter_data_buffer(None, &[vec![1.0, 2.0, 3.0], vec![1.1, 2.1, 3.1]]);
check_iter_data_buffer(
Some(&[0, 1, 2, 3]),
&[vec![1.0, 2.0, 3.0], vec![1.1, 2.1, 3.1]],
);
check_iter_data_buffer(
Some(&[3, 2, 1, 0]),
&[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]],
);
}

#[test]
fn test_iter_empty_data_buffer() {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
let mut iter = buffer.read(&[0, 1, 3, 2]).unwrap();
let mut iter = buffer.read(Some(&[0, 1, 3, 2])).unwrap();
check_buffer_values_equal(&mut iter, &[]);
}

Expand Down Expand Up @@ -1095,7 +1210,7 @@ mod tests {
4,
);

let encoder = DataPartEncoder::new(&meta, weights, Some(4));
let encoder = DataPartEncoder::new(&meta, Some(weights), Some(4), true);
let encoded = encoder.write(&mut buffer).unwrap();

let mut iter = encoded.read().unwrap();
Expand Down
Loading