Skip to content

Commit

Permalink
feat(mito): implements sort/concat for Batch
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Aug 18, 2023
1 parent 9784754 commit 719a98f
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 13 deletions.
3 changes: 2 additions & 1 deletion src/datatypes/src/vectors/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ impl BooleanVector {
&self.array
}

pub(crate) fn as_boolean_array(&self) -> &BooleanArray {
/// Get the inner boolean array.
pub fn as_boolean_array(&self) -> &BooleanArray {
&self.array
}

Expand Down
3 changes: 2 additions & 1 deletion src/datatypes/src/vectors/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
}
}

pub(crate) fn as_arrow(&self) -> &PrimitiveArray<T::ArrowPrimitive> {
/// Get the inner arrow array.
pub fn as_arrow(&self) -> &PrimitiveArray<T::ArrowPrimitive> {
&self.array
}

Expand Down
25 changes: 24 additions & 1 deletion src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,12 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid parquet SST file {}, reason: {}", file, reason))]
#[snafu(display(
"Invalid parquet SST file {}, location: {}, reason: {}",
file,
location,
reason
))]
InvalidParquet {
file: String,
reason: String,
Expand All @@ -331,6 +336,22 @@ pub enum Error {
location: Location,
source: datatypes::error::Error,
},

#[snafu(display(
"Failed to compute arrow arrays, location: {}, source: {}",
location,
source
))]
ComputeArrow {
location: Location,
source: datatypes::arrow::error::ArrowError,
},

#[snafu(display("Failed to compute vector, location: {}, source: {}", location, source))]
ComputeVector {
location: Location,
source: datatypes::error::Error,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -385,6 +406,8 @@ impl ErrorExt for Error {
InvalidBatch { .. } => StatusCode::InvalidArguments,
InvalidRecordBatch { .. } => StatusCode::InvalidArguments,
ConvertVector { source, .. } => source.status_code(),
ComputeArrow { .. } => StatusCode::Internal,
ComputeVector { .. } => StatusCode::Internal,
}
}

Expand Down
208 changes: 199 additions & 9 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,25 @@
use std::sync::Arc;

use api::v1::OpType;
use async_trait::async_trait;
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::prelude::DataType;
use datatypes::arrow::array::{Array, ArrayRef};
use datatypes::arrow::compute::SortOptions;
use datatypes::arrow::row::{RowConverter, SortField};
use datatypes::prelude::{DataType, ScalarVector};
use datatypes::value::ValueRef;
use datatypes::vectors::{Helper, UInt64Vector, UInt8Vector, Vector, VectorRef};
use datatypes::vectors::{
BooleanVector, Helper, UInt32Vector, UInt64Vector, UInt8Vector, Vector, VectorRef,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{ConvertVectorSnafu, InvalidBatchSnafu, Result};
use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
};

/// Storage internal representation of a batch of rows
/// for a primary key (time series).
Expand Down Expand Up @@ -92,9 +99,9 @@ impl Batch {

/// Returns the number of rows in the batch.
pub fn num_rows(&self) -> usize {
// All vectors have the same length so we use
// the length of timestamps vector.
self.timestamps.len()
// All vectors have the same length. We use the length of sequences vector
// since it has static type.
self.sequences.len()
}

/// Returns true if the number of rows in the batch is 0.
Expand All @@ -112,6 +119,16 @@ impl Batch {
self.get_timestamp(self.timestamps.len() - 1)
}

/// Returns the first sequence in the batch or `None` if the batch is empty.
pub fn first_sequence(&self) -> Option<SequenceNumber> {
self.get_sequence(0)
}

/// Returns the last sequence in the batch or `None` if the batch is empty.
pub fn last_sequence(&self) -> Option<SequenceNumber> {
self.get_sequence(self.sequences.len() - 1)
}

/// Slice the batch, returning a new batch.
///
/// # Panics
Expand All @@ -137,7 +154,153 @@ impl Batch {
}
}

/// Get a timestamp at given index.
/// Takes `batches` and concat them into one batch.
///
/// All `batches` must have the same primary key.
pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
ensure!(
!batches.is_empty(),
InvalidBatchSnafu {
reason: "empty batches",
}
);
if batches.len() == 1 {
// Now we own the `batches` so we could pop it directly.
return Ok(batches.pop().unwrap());
}

let primary_key = std::mem::take(&mut batches[0].primary_key);
let first = &batches[0];
// We took the primary key from the first batch so we don't use `first.primary_key()`.
ensure!(
batches
.iter()
.skip(1)
.all(|b| b.primary_key() == &primary_key),
InvalidBatchSnafu {
reason: "batches have different primary key",
}
);
ensure!(
batches
.iter()
.skip(1)
.all(|b| b.fields().len() == first.fields().len()),
InvalidBatchSnafu {
reason: "batches have different field num",
}
);

