Skip to content

Commit

Permalink
feat: define Batch struct
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Aug 14, 2023
1 parent 5aa96a3 commit fa97086
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 7 deletions.
6 changes: 5 additions & 1 deletion src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,14 +294,17 @@ pub enum Error {
},

#[snafu(display(
"Failed to deserialize field, source: {} location: {}",
"Failed to deserialize field, source: {}, location: {}",
source,
location
))]
DeserializeField {
source: memcomparable::Error,
location: Location,
},

#[snafu(display("Invalid batch, {}, location: {}", reason, location))]
InvalidBatch { reason: String, location: Location },
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -351,6 +354,7 @@ impl ErrorExt for Error {
SerializeField { .. } => StatusCode::Internal,
NotSupportedField { .. } => StatusCode::Unsupported,
DeserializeField { .. } => StatusCode::Unexpected,
InvalidBatch { .. } => StatusCode::InvalidArguments,
}
}

Expand Down
126 changes: 120 additions & 6 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
use async_trait::async_trait;
use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::VectorRef;
use snafu::ensure;
use store_api::storage::ColumnId;

use crate::error::Result;
use crate::error::{InvalidBatchSnafu, Result};
use crate::metadata::RegionMetadataRef;

/// Storage internal representation of a batch of rows
Expand All @@ -45,13 +48,21 @@ pub struct Batch {

impl Batch {
/// Creates a new batch.
pub fn new(primary_key: Vec<u8>, timestamps: VectorRef, sequences: VectorRef, op_types: VectorRef, fields: Vec<BatchColumn>) -> Result<Batch> {
todo!()
pub fn new(
primary_key: Vec<u8>,
timestamps: VectorRef,
sequences: VectorRef,
op_types: VectorRef,
fields: Vec<BatchColumn>,
) -> Result<Batch> {
BatchBuilder::new(primary_key, timestamps, sequences, op_types)
.fields(fields)
.build()
}

/// Returns columns in the batch.
pub fn columns(&self) -> &[BatchColumn] {
&self.columns
/// Returns fields in the batch.
pub fn fields(&self) -> &[BatchColumn] {
&self.fields
}

/// Returns sequences of the batch.
Expand Down Expand Up @@ -86,6 +97,109 @@ pub struct BatchColumn {
pub data: VectorRef,
}

/// Builder to build [Batch].
pub struct BatchBuilder {
primary_key: Vec<u8>,
timestamps: VectorRef,
sequences: VectorRef,
op_types: VectorRef,
fields: Vec<BatchColumn>,
}

impl BatchBuilder {
/// Creates a new [BatchBuilder].
pub fn new(
primary_key: Vec<u8>,
timestamps: VectorRef,
sequences: VectorRef,
op_types: VectorRef,
) -> BatchBuilder {
BatchBuilder {
primary_key,
timestamps,
sequences,
op_types,
fields: Vec::new(),
}
}

/// Set all field columns.
pub fn fields(&mut self, fields: Vec<BatchColumn>) -> &mut Self {
self.fields = fields;
self
}

/// Push a field column.
pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
self.fields.push(column);
self
}

/// Builds the [Batch].
pub fn build(self) -> Result<Batch> {
let ts_len = self.timestamps.len();
ensure!(
self.sequences.len() == ts_len,
InvalidBatchSnafu {
reason: format!(
"sequence have different len {} != {}",
self.sequences.len(),
ts_len
),
}
);
ensure!(
self.sequences.data_type() == ConcreteDataType::uint64_datatype(),
InvalidBatchSnafu {
reason: format!(
"sequence must has uint64 type, given: {:?}",
self.sequences.data_type()
),
}
);
ensure!(
self.op_types.len() == ts_len,
InvalidBatchSnafu {
reason: format!(
"op type have different len {} != {}",
self.op_types.len(),
ts_len
),
}
);
ensure!(
self.sequences.data_type() == ConcreteDataType::uint8_datatype(),
InvalidBatchSnafu {
reason: format!(
"sequence must has uint8 type, given: {:?}",
self.op_types.data_type()
),
}
);
for column in &self.fields {
ensure!(
column.data.len() == ts_len,
InvalidBatchSnafu {
reason: format!(
"column {} has different len {} != {}",
column.column_id,
column.data.len(),
ts_len
),
}
);
}

Ok(Batch {
primary_key: self.primary_key,
timestamps: self.timestamps,
sequences: self.sequences,
op_types: self.op_types,
fields: self.fields,
})
}
}

/// Collected [Source] statistics.
#[derive(Debug, Clone)]
pub struct SourceStats {
Expand Down

0 comments on commit fa97086

Please sign in to comment.