feat(mito): Implement SST format for mito2 #2178

Merged
merged 38 commits on Aug 17, 2023
Changes from 35 commits
Commits
38 commits
807ccfe
chore: update comment
evenyag Aug 12, 2023
c8a2ec4
feat: stream writer takes arrow's types
evenyag Aug 12, 2023
d5a7360
feat: Define Batch struct
evenyag Aug 12, 2023
2a8d374
feat: arrow_schema_to_store
evenyag Aug 12, 2023
1571114
refactor: rename
evenyag Aug 12, 2023
70e15a5
feat: write parquet in new format with tsids
evenyag Aug 12, 2023
319280a
feat: reader support projection
evenyag Aug 13, 2023
725b7da
feat: Impl read compat
evenyag Aug 13, 2023
05b4ade
refactor: rename SchemaCompat to CompatRecordBatch
evenyag Aug 14, 2023
8601b6c
feat: changing sst format
evenyag Aug 15, 2023
3c5e6d0
feat: Merge branch 'develop' into feat/mito2-read
evenyag Aug 15, 2023
c33c46c
feat: make it compile
evenyag Aug 15, 2023
a56f2f6
feat: remove tsid and some structs
evenyag Aug 15, 2023
48f5aa4
feat: from_sst_record_batch wip
evenyag Aug 15, 2023
bbf5ce9
chore: push array
evenyag Aug 15, 2023
f1f4c5f
chore: wip
evenyag Aug 15, 2023
ec8bf4d
feat: decode batches from RecordBatch
evenyag Aug 15, 2023
f5d8934
feat: reader converts record batches
evenyag Aug 15, 2023
ccfd736
feat: remove compat mod
evenyag Aug 15, 2023
bbcf8ee
chore: remove some codes
evenyag Aug 15, 2023
d0591ab
feat: sort fields by column id
evenyag Aug 15, 2023
757ef86
test: test to_sst_arrow_schema
evenyag Aug 15, 2023
2891b9d
feat: do not sort fields
evenyag Aug 15, 2023
a2e5f62
test: more test helpers
evenyag Aug 15, 2023
557d286
feat: simplify projection
evenyag Aug 15, 2023
a466935
fix: projection indices is incorrect
evenyag Aug 15, 2023
cb6fad5
refactor: define write/read format
evenyag Aug 16, 2023
ec552f2
test: test write format
evenyag Aug 16, 2023
e268d5a
test: test projection
evenyag Aug 16, 2023
3ae156a
test: test convert record batch
evenyag Aug 16, 2023
71e68e3
feat: remove unused errors
evenyag Aug 16, 2023
fb7b531
refactor: wrap get_field_batch_columns
evenyag Aug 16, 2023
7cc761e
chore: clippy
evenyag Aug 16, 2023
8123d3a
chore: Merge branch 'develop' into feat/mito2-sst-format
evenyag Aug 16, 2023
a0aa8c6
chore: fix clippy
evenyag Aug 16, 2023
a7b63f3
feat: build arrow schema from region meta in ReadFormat
evenyag Aug 17, 2023
0b493fc
feat: initialize the parquet reader at `build()`
evenyag Aug 17, 2023
241ff3e
chore: fix typo
evenyag Aug 17, 2023
25 changes: 24 additions & 1 deletion src/mito2/src/error.rs
@@ -309,8 +309,28 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to get metadata from file {}, reason: {}", file, reason))]
NoKeyValue {
file: String,
reason: String,
location: Location,
},

#[snafu(display("Invalid batch, {}, location: {}", reason, location))]
InvalidBatch { reason: String, location: Location },

#[snafu(display("Invalid arrow record batch, {}, location: {}", reason, location))]
InvalidRecordBatch { reason: String, location: Location },

#[snafu(display(
"Failed to convert array to vector, location: {}, source: {}",
location,
source
))]
ConvertVector {
location: Location,
source: datatypes::error::Error,
},
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -341,7 +361,8 @@ impl ErrorExt for Error {
| NewRecordBatch { .. }
| RegionNotFound { .. }
| RegionCorrupted { .. }
| CreateDefault { .. } => StatusCode::Unexpected,
| CreateDefault { .. }
| NoKeyValue { .. } => StatusCode::Unexpected,
InvalidScanIndex { .. }
| InvalidMeta { .. }
| InvalidSchema { .. }
@@ -362,6 +383,8 @@ impl ErrorExt for Error {
NotSupportedField { .. } => StatusCode::Unsupported,
DeserializeField { .. } => StatusCode::Unexpected,
InvalidBatch { .. } => StatusCode::InvalidArguments,
InvalidRecordBatch { .. } => StatusCode::InvalidArguments,
ConvertVector { source, .. } => source.status_code(),
}
}

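For context, the new `NoKeyValue` variant would typically be raised through the context selector that snafu generates for it, just as the diff uses `InvalidBatchSnafu` and `ConvertVectorSnafu` elsewhere. The sketch below is hypothetical and not code from this PR: the helper name, the `HashMap`-based metadata argument, and the key are illustrative placeholders; only the `NoKeyValueSnafu` selector and the `crate::error::Result` alias come from the diff above.

```rust
use std::collections::HashMap;

use snafu::OptionExt;

use crate::error::{NoKeyValueSnafu, Result};

/// Hypothetical helper: look up a required entry in a file's key-value metadata,
/// turning a missing key into the new NoKeyValue error.
fn get_required_metadata<'a>(
    key_values: &'a HashMap<String, String>,
    file: &str,
    key: &str,
) -> Result<&'a String> {
    key_values.get(key).context(NoKeyValueSnafu {
        file,
        reason: format!("key {key} not found"),
    })
}
```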
123 changes: 103 additions & 20 deletions src/mito2/src/read.rs
@@ -18,12 +18,15 @@ use std::sync::Arc;

use async_trait::async_trait;
use common_time::Timestamp;
use datatypes::vectors::{UInt64Vector, UInt8Vector, Vector, VectorRef};
use snafu::ensure;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, UInt64Vector, UInt8Vector, Vector, VectorRef};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;

use crate::error::{InvalidBatchSnafu, Result};
use crate::error::{ConvertVectorSnafu, InvalidBatchSnafu, Result};

/// Storage internal representation of a batch of rows
/// for a primary key (time series).
@@ -56,7 +59,7 @@ impl Batch {
op_types: Arc<UInt8Vector>,
fields: Vec<BatchColumn>,
) -> Result<Batch> {
BatchBuilder::new(primary_key, timestamps, sequences, op_types)
BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
.with_fields(fields)
.build()
}
@@ -111,25 +114,36 @@ pub struct BatchColumn {
/// Builder to build [Batch].
pub struct BatchBuilder {
primary_key: Vec<u8>,
timestamps: VectorRef,
sequences: Arc<UInt64Vector>,
op_types: Arc<UInt8Vector>,
timestamps: Option<VectorRef>,
sequences: Option<Arc<UInt64Vector>>,
op_types: Option<Arc<UInt8Vector>>,
fields: Vec<BatchColumn>,
}

impl BatchBuilder {
/// Creates a new [BatchBuilder].
pub fn new(
/// Creates a new [BatchBuilder] with primary key.
pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
BatchBuilder {
primary_key,
timestamps: None,
sequences: None,
op_types: None,
fields: Vec::new(),
}
}

/// Creates a new [BatchBuilder] with all required columns.
pub fn with_required_columns(
primary_key: Vec<u8>,
timestamps: VectorRef,
sequences: Arc<UInt64Vector>,
op_types: Arc<UInt8Vector>,
) -> BatchBuilder {
BatchBuilder {
primary_key,
timestamps,
sequences,
op_types,
timestamps: Some(timestamps),
sequences: Some(sequences),
op_types: Some(op_types),
fields: Vec::new(),
}
}
@@ -146,25 +160,90 @@ impl BatchBuilder {
self
}

/// Push an array as a field.
pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
self.fields.push(BatchColumn {
column_id,
data: vector,
});

Ok(self)
}

/// Try to set an array as timestamps.
pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
ensure!(
vector.data_type().is_timestamp_compatible(),
InvalidBatchSnafu {
reason: format!("{:?} is not a timestamp type", vector.data_type()),
}
);

self.timestamps = Some(vector);
Ok(self)
}

/// Try to set an array as sequences.
pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
ensure!(
*array.data_type() == arrow::datatypes::DataType::UInt64,
InvalidBatchSnafu {
reason: "sequence array is not UInt64 type",
}
);
// Safety: The cast must succeed as we have ensured it is the UInt64 type.
let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
self.sequences = Some(vector);

Ok(self)
}

/// Try to set an array as op types.
pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
ensure!(
*array.data_type() == arrow::datatypes::DataType::UInt8,
InvalidBatchSnafu {
reason: "op type array is not UInt8 type",
}
);
// Safety: The cast must succeed as we have ensured it is the UInt8 type.
let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
self.op_types = Some(vector);

Ok(self)
}

/// Builds the [Batch].
pub fn build(self) -> Result<Batch> {
let ts_len = self.timestamps.len();
let timestamps = self.timestamps.context(InvalidBatchSnafu {
reason: "missing timestamps",
})?;
let sequences = self.sequences.context(InvalidBatchSnafu {
reason: "missing sequences",
})?;
let op_types = self.op_types.context(InvalidBatchSnafu {
reason: "missing op types",
})?;

let ts_len = timestamps.len();
ensure!(
self.sequences.len() == ts_len,
sequences.len() == ts_len,
InvalidBatchSnafu {
reason: format!(
"sequence have different len {} != {}",
self.sequences.len(),
sequences.len(),
ts_len
),
}
);
ensure!(
self.op_types.len() == ts_len,
op_types.len() == ts_len,
InvalidBatchSnafu {
reason: format!(
"op type have different len {} != {}",
self.op_types.len(),
op_types.len(),
ts_len
),
}
@@ -185,9 +264,9 @@

Ok(Batch {
primary_key: self.primary_key,
timestamps: self.timestamps,
sequences: self.sequences,
op_types: self.op_types,
timestamps,
sequences,
op_types,
fields: self.fields,
})
}
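As a usage note, the builder can now be fed raw arrow arrays and validates them while building, so callers that already hold arrays (such as the parquet read path) do not have to convert to vectors themselves. The following is a minimal sketch, assuming it lives inside the mito2 crate (so `crate::read::BatchBuilder` and `crate::error::Result` resolve); the primary key bytes, column id, and array values are arbitrary placeholders.

```rust
use std::sync::Arc;

use datatypes::arrow::array::{ArrayRef, TimestampMillisecondArray, UInt64Array, UInt8Array};

use crate::error::Result;
use crate::read::BatchBuilder;

fn build_batch_sketch() -> Result<()> {
    // Three rows for one primary key (time series); values are placeholders.
    let timestamps: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![1000i64, 2000, 3000]));
    let sequences: ArrayRef = Arc::new(UInt64Array::from(vec![1u64, 2, 3]));
    let op_types: ArrayRef = Arc::new(UInt8Array::from(vec![1u8, 1, 1]));
    let field: ArrayRef = Arc::new(UInt64Array::from(vec![10u64, 20, 30]));

    // Start from the primary key only, then attach the required columns as arrays.
    let mut builder = BatchBuilder::new(b"pk-0".to_vec());
    builder
        .timestamps_array(timestamps)?
        .sequences_array(sequences)?
        .op_types_array(op_types)?
        .push_field_array(1, field)?; // 1 is a hypothetical ColumnId.

    // build() fails if a required column is missing or the column lengths differ.
    let _batch = builder.build()?;
    Ok(())
}
```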
Expand Down Expand Up @@ -232,8 +311,12 @@ impl Source {
}

/// Async batch reader.
///
/// The reader must guarantee [Batch]es returned by it have the same schema.
#[async_trait]
pub trait BatchReader: Send {
// TODO(yingwen): fields of the batch returned.

/// Fetch next [Batch].
///
/// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()`
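To make the reader contract concrete, here is a small consumption sketch. It assumes the trait method is `async fn next_batch(&mut self) -> Result<Option<Batch>>`, which matches the doc comment above but whose full signature is collapsed in this diff, and that the code sits inside the mito2 crate.

```rust
use crate::error::Result;
use crate::read::{Batch, BatchReader};

/// Drains a reader, relying on the guarantee that all returned batches share one schema.
async fn collect_batches<R: BatchReader>(mut reader: R) -> Result<Vec<Batch>> {
    let mut batches = Vec::new();
    // next_batch() yields Ok(None) once the reader is exhausted.
    while let Some(batch) = reader.next_batch().await? {
        batches.push(batch);
    }
    Ok(batches)
}
```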
1 change: 1 addition & 0 deletions src/mito2/src/sst/parquet.rs
@@ -14,6 +14,7 @@

//! SST in parquet format.

mod format;
mod reader;
mod writer;
