From fa9708668aba580c9349e68c7865ee139507397f Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 14 Aug 2023 22:49:41 +0800 Subject: [PATCH] feat: define Batch struct --- src/mito2/src/error.rs | 6 +- src/mito2/src/read.rs | 126 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 125 insertions(+), 7 deletions(-) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index bd6df63d0114..b2be074e421d 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -294,7 +294,7 @@ pub enum Error { }, #[snafu(display( - "Failed to deserialize field, source: {} location: {}", + "Failed to deserialize field, source: {}, location: {}", source, location ))] @@ -302,6 +302,9 @@ pub enum Error { source: memcomparable::Error, location: Location, }, + + #[snafu(display("Invalid batch, {}, location: {}", reason, location))] + InvalidBatch { reason: String, location: Location }, } pub type Result = std::result::Result; @@ -351,6 +354,7 @@ impl ErrorExt for Error { SerializeField { .. } => StatusCode::Internal, NotSupportedField { .. } => StatusCode::Unsupported, DeserializeField { .. } => StatusCode::Unexpected, + InvalidBatch { .. } => StatusCode::InvalidArguments, } } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index e92f8b12a70a..77e7ab5844d7 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -16,9 +16,12 @@ use async_trait::async_trait; use common_time::Timestamp; +use datatypes::prelude::ConcreteDataType; use datatypes::vectors::VectorRef; +use snafu::ensure; +use store_api::storage::ColumnId; -use crate::error::Result; +use crate::error::{InvalidBatchSnafu, Result}; use crate::metadata::RegionMetadataRef; /// Storage internal representation of a batch of rows @@ -45,13 +48,21 @@ pub struct Batch { impl Batch { /// Creates a new batch. 
-    pub fn new(primary_key: Vec, timestamps: VectorRef, sequences: VectorRef, op_types: VectorRef, fields: Vec) -> Result {
-        todo!()
+    pub fn new(
+        primary_key: Vec,
+        timestamps: VectorRef,
+        sequences: VectorRef,
+        op_types: VectorRef,
+        fields: Vec,
+    ) -> Result {
+        let mut builder = BatchBuilder::new(primary_key, timestamps, sequences, op_types);
+        builder.fields(fields); // `&mut Self` can't be chained into by-value `build()`
+        builder.build()
     }
 
-    /// Returns columns in the batch.
-    pub fn columns(&self) -> &[BatchColumn] {
-        &self.columns
+    /// Returns fields in the batch.
+    pub fn fields(&self) -> &[BatchColumn] {
+        &self.fields
     }
 
     /// Returns sequences of the batch.
@@ -86,6 +97,109 @@ pub struct BatchColumn {
     pub data: VectorRef,
 }
 
+/// Builder to build [Batch].
+pub struct BatchBuilder {
+    primary_key: Vec,
+    timestamps: VectorRef,
+    sequences: VectorRef,
+    op_types: VectorRef,
+    fields: Vec,
+}
+
+impl BatchBuilder {
+    /// Creates a new [BatchBuilder] with the required columns and no field columns.
+    pub fn new(
+        primary_key: Vec,
+        timestamps: VectorRef,
+        sequences: VectorRef,
+        op_types: VectorRef,
+    ) -> BatchBuilder {
+        BatchBuilder {
+            primary_key,
+            timestamps,
+            sequences,
+            op_types,
+            fields: Vec::new(),
+        }
+    }
+
+    /// Sets all field columns, replacing any fields set before.
+    pub fn fields(&mut self, fields: Vec) -> &mut Self {
+        self.fields = fields;
+        self
+    }
+
+    /// Pushes a field column after the fields already set.
+    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
+        self.fields.push(column);
+        self
+    }
+
+    /// Builds the [Batch], failing if any column's length differs from the timestamps.
+    pub fn build(self) -> Result {
+        let ts_len = self.timestamps.len(); // every other column must match this length
+        ensure!(
+            self.sequences.len() == ts_len,
+            InvalidBatchSnafu {
+                reason: format!(
+                    "sequence have different len {} != {}",
+                    self.sequences.len(),
+                    ts_len
+                ),
+            }
+        );
+        ensure!(
+            self.sequences.data_type() == ConcreteDataType::uint64_datatype(),
+            InvalidBatchSnafu {
+                reason: format!(
+                    "sequence must has uint64 type, given: {:?}",
+                    self.sequences.data_type()
+                ),
+            }
+        );
+        ensure!(
+            self.op_types.len() == ts_len,
+            InvalidBatchSnafu {
+                reason: format!(
+                    "op type have different len {} != {}",
+                    self.op_types.len(),
+                    ts_len
+                ),
+            }
+        );
+        ensure!(
+            self.op_types.data_type() == ConcreteDataType::uint8_datatype(), // op types, not sequences
+            InvalidBatchSnafu {
+                reason: format!(
+                    "op type must has uint8 type, given: {:?}",
+                    self.op_types.data_type()
+                ),
+            }
+        );
+        for column in &self.fields {
+            ensure!(
+                column.data.len() == ts_len,
+                InvalidBatchSnafu {
+                    reason: format!(
+                        "column {} has different len {} != {}",
+                        column.column_id,
+                        column.data.len(),
+                        ts_len
+                    ),
+                }
+            );
+        }
+
+        Ok(Batch {
+            primary_key: self.primary_key,
+            timestamps: self.timestamps,
+            sequences: self.sequences,
+            op_types: self.op_types,
+            fields: self.fields,
+        })
+    }
+}
+
 /// Collected [Source] statistics.
 #[derive(Debug, Clone)]
 pub struct SourceStats {