Skip to content

Commit

Permalink
feat(mito): Implement operations like concat and sort for Batch (Grep…
Browse files Browse the repository at this point in the history
…timeTeam#2203)

* feat: Implement slice and first/last timestamp for Batch

* feat(mito): implements sort/concat for Batch

* chore: fix typo

* chore: remove comments

* feat: sort and dedup

* test: test batch operations

* chore: cast enum to test op type

* test: test filter related api

* sytle: fix clippy

* docs: comment for slice

* chore: address CR comment

Don't return Option in get_timestamp()/get_sequence()
  • Loading branch information
evenyag authored and paomian committed Oct 19, 2023
1 parent 9c94379 commit 2aef37e
Show file tree
Hide file tree
Showing 6 changed files with 551 additions and 16 deletions.
3 changes: 2 additions & 1 deletion src/datatypes/src/vectors/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ impl BooleanVector {
&self.array
}

pub(crate) fn as_boolean_array(&self) -> &BooleanArray {
/// Get the inner boolean array.
pub fn as_boolean_array(&self) -> &BooleanArray {
&self.array
}

Expand Down
12 changes: 8 additions & 4 deletions src/datatypes/src/vectors/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
}
}

pub(crate) fn as_arrow(&self) -> &PrimitiveArray<T::ArrowPrimitive> {
/// Get the inner arrow array.
pub fn as_arrow(&self) -> &PrimitiveArray<T::ArrowPrimitive> {
&self.array
}

Expand All @@ -245,7 +246,11 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
}

// To distinguish with `Vector::slice()`.
fn get_slice(&self, offset: usize, length: usize) -> Self {
/// Slice the batch, returning a new batch.
///
/// # Panics
/// This function panics if `offset + length > self.len()`.
pub fn get_slice(&self, offset: usize, length: usize) -> Self {
let data = self.array.to_data().slice(offset, length);
Self::from_array_data(data)
}
Expand Down Expand Up @@ -295,8 +300,7 @@ impl<T: LogicalPrimitiveType> Vector for PrimitiveVector<T> {
}

fn slice(&self, offset: usize, length: usize) -> VectorRef {
let data = self.array.to_data().slice(offset, length);
Arc::new(Self::from_array_data(data))
Arc::new(self.get_slice(offset, length))
}

fn get(&self, index: usize) -> Value {
Expand Down
25 changes: 24 additions & 1 deletion src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,12 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid parquet SST file {}, reason: {}", file, reason))]
#[snafu(display(
"Invalid parquet SST file {}, location: {}, reason: {}",
file,
location,
reason
))]
InvalidParquet {
file: String,
reason: String,
Expand All @@ -332,6 +337,22 @@ pub enum Error {
source: datatypes::error::Error,
},

#[snafu(display(
"Failed to compute arrow arrays, location: {}, source: {}",
location,
source
))]
ComputeArrow {
location: Location,
source: datatypes::arrow::error::ArrowError,
},

#[snafu(display("Failed to compute vector, location: {}, source: {}", location, source))]
ComputeVector {
location: Location,
source: datatypes::error::Error,
},

#[snafu(display(
"Primary key length mismatch, expect: {}, actual: {}, location: {}",
expect,
Expand Down Expand Up @@ -409,6 +430,8 @@ impl ErrorExt for Error {
InvalidBatch { .. } => StatusCode::InvalidArguments,
InvalidRecordBatch { .. } => StatusCode::InvalidArguments,
ConvertVector { source, .. } => source.status_code(),
ComputeArrow { .. } => StatusCode::Internal,
ComputeVector { .. } => StatusCode::Internal,
PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments,
SortValues { .. } => StatusCode::Unexpected,
CompactValues { source, .. } => source.status_code(),
Expand Down
Loading

0 comments on commit 2aef37e

Please sign in to comment.