// We take the primary key from the first batch.
let mut builder = BatchBuilder::new(primary_key);
// Concat timestamps, sequences, op_types, fields.
let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
builder.timestamps_array(array)?;
let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
builder.sequences_array(array)?;
let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
builder.op_types_array(array)?;
for (i, batch_column) in first.fields.iter().enumerate() {
let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
builder.push_field_array(batch_column.column_id, array)?;
}

builder.build()
}

/// Removes rows whose op type is delete.
pub fn filter_deleted(&mut self) -> Result<()> {
// Safety: op type column is not null.
let array = self.op_types.as_arrow();
// Find rows with non-delete op type.
let predicate =
arrow::compute::neq_scalar(array, OpType::Delete as u8).context(ComputeArrowSnafu)?;
self.filter(&BooleanVector::from(predicate))
}

// Applies the `predicate` to the batch.
// Safety: We know the array type so we unwrap on casting.
pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
self.timestamps = self
.timestamps
.filter(predicate)
.context(ComputeVectorSnafu)?;
self.sequences = Arc::new(
UInt64Vector::try_from_arrow_array(
arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
.context(ComputeArrowSnafu)?,
)
.unwrap(),
);
self.op_types = Arc::new(
UInt8Vector::try_from_arrow_array(
arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
.context(ComputeArrowSnafu)?,
)
.unwrap(),
);
for batch_column in &mut self.fields {
batch_column.data = batch_column
.data
.filter(predicate)
.context(ComputeVectorSnafu)?;
}

Ok(())
}

/// Sorts rows in the batch.
///
/// It orders rows by timestamp, sequence desc. It doesn't consider op type as sequence
/// should already provide uniqueness for a row.
pub fn sort(&mut self) -> Result<()> {
// If building a converter each time is costly, we may allow passing a
// converter.
let mut converter = RowConverter::new(vec![
SortField::new(self.timestamps.data_type().as_arrow_type()),
SortField::new_with_options(
self.sequences.data_type().as_arrow_type(),
SortOptions {
descending: true,
..Default::default()
},
),
])
.context(ComputeArrowSnafu)?;
// Columns to sort.
let columns = [
self.timestamps.to_arrow_array(),
self.sequences.to_arrow_array(),
];
let rows = converter.convert_columns(&columns).unwrap();
let mut to_sort: Vec<_> = rows.iter().enumerate().collect();
to_sort.sort_unstable_by(|left, right| left.1.cmp(&right.1));

let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
self.take_in_place(&indices)
}

/// Takes the batch in place.
fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
.context(ComputeArrowSnafu)?;
// Safety: we know the array and vector type.
self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
.context(ComputeArrowSnafu)?;
self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
for batch_column in &mut self.fields {
batch_column.data = batch_column
.data
.take(indices)
.context(ComputeVectorSnafu)?;
}

Ok(())
}

/// Gets a timestamp at given `index`.
fn get_timestamp(&self, index: usize) -> Option<Timestamp> {
if self.timestamps.is_empty() {
return None;
Expand All @@ -152,8 +315,30 @@ impl Batch {
value => panic!("{:?} is not a timestmap", value),
}
}

/// Gets a sequence at given `index`.
fn get_sequence(&self, index: usize) -> Option<SequenceNumber> {
if self.sequences.is_empty() {
return None;
}

// Sequences is not null so it actually returns Some.
self.sequences.get_data(index)
}
}

/// Helper function to concat arrays from `iter`.
fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
let arrays: Vec<_> = iter.collect();
let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
}

// /// Helper function to take array by `indices`.
// fn take_array(array: &dyn Array, indices: &Int32Vector) -> Result<ArrayRef> {
// arrow::compute::take(array, indices.as_arrow(), None).context(ComputeArrowSnafu)?;
// }

/// A column in a [Batch].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
Expand Down Expand Up @@ -278,6 +463,11 @@ impl BatchBuilder {
let op_types = self.op_types.context(InvalidBatchSnafu {
reason: "missing op_types",
})?;
// Our storage format ensure these columns are not nullable so
// we use assert here.
assert_eq!(0, timestamps.null_count());
assert_eq!(0, sequences.null_count());
assert_eq!(0, op_types.null_count());

let ts_len = timestamps.len();
ensure!(
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ pub trait BatchReader: Send {
/// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()`
/// again won't return batch again.
///
/// If `Err` is returned, caller should not call this method again, the implementor
/// If `Err` is returned, caller **must** not call this method again, the implementor
/// may or may not panic in such case.
async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
Expand Down

0 comments on commit 719a98f

Please sign in to comment.