
refactor: use Batch::sort_and_dedup instead of Values::sort_in_place (#…
v0y4g3r authored Aug 23, 2023
1 parent d581688 commit fdb5ad2
Showing 2 changed files with 42 additions and 84 deletions.
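The gist of the refactor: `Values::sort_in_place`, which sorted the memtable arrays through Arrow's `RowConverter`, is deleted, and `Values::to_batch` now builds a `Batch` and calls `Batch::sort_and_dedup` on it. Going by the new doc comment and the updated `test_values_sort`, the resulting batch is ordered by timestamp with one row per timestamp, the surviving row being the one with the highest sequence. A minimal sketch of that rule over plain `(timestamp, sequence)` tuples — `sort_and_dedup_rows` is an illustrative stand-in, not the actual `Batch` API:

```rust
// Illustrative stand-in for the rule applied by `Batch::sort_and_dedup`,
// expressed over plain (timestamp, sequence) tuples rather than the real
// `Batch` columns.
fn sort_and_dedup_rows(mut rows: Vec<(i64, u64)>) -> Vec<(i64, u64)> {
    // Timestamp ascending, sequence descending: the newest write for a
    // timestamp sorts first within its group.
    rows.sort_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));
    // Keep only the first (newest) row of each timestamp group.
    rows.dedup_by_key(|r| r.0);
    rows
}

fn main() {
    // Same timestamps/sequences as the updated test below: timestamp 3 is
    // written twice, and the row with sequence 2 survives.
    let rows = vec![(1, 1), (2, 1), (3, 1), (4, 1), (3, 2)];
    assert_eq!(sort_and_dedup_rows(rows), vec![(1, 1), (2, 1), (3, 2), (4, 1)]);
}
```

With `sort_in_place` gone, nothing constructs the `SortValues` error anymore, which is why that variant also disappears from `error.rs` below.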
7 changes: 0 additions & 7 deletions src/mito2/src/error.rs
@@ -366,12 +366,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Failed to sort values source: {}, location: {}", source, location))]
-    SortValues {
-        source: ArrowError,
-        location: Location,
-    },
-
     #[snafu(display("Failed to compact values, source: {}, location: {}", source, location))]
     CompactValues {
         source: datatypes::error::Error,
@@ -446,7 +440,6 @@ impl ErrorExt for Error {
             ComputeArrow { .. } => StatusCode::Internal,
             ComputeVector { .. } => StatusCode::Internal,
             PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments,
-            SortValues { .. } => StatusCode::Unexpected,
            CompactValues { source, .. } => source.status_code(),
             InvalidFlumeSender { .. } => StatusCode::InvalidArguments,
             InvalidSchedulerState { .. } => StatusCode::InvalidArguments,
119 changes: 42 additions & 77 deletions src/mito2/src/memtable/time_series.rs
@@ -18,21 +18,15 @@ use std::fmt::{Debug, Formatter};
 use std::sync::{Arc, RwLock};
 
 use api::v1::OpType;
-use datatypes::arrow;
-use datatypes::arrow::row::RowConverter;
 use datatypes::data_type::DataType;
 use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef};
 use datatypes::value::ValueRef;
-use datatypes::vectors::{
-    Helper, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder,
-};
+use datatypes::vectors::{UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder};
 use snafu::{ensure, ResultExt};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ScanRequest;
 
-use crate::error::{
-    CompactValuesSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result, SortValuesSnafu,
-};
+use crate::error::{CompactValuesSnafu, PrimaryKeyLengthMismatchSnafu, Result};
 use crate::memtable::{BoxedBatchIterator, KeyValues, Memtable, MemtableId};
 use crate::read::{Batch, BatchBuilder, BatchColumn};
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
@@ -345,46 +339,8 @@ struct Values {
 }
 
 impl Values {
-    /// Sorts values in place by `timestamp, sequence, op_type`.
-    fn sort_in_place(&mut self) -> Result<()> {
-        let mut arrays = Vec::with_capacity(3 + self.fields.len());
-        arrays.push(self.timestamp.to_arrow_array());
-        arrays.push(self.sequence.to_arrow_array());
-        arrays.push(self.op_type.to_arrow_array());
-        arrays.extend(self.fields.iter().map(|f| f.to_arrow_array()));
-
-        // only sort by timestamp and sequence.
-        let fields = arrays
-            .iter()
-            .take(2)
-            .map(|v| arrow::row::SortField::new(v.data_type().clone()))
-            .collect();
-
-        let mut converter = RowConverter::new(fields).context(SortValuesSnafu)?;
-        let rows = converter
-            .convert_columns(&arrays[0..2])
-            .context(SortValuesSnafu)?;
-        let mut sort_pairs = rows.iter().enumerate().collect::<Vec<_>>();
-        sort_pairs.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
-        let indices =
-            arrow::array::UInt32Array::from_iter_values(sort_pairs.iter().map(|(i, _)| *i as u32));
-
-        let res = arrays
-            .into_iter()
-            .map(|arr| arrow::compute::take(&arr, &indices, None))
-            .collect::<arrow::error::Result<Vec<_>>>()
-            .context(SortValuesSnafu)?;
-
-        self.timestamp = Helper::try_into_vector(&res[0]).context(ConvertVectorSnafu)?;
-        self.sequence =
-            Arc::new(UInt64Vector::try_from_arrow_array(&res[1]).context(ConvertVectorSnafu)?);
-        self.op_type =
-            Arc::new(UInt8Vector::try_from_arrow_array(&res[2]).context(ConvertVectorSnafu)?);
-        self.fields = Helper::try_into_vectors(&res[3..]).context(ConvertVectorSnafu)?;
-        Ok(())
-    }
-
-    /// Converts [Values] to `Batch`.
+    /// Converts [Values] to `Batch`, sorts the batch according to `timestamp, sequence` desc and
+    /// keeps only the latest row for the same timestamp.
     pub fn to_batch(&self, primary_key: &[u8], metadata: &RegionMetadataRef) -> Result<Batch> {
         let builder = BatchBuilder::with_required_columns(
             primary_key.to_vec(),
@@ -402,7 +358,9 @@ impl Values {
             })
             .collect();
 
-        builder.with_fields(fields).build()
+        let mut batch = builder.with_fields(fields).build()?;
+        batch.sort_and_dedup()?;
+        Ok(batch)
     }
 }

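After this change, a caller of `Values::to_batch` gets a batch whose timestamps are already sorted and unique, per the new doc comment, so no separate sorting pass is needed downstream. A small sketch of that invariant over plain `i64` timestamps — `is_sorted_and_deduped` is an illustrative helper, not part of the mito2 API:

```rust
// Illustrative check of the property `to_batch` now guarantees, per the new
// doc comment: timestamps strictly increase, i.e. the batch is sorted and
// holds at most one row per timestamp.
fn is_sorted_and_deduped(timestamps: &[i64]) -> bool {
    timestamps.windows(2).all(|w| w[0] < w[1])
}

fn main() {
    // The expected timestamps in the updated `test_values_sort` below.
    assert!(is_sorted_and_deduped(&[1, 2, 3, 4]));
    // The raw memtable input is neither sorted nor deduplicated.
    assert!(!is_sorted_and_deduped(&[1, 2, 3, 4, 3]));
}
```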
@@ -445,9 +403,7 @@ mod tests {
     use datatypes::prelude::{ConcreteDataType, ScalarVector};
     use datatypes::schema::ColumnSchema;
     use datatypes::value::{OrderedFloat, Value};
-    use datatypes::vectors::{
-        Float32Vector, Float64Vector, Int64Vector, TimestampMillisecondVector,
-    };
+    use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
     use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
     use store_api::storage::RegionId;

@@ -547,20 +503,21 @@
         assert_eq!(1, series.frozen.len());
     }
 
-    fn check_value(values: &Values, expect: Vec<Vec<Value>>) {
-        assert_eq!(values.sequence.len(), values.timestamp.len());
-        assert_eq!(values.op_type.len(), values.timestamp.len());
-        for f in &values.fields {
-            assert_eq!(f.len(), values.timestamp.len());
+    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
+        let ts_len = batch.timestamps().len();
+        assert_eq!(batch.sequences().len(), ts_len);
+        assert_eq!(batch.op_types().len(), ts_len);
+        for f in batch.fields() {
+            assert_eq!(f.data.len(), ts_len);
         }
 
         let mut rows = vec![];
-        for idx in 0..values.timestamp.len() {
-            let mut row = Vec::with_capacity(values.fields.len() + 3);
-            row.push(values.timestamp.get(idx));
-            row.push(values.sequence.get(idx));
-            row.push(values.op_type.get(idx));
-            row.extend(values.fields.iter().map(|f| f.get(idx)));
+        for idx in 0..ts_len {
+            let mut row = Vec::with_capacity(batch.fields().len() + 3);
+            row.push(batch.timestamps().get(idx));
+            row.push(batch.sequences().get(idx));
+            row.push(batch.op_types().get(idx));
+            row.extend(batch.fields().iter().map(|f| f.data.get(idx)));
             rows.push(row);
         }

@@ -572,45 +529,53 @@
 
     #[test]
     fn test_values_sort() {
-        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 4, 3]));
-        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 0]));
-        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1]));
-
-        let fields = vec![Arc::new(Float32Vector::from_vec(vec![1.1, 2.1, 3.3, 4.2])) as Arc<_>];
-        let mut values = Values {
+        let schema = schema_for_test();
+        let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3]));
+        let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2]));
+        let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0]));
+
+        let fields = vec![
+            Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>,
+            Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>,
+        ];
+        let values = Values {
             timestamp: timestamp as Arc<_>,
             sequence,
             op_type,
             fields,
         };
-        values.sort_in_place().unwrap();
 
+        let batch = values.to_batch(b"test", &schema).unwrap();
         check_value(
-            &values,
+            &batch,
             vec![
                 vec![
                     Value::Timestamp(Timestamp::new_millisecond(1)),
                     Value::UInt64(1),
                     Value::UInt8(1),
-                    Value::Float32(OrderedFloat(1.1)),
+                    Value::Int64(4),
+                    Value::Float64(OrderedFloat(1.1)),
                 ],
                 vec![
                     Value::Timestamp(Timestamp::new_millisecond(2)),
                     Value::UInt64(1),
                     Value::UInt8(1),
-                    Value::Float32(OrderedFloat(2.1)),
+                    Value::Int64(3),
+                    Value::Float64(OrderedFloat(2.1)),
                 ],
                 vec![
                     Value::Timestamp(Timestamp::new_millisecond(3)),
-                    Value::UInt64(0),
-                    Value::UInt8(1),
-                    Value::Float32(OrderedFloat(4.2)),
+                    Value::UInt64(2),
+                    Value::UInt8(0),
+                    Value::Int64(2),
+                    Value::Float64(OrderedFloat(4.2)),
                 ],
                 vec![
                     Value::Timestamp(Timestamp::new_millisecond(4)),
                     Value::UInt64(1),
                     Value::UInt8(1),
-                    Value::Float32(OrderedFloat(3.3)),
+                    Value::Int64(1),
+                    Value::Float64(OrderedFloat(3.3)),
                 ],
             ],
         )

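For reference, the updated expectation in `test_values_sort` can be re-derived by hand: five input rows with timestamps [1, 2, 3, 4, 3] collapse to four, and at timestamp 3 the later write (sequence 2, op_type 0, fields 2 and 4.2) wins. A standalone re-derivation over plain tuples, independent of the `Values` and `Batch` types:

```rust
// Re-derives the expected table of `test_values_sort` from the raw input,
// using (timestamp, sequence, op_type, field0, field1) tuples in place of the
// vector types. The real test goes through `Values::to_batch` and `Batch`.
fn main() {
    let mut rows: Vec<(i64, u64, u8, i64, f64)> = vec![
        (1, 1, 1, 4, 1.1),
        (2, 1, 1, 3, 2.1),
        (3, 1, 1, 2, 4.2),
        (4, 1, 1, 1, 3.3),
        (3, 2, 0, 2, 4.2),
    ];
    // Timestamp ascending, sequence descending, then keep the first row per timestamp.
    rows.sort_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));
    rows.dedup_by_key(|r| r.0);
    assert_eq!(
        rows,
        vec![
            (1, 1, 1, 4, 1.1),
            (2, 1, 1, 3, 2.1),
            (3, 2, 0, 2, 4.2),
            (4, 1, 1, 1, 3.3),
        ]
    );
}
```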
