feat(mito): Defines the read Batch struct for mito2 (GreptimeTeam#2174)

* feat: define batch

* feat: define Batch struct

* feat: stream writer takes arrow's types

* feat: make it compile

* feat: use uint64vector and uint8vector

* feat: add timestamps and primary key
evenyag authored and paomian committed Oct 19, 2023
1 parent 7dfa778 commit 454a07a
Showing 4 changed files with 180 additions and 61 deletions.
6 changes: 5 additions & 1 deletion src/mito2/src/error.rs
@@ -294,14 +294,17 @@ pub enum Error {
     },

     #[snafu(display(
-        "Failed to deserialize field, source: {} location: {}",
+        "Failed to deserialize field, source: {}, location: {}",
         source,
         location
     ))]
     DeserializeField {
         source: memcomparable::Error,
         location: Location,
     },
+
+    #[snafu(display("Invalid batch, {}, location: {}", reason, location))]
+    InvalidBatch { reason: String, location: Location },
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -351,6 +354,7 @@ impl ErrorExt for Error {
             SerializeField { .. } => StatusCode::Internal,
             NotSupportedField { .. } => StatusCode::Unsupported,
             DeserializeField { .. } => StatusCode::Unexpected,
+            InvalidBatch { .. } => StatusCode::InvalidArguments,
         }
     }

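For context, snafu turns the new variant into an InvalidBatchSnafu context selector, and callers raise it through the ensure! macro (as build() does in read.rs below). A minimal sketch of that pattern; the check_len helper itself is illustrative, not part of this commit:

use snafu::ensure;

use crate::error::{InvalidBatchSnafu, Result};

// Hypothetical helper: returns Error::InvalidBatch when two column lengths
// disagree; snafu fills in `location` implicitly.
fn check_len(expected: usize, actual: usize) -> Result<()> {
    ensure!(
        actual == expected,
        InvalidBatchSnafu {
            reason: format!("length mismatch {} != {}", actual, expected),
        }
    );
    Ok(())
}
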
189 changes: 152 additions & 37 deletions src/mito2/src/read.rs
@@ -14,68 +14,182 @@

 //! Common structs and utilities for reading data.

+use std::sync::Arc;
+
 use async_trait::async_trait;
 use common_time::Timestamp;
-use datatypes::vectors::VectorRef;
+use datatypes::vectors::{UInt64Vector, UInt8Vector, Vector, VectorRef};
+use snafu::ensure;
+use store_api::storage::ColumnId;

-use crate::error::Result;
+use crate::error::{InvalidBatchSnafu, Result};
 use crate::metadata::RegionMetadataRef;

-/// Storage internal representation of a batch of rows.
-///
-/// Now the structure of [Batch] is still unstable, all pub fields may be changed.
-#[derive(Debug, Default, PartialEq, Eq, Clone)]
+/// Storage internal representation of a batch of rows
+/// for a primary key (time series).
+///
+/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc.
+#[derive(Debug, PartialEq, Clone)]
 pub struct Batch {
-    /// Rows organized in columnar format.
-    pub columns: Vec<VectorRef>,
+    /// Primary key encoded in a comparable form.
+    primary_key: Vec<u8>,
+    /// Timestamps of rows, should be sorted and not null.
+    timestamps: VectorRef,
+    /// Sequences of rows.
+    ///
+    /// UInt64 type, not null.
+    sequences: Arc<UInt64Vector>,
+    /// Op types of rows.
+    ///
+    /// UInt8 type, not null.
+    op_types: Arc<UInt8Vector>,
+    /// Fields organized in columnar format.
+    fields: Vec<BatchColumn>,
 }

 impl Batch {
-    /// Create a new `Batch` from `columns`.
-    ///
-    /// # Panics
-    /// Panics if vectors in `columns` have different length.
-    pub fn new(columns: Vec<VectorRef>) -> Batch {
-        Self::assert_columns(&columns);
-
-        Batch { columns }
-    }
-
-    /// Returns number of columns in the batch.
-    pub fn num_columns(&self) -> usize {
-        self.columns.len()
-    }
-
-    /// Returns number of rows in the batch.
-    pub fn num_rows(&self) -> usize {
-        self.columns.get(0).map(|v| v.len()).unwrap_or(0)
-    }
+    /// Creates a new batch.
+    pub fn new(
+        primary_key: Vec<u8>,
+        timestamps: VectorRef,
+        sequences: Arc<UInt64Vector>,
+        op_types: Arc<UInt8Vector>,
+        fields: Vec<BatchColumn>,
+    ) -> Result<Batch> {
+        BatchBuilder::new(primary_key, timestamps, sequences, op_types)
+            .with_fields(fields)
+            .build()
+    }
+
+    /// Returns primary key of the batch.
+    pub fn primary_key(&self) -> &[u8] {
+        &self.primary_key
+    }
+
+    /// Returns fields in the batch.
+    pub fn fields(&self) -> &[BatchColumn] {
+        &self.fields
+    }
+
+    /// Returns timestamps of the batch.
+    pub fn timestamps(&self) -> &VectorRef {
+        &self.timestamps
+    }
+
+    /// Returns sequences of the batch.
+    pub fn sequences(&self) -> &Arc<UInt64Vector> {
+        &self.sequences
+    }
+
+    /// Returns op types of the batch.
+    pub fn op_types(&self) -> &Arc<UInt8Vector> {
+        &self.op_types
+    }
+
+    /// Returns the number of rows in the batch.
+    pub fn num_rows(&self) -> usize {
+        // All vectors have the same length, so we use
+        // the length of the timestamps vector.
+        self.timestamps.len()
+    }

     /// Returns true if the number of rows in the batch is 0.
     pub fn is_empty(&self) -> bool {
         self.num_rows() == 0
     }
+}

-    /// Slice the batch, returning a new batch.
-    ///
-    /// # Panics
-    /// Panics if `offset + length > self.num_rows()`.
-    pub fn slice(&self, offset: usize, length: usize) -> Batch {
-        let columns = self
-            .columns
-            .iter()
-            .map(|v| v.slice(offset, length))
-            .collect();
-        Batch { columns }
-    }
-
-    fn assert_columns(columns: &[VectorRef]) {
-        if columns.is_empty() {
-            return;
-        }
-
-        let length = columns[0].len();
-        assert!(columns.iter().all(|col| col.len() == length));
-    }
-}
+/// A column in a [Batch].
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub struct BatchColumn {
+    /// Id of the column.
+    pub column_id: ColumnId,
+    /// Data of the column.
+    pub data: VectorRef,
+}
+
+/// Builder to build [Batch].
+pub struct BatchBuilder {
+    primary_key: Vec<u8>,
+    timestamps: VectorRef,
+    sequences: Arc<UInt64Vector>,
+    op_types: Arc<UInt8Vector>,
+    fields: Vec<BatchColumn>,
+}
+
+impl BatchBuilder {
+    /// Creates a new [BatchBuilder].
+    pub fn new(
+        primary_key: Vec<u8>,
+        timestamps: VectorRef,
+        sequences: Arc<UInt64Vector>,
+        op_types: Arc<UInt8Vector>,
+    ) -> BatchBuilder {
+        BatchBuilder {
+            primary_key,
+            timestamps,
+            sequences,
+            op_types,
+            fields: Vec::new(),
+        }
+    }
+
+    /// Sets all field columns.
+    pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
+        self.fields = fields;
+        self
+    }
+
+    /// Pushes a field column.
+    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
+        self.fields.push(column);
+        self
+    }
+
+    /// Builds the [Batch].
+    pub fn build(self) -> Result<Batch> {
+        let ts_len = self.timestamps.len();
+        ensure!(
+            self.sequences.len() == ts_len,
+            InvalidBatchSnafu {
+                reason: format!(
+                    "sequences have a different length {} != {}",
+                    self.sequences.len(),
+                    ts_len
+                ),
+            }
+        );
+        ensure!(
+            self.op_types.len() == ts_len,
+            InvalidBatchSnafu {
+                reason: format!(
+                    "op types have a different length {} != {}",
+                    self.op_types.len(),
+                    ts_len
+                ),
+            }
+        );
+        for column in &self.fields {
+            ensure!(
+                column.data.len() == ts_len,
+                InvalidBatchSnafu {
+                    reason: format!(
+                        "column {} has a different length {} != {}",
+                        column.column_id,
+                        column.data.len(),
+                        ts_len
+                    ),
+                }
+            );
+        }
+
+        Ok(Batch {
+            primary_key: self.primary_key,
+            timestamps: self.timestamps,
+            sequences: self.sequences,
+            op_types: self.op_types,
+            fields: self.fields,
+        })
+    }
+}

@@ -110,6 +224,7 @@ impl Source {
         unimplemented!()
     }

+    // TODO(yingwen): Maybe remove this method.
     /// Returns statistics of fetched batches.
    pub(crate) fn stats(&self) -> SourceStats {
         unimplemented!()
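Putting the new read.rs API together: a batch for one time series carries an encoded primary key plus equal-length timestamp, sequence, op type, and field columns. A rough usage sketch, assuming the from_values/from_slice vector constructors from the datatypes crate and using 1 as a stand-in Put op type:

use std::sync::Arc;

use datatypes::vectors::{TimestampMillisecondVector, UInt64Vector, UInt8Vector};

use crate::read::{BatchBuilder, BatchColumn};

fn example() -> crate::error::Result<()> {
    // Three rows for a single primary key; every column must have the same
    // length, otherwise build() returns Error::InvalidBatch.
    let timestamps = Arc::new(TimestampMillisecondVector::from_values([1000, 2000, 3000]));
    let sequences = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
    let op_types = Arc::new(UInt8Vector::from_slice(&[1, 1, 1]));

    let batch = BatchBuilder::new(b"encoded-pk".to_vec(), timestamps, sequences, op_types)
        .with_fields(vec![BatchColumn {
            column_id: 1,
            data: Arc::new(UInt64Vector::from_slice(&[10, 20, 30])),
        }])
        .build()?;

    assert_eq!(3, batch.num_rows());
    assert_eq!(b"encoded-pk", batch.primary_key());
    Ok(())
}
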
19 changes: 16 additions & 3 deletions src/mito2/src/sst/parquet/writer.rs
@@ -15,12 +15,14 @@

 //! Parquet writer.

 use common_telemetry::debug;
+use datatypes::arrow::record_batch::RecordBatch;
 use object_store::ObjectStore;
 use parquet::basic::{Compression, Encoding, ZstdLevel};
 use parquet::file::metadata::KeyValue;
 use parquet::file::properties::WriterProperties;
+use snafu::ResultExt;

-use crate::error::Result;
+use crate::error::{NewRecordBatchSnafu, Result};
 use crate::read::Source;
 use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
 use crate::sst::stream_writer::BufferedWriter;
@@ -64,17 +66,28 @@ impl<'a> ParquetWriter<'a> {

         let writer_props = props_builder.build();

+        let arrow_schema = metadata.schema.arrow_schema();
         let mut buffered_writer = BufferedWriter::try_new(
             self.file_path.to_string(),
             self.object_store.clone(),
-            &metadata.schema,
+            arrow_schema.clone(),
             Some(writer_props),
             opts.write_buffer_size.as_bytes() as usize,
         )
         .await?;

         while let Some(batch) = self.source.next_batch().await? {
-            buffered_writer.write(&batch).await?;
+            let arrow_batch = RecordBatch::try_new(
+                arrow_schema.clone(),
+                batch
+                    .fields()
+                    .iter()
+                    .map(|v| v.data.to_arrow_array())
+                    .collect::<Vec<_>>(),
+            )
+            .context(NewRecordBatchSnafu)?;
+
+            buffered_writer.write(&arrow_batch).await?;
         }
         // Get stats from the source.
         let stats = self.source.stats();
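Note that the loop above converts only the field columns of each Batch, so the arrow schema handed to RecordBatch::try_new has to line up with exactly those arrays. The same conversion as a standalone sketch (a hypothetical helper, not code from this commit):

use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;

use crate::read::Batch;

// Convert the field vectors of a storage Batch into one arrow RecordBatch.
fn fields_to_record_batch(
    arrow_schema: SchemaRef,
    batch: &Batch,
) -> std::result::Result<RecordBatch, ArrowError> {
    let arrays = batch
        .fields()
        .iter()
        .map(|column| column.data.to_arrow_array())
        .collect::<Vec<_>>();
    RecordBatch::try_new(arrow_schema, arrays)
}
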
27 changes: 7 additions & 20 deletions src/mito2/src/sst/stream_writer.rs
@@ -17,24 +17,22 @@ use std::pin::Pin;

 use common_datasource::buffered_writer::LazyBufferedWriter;
 use common_datasource::share_buffer::SharedBuffer;
-use datatypes::arrow;
+use datatypes::arrow::datatypes::SchemaRef;
 use datatypes::arrow::record_batch::RecordBatch;
-use datatypes::schema::SchemaRef;
 use object_store::ObjectStore;
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::WriterProperties;
 use parquet::format::FileMetaData;
 use snafu::ResultExt;

 use crate::error;
-use crate::error::{NewRecordBatchSnafu, WriteParquetSnafu};
-use crate::read::Batch;
+use crate::error::WriteParquetSnafu;

 /// Parquet writer that buffers row groups in memory and writes buffered data to an underlying
 /// storage by chunks to reduce memory consumption.
 pub struct BufferedWriter {
     inner: InnerBufferedWriter,
-    arrow_schema: arrow::datatypes::SchemaRef,
+    arrow_schema: SchemaRef,
 }

 type InnerBufferedWriter = LazyBufferedWriter<
@@ -56,11 +54,10 @@ impl BufferedWriter {
     pub async fn try_new(
         path: String,
         store: ObjectStore,
-        schema: &SchemaRef,
+        arrow_schema: SchemaRef,
         props: Option<WriterProperties>,
         buffer_threshold: usize,
     ) -> error::Result<Self> {
-        let arrow_schema = schema.arrow_schema();
         let buffer = SharedBuffer::with_capacity(buffer_threshold);

         let arrow_writer = ArrowWriter::try_new(buffer.clone(), arrow_schema.clone(), props)
@@ -82,24 +79,14 @@ impl BufferedWriter {
                     })
                 }),
             ),
-            arrow_schema: arrow_schema.clone(),
+            arrow_schema,
         })
     }

     /// Write a record batch to stream writer.
-    pub async fn write(&mut self, batch: &Batch) -> error::Result<()> {
-        let arrow_batch = RecordBatch::try_new(
-            self.arrow_schema.clone(),
-            batch
-                .columns
-                .iter()
-                .map(|v| v.to_arrow_array())
-                .collect::<Vec<_>>(),
-        )
-        .context(NewRecordBatchSnafu)?;
-
+    pub async fn write(&mut self, arrow_batch: &RecordBatch) -> error::Result<()> {
         self.inner
-            .write(&arrow_batch)
+            .write(arrow_batch)
             .await
             .context(error::WriteBufferSnafu)?;
         self.inner
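With the conversion moved into ParquetWriter, BufferedWriter now deals purely in arrow types. The new call shape, roughly (a sketch: object_store, writer_props, arrow_schema, and arrow_batch are assumed to be in scope, and the path and threshold here are made up):

let mut writer = BufferedWriter::try_new(
    "region/1.parquet".to_string(),
    object_store.clone(),
    arrow_schema.clone(), // an arrow SchemaRef; callers no longer pass a datatypes schema
    Some(writer_props),
    8 * 1024 * 1024, // buffer threshold in bytes
)
.await?;

writer.write(&arrow_batch).await?;
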
