diff --git a/src/datatypes/src/vectors/boolean.rs b/src/datatypes/src/vectors/boolean.rs index 6aa483cf372a..61dd86a434bf 100644 --- a/src/datatypes/src/vectors/boolean.rs +++ b/src/datatypes/src/vectors/boolean.rs @@ -39,7 +39,8 @@ impl BooleanVector { &self.array } - pub(crate) fn as_boolean_array(&self) -> &BooleanArray { + /// Get the inner boolean array. + pub fn as_boolean_array(&self) -> &BooleanArray { &self.array } diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs index 5cbeeee4916d..670211948b7c 100644 --- a/src/datatypes/src/vectors/primitive.rs +++ b/src/datatypes/src/vectors/primitive.rs @@ -230,7 +230,8 @@ impl PrimitiveVector { } } - pub(crate) fn as_arrow(&self) -> &PrimitiveArray { + /// Get the inner arrow array. + pub fn as_arrow(&self) -> &PrimitiveArray { &self.array } @@ -245,7 +246,11 @@ impl PrimitiveVector { } // To distinguish with `Vector::slice()`. - fn get_slice(&self, offset: usize, length: usize) -> Self { + /// Slice the batch, returning a new batch. + /// + /// # Panics + /// This function panics if `offset + length > self.len()`. + pub fn get_slice(&self, offset: usize, length: usize) -> Self { let data = self.array.to_data().slice(offset, length); Self::from_array_data(data) } @@ -295,8 +300,7 @@ impl Vector for PrimitiveVector { } fn slice(&self, offset: usize, length: usize) -> VectorRef { - let data = self.array.to_data().slice(offset, length); - Arc::new(Self::from_array_data(data)) + Arc::new(self.get_slice(offset, length)) } fn get(&self, index: usize) -> Value { diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 2f7a585f82e3..453197643b0c 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -309,7 +309,12 @@ pub enum Error { location: Location, }, - #[snafu(display("Invalid parquet SST file {}, reason: {}", file, reason))] + #[snafu(display( + "Invalid parquet SST file {}, location: {}, reason: {}", + file, + location, + reason + ))] InvalidParquet { file: String, reason: String, @@ -332,6 +337,22 @@ pub enum Error { source: datatypes::error::Error, }, + #[snafu(display( + "Failed to compute arrow arrays, location: {}, source: {}", + location, + source + ))] + ComputeArrow { + location: Location, + source: datatypes::arrow::error::ArrowError, + }, + + #[snafu(display("Failed to compute vector, location: {}, source: {}", location, source))] + ComputeVector { + location: Location, + source: datatypes::error::Error, + }, + #[snafu(display( "Primary key length mismatch, expect: {}, actual: {}, location: {}", expect, @@ -409,6 +430,8 @@ impl ErrorExt for Error { InvalidBatch { .. } => StatusCode::InvalidArguments, InvalidRecordBatch { .. } => StatusCode::InvalidArguments, ConvertVector { source, .. } => source.status_code(), + ComputeArrow { .. } => StatusCode::Internal, + ComputeVector { .. } => StatusCode::Internal, PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments, SortValues { .. } => StatusCode::Unexpected, CompactValues { source, .. } => source.status_code(), diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index f9f919581ac6..ce34898876b7 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -16,17 +16,25 @@ use std::sync::Arc; +use api::v1::OpType; use async_trait::async_trait; use common_time::Timestamp; use datatypes::arrow; -use datatypes::arrow::array::ArrayRef; -use datatypes::prelude::DataType; -use datatypes::vectors::{Helper, UInt64Vector, UInt8Vector, Vector, VectorRef}; +use datatypes::arrow::array::{Array, ArrayRef}; +use datatypes::arrow::compute::SortOptions; +use datatypes::arrow::row::{RowConverter, SortField}; +use datatypes::prelude::{DataType, ScalarVector}; +use datatypes::value::ValueRef; +use datatypes::vectors::{ + BooleanVector, Helper, UInt32Vector, UInt64Vector, UInt8Vector, Vector, VectorRef, +}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; -use store_api::storage::ColumnId; +use store_api::storage::{ColumnId, SequenceNumber}; -use crate::error::{ConvertVectorSnafu, InvalidBatchSnafu, Result}; +use crate::error::{ + ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result, +}; /// Storage internal representation of a batch of rows /// for a primary key (time series). @@ -91,15 +99,266 @@ impl Batch { /// Returns the number of rows in the batch. pub fn num_rows(&self) -> usize { - // All vectors have the same length so we use - // the length of timestamps vector. - self.timestamps.len() + // All vectors have the same length. We use the length of sequences vector + // since it has static type. + self.sequences.len() } /// Returns true if the number of rows in the batch is 0. pub fn is_empty(&self) -> bool { self.num_rows() == 0 } + + /// Returns the first timestamp in the batch. + pub fn first_timestamp(&self) -> Option { + if self.timestamps.is_empty() { + return None; + } + + Some(self.get_timestamp(0)) + } + + /// Returns the last timestamp in the batch. + pub fn last_timestamp(&self) -> Option { + if self.timestamps.is_empty() { + return None; + } + + Some(self.get_timestamp(self.timestamps.len() - 1)) + } + + /// Returns the first sequence in the batch or `None` if the batch is empty. + pub fn first_sequence(&self) -> Option { + if self.sequences.is_empty() { + return None; + } + + Some(self.get_sequence(0)) + } + + /// Returns the last sequence in the batch or `None` if the batch is empty. + pub fn last_sequence(&self) -> Option { + if self.sequences.is_empty() { + return None; + } + + Some(self.get_sequence(self.sequences.len() - 1)) + } + + /// Slice the batch, returning a new batch. + /// + /// # Panics + /// Panics if `offset + length > self.num_rows()`. + pub fn slice(&self, offset: usize, length: usize) -> Batch { + let fields = self + .fields + .iter() + .map(|column| BatchColumn { + column_id: column.column_id, + data: column.data.slice(offset, length), + }) + .collect(); + // We skip using the builder to avoid validating the batch again. + Batch { + // Now we need to clone the primary key. We could try `Bytes` if + // this becomes a bottleneck. + primary_key: self.primary_key.clone(), + timestamps: self.timestamps.slice(offset, length), + sequences: Arc::new(self.sequences.get_slice(offset, length)), + op_types: Arc::new(self.op_types.get_slice(offset, length)), + fields, + } + } + + /// Takes `batches` and concat them into one batch. + /// + /// All `batches` must have the same primary key. + pub fn concat(mut batches: Vec) -> Result { + ensure!( + !batches.is_empty(), + InvalidBatchSnafu { + reason: "empty batches", + } + ); + if batches.len() == 1 { + // Now we own the `batches` so we could pop it directly. + return Ok(batches.pop().unwrap()); + } + + let primary_key = std::mem::take(&mut batches[0].primary_key); + let first = &batches[0]; + // We took the primary key from the first batch so we don't use `first.primary_key()`. + ensure!( + batches + .iter() + .skip(1) + .all(|b| b.primary_key() == primary_key), + InvalidBatchSnafu { + reason: "batches have different primary key", + } + ); + ensure!( + batches + .iter() + .skip(1) + .all(|b| b.fields().len() == first.fields().len()), + InvalidBatchSnafu { + reason: "batches have different field num", + } + ); + + // We take the primary key from the first batch. + let mut builder = BatchBuilder::new(primary_key); + // Concat timestamps, sequences, op_types, fields. + let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?; + builder.timestamps_array(array)?; + let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?; + builder.sequences_array(array)?; + let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?; + builder.op_types_array(array)?; + for (i, batch_column) in first.fields.iter().enumerate() { + let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?; + builder.push_field_array(batch_column.column_id, array)?; + } + + builder.build() + } + + /// Removes rows whose op type is delete. + pub fn filter_deleted(&mut self) -> Result<()> { + // Safety: op type column is not null. + let array = self.op_types.as_arrow(); + // Find rows with non-delete op type. + let predicate = + arrow::compute::neq_scalar(array, OpType::Delete as u8).context(ComputeArrowSnafu)?; + self.filter(&BooleanVector::from(predicate)) + } + + // Applies the `predicate` to the batch. + // Safety: We know the array type so we unwrap on casting. + pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> { + self.timestamps = self + .timestamps + .filter(predicate) + .context(ComputeVectorSnafu)?; + self.sequences = Arc::new( + UInt64Vector::try_from_arrow_array( + arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array()) + .context(ComputeArrowSnafu)?, + ) + .unwrap(), + ); + self.op_types = Arc::new( + UInt8Vector::try_from_arrow_array( + arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array()) + .context(ComputeArrowSnafu)?, + ) + .unwrap(), + ); + for batch_column in &mut self.fields { + batch_column.data = batch_column + .data + .filter(predicate) + .context(ComputeVectorSnafu)?; + } + + Ok(()) + } + + /// Sorts and dedup rows in the batch. + /// + /// It orders rows by timestamp, sequence desc and only keep the latest + /// row for the same timestamp. It doesn't consider op type as sequence + /// should already provide uniqueness for a row. + pub fn sort_and_dedup(&mut self) -> Result<()> { + // If building a converter each time is costly, we may allow passing a + // converter. + let mut converter = RowConverter::new(vec![ + SortField::new(self.timestamps.data_type().as_arrow_type()), + SortField::new_with_options( + self.sequences.data_type().as_arrow_type(), + SortOptions { + descending: true, + ..Default::default() + }, + ), + ]) + .context(ComputeArrowSnafu)?; + // Columns to sort. + let columns = [ + self.timestamps.to_arrow_array(), + self.sequences.to_arrow_array(), + ]; + let rows = converter.convert_columns(&columns).unwrap(); + let mut to_sort: Vec<_> = rows.iter().enumerate().collect(); + to_sort.sort_unstable_by(|left, right| left.1.cmp(&right.1)); + + // Dedup by timestamps. + to_sort.dedup_by(|left, right| { + debug_assert_eq!(18, left.1.as_ref().len()); + debug_assert_eq!(18, right.1.as_ref().len()); + let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref()); + // We only compare the timestamp part and ignore sequence. + left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN] + }); + + let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32)); + self.take_in_place(&indices) + } + + /// Takes the batch in place. + fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> { + self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?; + let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None) + .context(ComputeArrowSnafu)?; + // Safety: we know the array and vector type. + self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap()); + let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None) + .context(ComputeArrowSnafu)?; + self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap()); + for batch_column in &mut self.fields { + batch_column.data = batch_column + .data + .take(indices) + .context(ComputeVectorSnafu)?; + } + + Ok(()) + } + + /// Gets a timestamp at given `index`. + /// + /// # Panics + /// Panics if `index` is out-of-bound or the timestamp vector returns null. + fn get_timestamp(&self, index: usize) -> Timestamp { + match self.timestamps.get_ref(index) { + ValueRef::Timestamp(timestamp) => timestamp, + // Int64 is always millisecond. + // TODO(yingwen): Don't allow using int64 as time index. + ValueRef::Int64(v) => Timestamp::new_millisecond(v), + // We have check the data type is timestamp compatible in the [BatchBuilder] so it's safe to panic. + value => panic!("{:?} is not a timestamp", value), + } + } + + /// Gets a sequence at given `index`. + /// + /// # Panics + /// Panics if `index` is out-of-bound or the sequence vector returns null. + fn get_sequence(&self, index: usize) -> SequenceNumber { + // Safety: sequences is not null so it actually returns Some. + self.sequences.get_data(index).unwrap() + } +} + +/// Len of timestamp in arrow row format. +const TIMESTAMP_KEY_LEN: usize = 9; + +/// Helper function to concat arrays from `iter`. +fn concat_arrays(iter: impl Iterator) -> Result { + let arrays: Vec<_> = iter.collect(); + let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect(); + arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu) } /// A column in a [Batch]. @@ -226,6 +485,11 @@ impl BatchBuilder { let op_types = self.op_types.context(InvalidBatchSnafu { reason: "missing op_types", })?; + // Our storage format ensure these columns are not nullable so + // we use assert here. + assert_eq!(0, timestamps.null_count()); + assert_eq!(0, sequences.null_count()); + assert_eq!(0, op_types.null_count()); let ts_len = timestamps.len(); ensure!( @@ -336,3 +600,245 @@ impl BatchReader for Box { (**self).next_batch().await } } + +#[cfg(test)] +mod tests { + use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array}; + + use super::*; + use crate::error::Error; + + fn new_batch_builder( + timestamps: &[i64], + sequences: &[u64], + op_types: &[OpType], + field: &[u64], + ) -> BatchBuilder { + let mut builder = BatchBuilder::new(b"test".to_vec()); + builder + .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values( + timestamps.iter().copied(), + ))) + .unwrap() + .sequences_array(Arc::new(UInt64Array::from_iter_values( + sequences.iter().copied(), + ))) + .unwrap() + .op_types_array(Arc::new(UInt8Array::from_iter_values( + op_types.iter().map(|v| *v as u8), + ))) + .unwrap() + .push_field_array( + 1, + Arc::new(UInt64Array::from_iter_values(field.iter().copied())), + ) + .unwrap(); + builder + } + + fn new_batch( + timestamps: &[i64], + sequences: &[u64], + op_types: &[OpType], + field: &[u64], + ) -> Batch { + new_batch_builder(timestamps, sequences, op_types, field) + .build() + .unwrap() + } + + #[test] + fn test_first_last_empty() { + let batch = new_batch(&[], &[], &[], &[]); + assert_eq!(None, batch.first_timestamp()); + assert_eq!(None, batch.last_timestamp()); + assert_eq!(None, batch.first_sequence()); + assert_eq!(None, batch.last_sequence()); + } + + #[test] + fn test_first_last_one() { + let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]); + assert_eq!( + Timestamp::new_millisecond(1), + batch.first_timestamp().unwrap() + ); + assert_eq!( + Timestamp::new_millisecond(1), + batch.last_timestamp().unwrap() + ); + assert_eq!(2, batch.first_sequence().unwrap()); + assert_eq!(2, batch.last_sequence().unwrap()); + } + + #[test] + fn test_first_last_multiple() { + let batch = new_batch( + &[1, 2, 3], + &[11, 12, 13], + &[OpType::Put, OpType::Put, OpType::Put], + &[21, 22, 23], + ); + assert_eq!( + Timestamp::new_millisecond(1), + batch.first_timestamp().unwrap() + ); + assert_eq!( + Timestamp::new_millisecond(3), + batch.last_timestamp().unwrap() + ); + assert_eq!(11, batch.first_sequence().unwrap()); + assert_eq!(13, batch.last_sequence().unwrap()); + } + + #[test] + fn test_slice() { + let batch = new_batch( + &[1, 2, 3, 4], + &[11, 12, 13, 14], + &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put], + &[21, 22, 23, 24], + ); + let batch = batch.slice(1, 2); + let expect = new_batch( + &[2, 3], + &[12, 13], + &[OpType::Delete, OpType::Put], + &[22, 23], + ); + assert_eq!(expect, batch); + } + + #[test] + fn test_concat_empty() { + let err = Batch::concat(vec![]).unwrap_err(); + assert!( + matches!(err, Error::InvalidBatch { .. }), + "unexpected err: {err}" + ); + } + + #[test] + fn test_concat_one() { + let batch = new_batch(&[], &[], &[], &[]); + let actual = Batch::concat(vec![batch.clone()]).unwrap(); + assert_eq!(batch, actual); + + let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]); + let actual = Batch::concat(vec![batch.clone()]).unwrap(); + assert_eq!(batch, actual); + } + + #[test] + fn test_concat_multiple() { + let batches = vec![ + new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]), + new_batch( + &[3, 4, 5], + &[13, 14, 15], + &[OpType::Put, OpType::Delete, OpType::Put], + &[23, 24, 25], + ), + new_batch(&[], &[], &[], &[]), + new_batch(&[6], &[16], &[OpType::Put], &[26]), + ]; + let batch = Batch::concat(batches).unwrap(); + let expect = new_batch( + &[1, 2, 3, 4, 5, 6], + &[11, 12, 13, 14, 15, 16], + &[ + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Delete, + OpType::Put, + OpType::Put, + ], + &[21, 22, 23, 24, 25, 26], + ); + assert_eq!(expect, batch); + } + + #[test] + fn test_concat_different() { + let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]); + let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]); + batch2.primary_key = b"hello".to_vec(); + let err = Batch::concat(vec![batch1, batch2]).unwrap_err(); + assert!( + matches!(err, Error::InvalidBatch { .. }), + "unexpected err: {err}" + ); + } + + #[test] + fn test_filter_deleted_empty() { + let mut batch = new_batch(&[], &[], &[], &[]); + batch.filter_deleted().unwrap(); + assert!(batch.is_empty()); + } + + #[test] + fn test_filter_deleted() { + let mut batch = new_batch( + &[1, 2, 3, 4], + &[11, 12, 13, 14], + &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put], + &[21, 22, 23, 24], + ); + batch.filter_deleted().unwrap(); + let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]); + assert_eq!(expect, batch); + } + + #[test] + fn test_filter() { + let mut batch = new_batch( + &[1, 2, 3, 4], + &[11, 12, 13, 14], + &[OpType::Put, OpType::Put, OpType::Put, OpType::Put], + &[21, 22, 23, 24], + ); + let predicate = BooleanVector::from_vec(vec![false, false, true, true]); + batch.filter(&predicate).unwrap(); + let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]); + assert_eq!(expect, batch); + + // filter to empty. + let predicate = BooleanVector::from_vec(vec![false, false]); + batch.filter(&predicate).unwrap(); + assert!(batch.is_empty()); + } + + #[test] + fn test_sort_and_dedup() { + let mut batch = new_batch( + &[2, 3, 1, 4, 5, 2], + &[1, 2, 3, 4, 5, 6], + &[ + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + ], + &[21, 22, 23, 24, 25, 26], + ); + batch.sort_and_dedup().unwrap(); + // It should only keep one timestamp 2. + let expect = new_batch( + &[1, 2, 3, 4, 5], + &[3, 6, 2, 4, 5], + &[ + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + ], + &[23, 26, 22, 24, 25], + ); + assert_eq!(expect, batch); + } +} diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index fe665ac8d017..8b17826edfe8 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -341,6 +341,7 @@ fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef { #[cfg(test)] mod tests { + use api::v1::OpType; use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array}; use datatypes::arrow::datatypes::TimeUnit; use datatypes::prelude::ConcreteDataType; @@ -352,7 +353,7 @@ mod tests { use super::*; const TEST_SEQUENCE: u64 = 1; - const TEST_OP_TYPE: u8 = 1; + const TEST_OP_TYPE: u8 = OpType::Put as u8; fn build_test_region_metadata() -> RegionMetadataRef { let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); diff --git a/src/storage/src/read.rs b/src/storage/src/read.rs index f3a0e182babf..7f3637da43c6 100644 --- a/src/storage/src/read.rs +++ b/src/storage/src/read.rs @@ -255,7 +255,7 @@ pub trait BatchReader: Send { /// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()` /// again won't return batch again. /// - /// If `Err` is returned, caller should not call this method again, the implementor + /// If `Err` is returned, caller **must** not call this method again, the implementor /// may or may not panic in such case. async fn next_batch(&mut self) -> Result>; }