feat(mito): Defines the read Batch struct for mito2 #2174

Merged · 6 commits · Aug 15, 2023
Changes from all commits
6 changes: 5 additions & 1 deletion src/mito2/src/error.rs
@@ -294,14 +294,17 @@ pub enum Error {
     },
 
     #[snafu(display(
-        "Failed to deserialize field, source: {} location: {}",
+        "Failed to deserialize field, source: {}, location: {}",
         source,
         location
     ))]
     DeserializeField {
         source: memcomparable::Error,
         location: Location,
     },
+
+    #[snafu(display("Invalid batch, {}, location: {}", reason, location))]
+    InvalidBatch { reason: String, location: Location },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -351,6 +354,7 @@ impl ErrorExt for Error {
             SerializeField { .. } => StatusCode::Internal,
             NotSupportedField { .. } => StatusCode::Unsupported,
             DeserializeField { .. } => StatusCode::Unexpected,
+            InvalidBatch { .. } => StatusCode::InvalidArguments,
         }
     }

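For illustration, a minimal sketch of how the new `InvalidBatch` variant is raised through its generated `InvalidBatchSnafu` selector. The helper function and its message are hypothetical; they mirror the `ensure!` checks added to `read.rs` below:

```rust
use snafu::ensure;

use crate::error::{InvalidBatchSnafu, Result};

/// Hypothetical helper: fail with `InvalidBatch` when a column's length
/// disagrees with the batch's row count, as `BatchBuilder::build` does.
fn check_len(expected: usize, actual: usize) -> Result<()> {
    ensure!(
        actual == expected,
        InvalidBatchSnafu {
            reason: format!("column has different len {} != {}", actual, expected),
        }
    );
    Ok(())
}
```
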
189 changes: 152 additions & 37 deletions src/mito2/src/read.rs
@@ -14,68 +14,182 @@
 
 //! Common structs and utilities for reading data.
 
+use std::sync::Arc;
+
 use async_trait::async_trait;
 use common_time::Timestamp;
-use datatypes::vectors::VectorRef;
+use datatypes::vectors::{UInt64Vector, UInt8Vector, Vector, VectorRef};
+use snafu::ensure;
+use store_api::storage::ColumnId;
 
-use crate::error::Result;
+use crate::error::{InvalidBatchSnafu, Result};
 use crate::metadata::RegionMetadataRef;
 
-/// Storage internal representation of a batch of rows.
+/// Storage internal representation of a batch of rows
+/// for a primary key (time series).
 ///
-/// Now the structure of [Batch] is still unstable, all pub fields may be changed.
-#[derive(Debug, Default, PartialEq, Eq, Clone)]
+/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc.
+#[derive(Debug, PartialEq, Clone)]
 pub struct Batch {
-    /// Rows organized in columnar format.
-    pub columns: Vec<VectorRef>,
+    /// Primary key encoded in a comparable form.
+    primary_key: Vec<u8>,
+    /// Timestamps of rows, should be sorted and not null.
+    timestamps: VectorRef,
+    /// Sequences of rows
+    ///
+    /// UInt64 type, not null.
+    sequences: Arc<UInt64Vector>,
+    /// Op types of rows
+    ///
+    /// UInt8 type, not null.
+    op_types: Arc<UInt8Vector>,
+    /// Fields organized in columnar format.
+    fields: Vec<BatchColumn>,
 }
 
 impl Batch {
-    /// Create a new `Batch` from `columns`.
-    ///
-    /// # Panics
-    /// Panics if vectors in `columns` have different length.
-    pub fn new(columns: Vec<VectorRef>) -> Batch {
-        Self::assert_columns(&columns);
+    /// Creates a new batch.
+    pub fn new(
+        primary_key: Vec<u8>,
+        timestamps: VectorRef,
+        sequences: Arc<UInt64Vector>,
+        op_types: Arc<UInt8Vector>,
+        fields: Vec<BatchColumn>,
+    ) -> Result<Batch> {
+        BatchBuilder::new(primary_key, timestamps, sequences, op_types)
+            .with_fields(fields)
+            .build()
+    }
+
+    /// Returns primary key of the batch.
+    pub fn primary_key(&self) -> &[u8] {
+        &self.primary_key
+    }
+
+    /// Returns fields in the batch.
+    pub fn fields(&self) -> &[BatchColumn] {
+        &self.fields
+    }
 
-        Batch { columns }
+    /// Returns timestamps of the batch.
+    pub fn timestamps(&self) -> &VectorRef {
+        &self.timestamps
     }
 
-    /// Returns number of columns in the batch.
-    pub fn num_columns(&self) -> usize {
-        self.columns.len()
+    /// Returns sequences of the batch.
+    pub fn sequences(&self) -> &Arc<UInt64Vector> {
+        &self.sequences
     }
 
-    /// Returns number of rows in the batch.
+    /// Returns op types of the batch.
+    pub fn op_types(&self) -> &Arc<UInt8Vector> {
+        &self.op_types
+    }
+
+    /// Returns the number of rows in the batch.
     pub fn num_rows(&self) -> usize {
-        self.columns.get(0).map(|v| v.len()).unwrap_or(0)
+        // All vectors have the same length so we use
+        // the length of timestamps vector.
+        self.timestamps.len()
     }
 
     /// Returns true if the number of rows in the batch is 0.
     pub fn is_empty(&self) -> bool {
         self.num_rows() == 0
     }
+}
+
+/// A column in a [Batch].
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub struct BatchColumn {
+    /// Id of the column.
+    pub column_id: ColumnId,
+    /// Data of the column.
+    pub data: VectorRef,
+}
+
+/// Builder to build [Batch].
+pub struct BatchBuilder {
+    primary_key: Vec<u8>,
+    timestamps: VectorRef,
+    sequences: Arc<UInt64Vector>,
+    op_types: Arc<UInt8Vector>,
+    fields: Vec<BatchColumn>,
+}
+
+impl BatchBuilder {
+    /// Creates a new [BatchBuilder].
+    pub fn new(
+        primary_key: Vec<u8>,
+        timestamps: VectorRef,
+        sequences: Arc<UInt64Vector>,
+        op_types: Arc<UInt8Vector>,
+    ) -> BatchBuilder {
+        BatchBuilder {
+            primary_key,
+            timestamps,
+            sequences,
+            op_types,
+            fields: Vec::new(),
+        }
+    }
 
-    /// Slice the batch, returning a new batch.
-    ///
-    /// # Panics
-    /// Panics if `offset + length > self.num_rows()`.
-    pub fn slice(&self, offset: usize, length: usize) -> Batch {
-        let columns = self
-            .columns
-            .iter()
-            .map(|v| v.slice(offset, length))
-            .collect();
-        Batch { columns }
-    }
-
-    fn assert_columns(columns: &[VectorRef]) {
-        if columns.is_empty() {
-            return;
+    /// Set all field columns.
+    pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
+        self.fields = fields;
+        self
+    }
+
+    /// Push a field column.
+    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
+        self.fields.push(column);
+        self
+    }
+
+    /// Builds the [Batch].
+    pub fn build(self) -> Result<Batch> {
+        let ts_len = self.timestamps.len();
+        ensure!(
+            self.sequences.len() == ts_len,
+            InvalidBatchSnafu {
+                reason: format!(
+                    "sequence have different len {} != {}",
+                    self.sequences.len(),
+                    ts_len
+                ),
+            }
+        );
+        ensure!(
+            self.op_types.len() == ts_len,
+            InvalidBatchSnafu {
+                reason: format!(
+                    "op type have different len {} != {}",
+                    self.op_types.len(),
+                    ts_len
+                ),
+            }
+        );
+        for column in &self.fields {
+            ensure!(
+                column.data.len() == ts_len,
+                InvalidBatchSnafu {
+                    reason: format!(
+                        "column {} has different len {} != {}",
+                        column.column_id,
+                        column.data.len(),
+                        ts_len
+                    ),
+                }
+            );
         }
 
-        let length = columns[0].len();
-        assert!(columns.iter().all(|col| col.len() == length));
+        Ok(Batch {
+            primary_key: self.primary_key,
+            timestamps: self.timestamps,
+            sequences: self.sequences,
+            op_types: self.op_types,
+            fields: self.fields,
+        })
     }
 }

@@ -110,6 +224,7 @@ impl Source {
         unimplemented!()
     }
 
+    // TODO(yingwen): Maybe remove this method.
    /// Returns statisics of fetched batches.
     pub(crate) fn stats(&self) -> SourceStats {
         unimplemented!()
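
For illustration, a minimal sketch of how a caller could assemble a `Batch` with the new builder. The vector constructors (`TimestampMillisecondVector::from_vec`, `UInt64Vector::from_vec`) come from the `datatypes` crate; the primary key bytes, column id, and op type values are placeholder assumptions, not part of this diff:

```rust
use std::sync::Arc;

use datatypes::vectors::{TimestampMillisecondVector, UInt64Vector, UInt8Vector, VectorRef};

use crate::error::Result;
use crate::read::{Batch, BatchBuilder, BatchColumn};

/// Hypothetical sketch: assemble a two-row batch for one primary key.
fn build_example_batch() -> Result<Batch> {
    // Timestamps must be sorted and non-null; values are placeholders.
    let timestamps: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1000, 2000]));
    let sequences = Arc::new(UInt64Vector::from_vec(vec![11, 12]));
    // Op type values are placeholders for the region's op type encoding.
    let op_types = Arc::new(UInt8Vector::from_vec(vec![1, 1]));

    let mut builder =
        BatchBuilder::new(b"encoded-primary-key".to_vec(), timestamps, sequences, op_types);
    builder.push_field(BatchColumn {
        column_id: 2, // hypothetical field column id
        data: Arc::new(UInt64Vector::from_vec(vec![100, 200])),
    });
    // build() rejects mismatched column lengths with an InvalidBatch error.
    builder.build()
}
```
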
19 changes: 16 additions & 3 deletions src/mito2/src/sst/parquet/writer.rs
@@ -15,12 +15,14 @@
 //! Parquet writer.
 
 use common_telemetry::debug;
+use datatypes::arrow::record_batch::RecordBatch;
 use object_store::ObjectStore;
 use parquet::basic::{Compression, Encoding, ZstdLevel};
 use parquet::file::metadata::KeyValue;
 use parquet::file::properties::WriterProperties;
 use snafu::ResultExt;
 
-use crate::error::Result;
+use crate::error::{NewRecordBatchSnafu, Result};
 use crate::read::Source;
 use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
 use crate::sst::stream_writer::BufferedWriter;
@@ -64,17 +66,28 @@ impl<'a> ParquetWriter<'a> {
 
         let writer_props = props_builder.build();
 
+        let arrow_schema = metadata.schema.arrow_schema();
         let mut buffered_writer = BufferedWriter::try_new(
             self.file_path.to_string(),
             self.object_store.clone(),
-            &metadata.schema,
+            arrow_schema.clone(),
             Some(writer_props),
             opts.write_buffer_size.as_bytes() as usize,
         )
         .await?;
 
         while let Some(batch) = self.source.next_batch().await? {
-            buffered_writer.write(&batch).await?;
+            let arrow_batch = RecordBatch::try_new(
+                arrow_schema.clone(),
+                batch
+                    .fields()
+                    .iter()
+                    .map(|v| v.data.to_arrow_array())
+                    .collect::<Vec<_>>(),
+            )
+            .context(NewRecordBatchSnafu)?;
+
+            buffered_writer.write(&arrow_batch).await?;
         }
         // Get stats from the source.
         let stats = self.source.stats();
27 changes: 7 additions & 20 deletions src/mito2/src/sst/stream_writer.rs
@@ -17,24 +17,22 @@ use std::pin::Pin;
 
 use common_datasource::buffered_writer::LazyBufferedWriter;
 use common_datasource::share_buffer::SharedBuffer;
-use datatypes::arrow;
+use datatypes::arrow::datatypes::SchemaRef;
 use datatypes::arrow::record_batch::RecordBatch;
-use datatypes::schema::SchemaRef;
 use object_store::ObjectStore;
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::WriterProperties;
 use parquet::format::FileMetaData;
 use snafu::ResultExt;
 
 use crate::error;
-use crate::error::{NewRecordBatchSnafu, WriteParquetSnafu};
-use crate::read::Batch;
+use crate::error::WriteParquetSnafu;
 
 /// Parquet writer that buffers row groups in memory and writes buffered data to an underlying
 /// storage by chunks to reduce memory consumption.
 pub struct BufferedWriter {
     inner: InnerBufferedWriter,
-    arrow_schema: arrow::datatypes::SchemaRef,
+    arrow_schema: SchemaRef,
 }
 
 type InnerBufferedWriter = LazyBufferedWriter<
@@ -56,11 +54,10 @@ impl BufferedWriter {
     pub async fn try_new(
         path: String,
         store: ObjectStore,
-        schema: &SchemaRef,
+        arrow_schema: SchemaRef,
         props: Option<WriterProperties>,
         buffer_threshold: usize,
     ) -> error::Result<Self> {
-        let arrow_schema = schema.arrow_schema();
         let buffer = SharedBuffer::with_capacity(buffer_threshold);
 
         let arrow_writer = ArrowWriter::try_new(buffer.clone(), arrow_schema.clone(), props)
@@ -82,24 +79,14 @@ impl BufferedWriter {
                     })
                 }),
             ),
-            arrow_schema: arrow_schema.clone(),
+            arrow_schema,
         })
     }
 
     /// Write a record batch to stream writer.
-    pub async fn write(&mut self, batch: &Batch) -> error::Result<()> {
-        let arrow_batch = RecordBatch::try_new(
-            self.arrow_schema.clone(),
-            batch
-                .columns
-                .iter()
-                .map(|v| v.to_arrow_array())
-                .collect::<Vec<_>>(),
-        )
-        .context(NewRecordBatchSnafu)?;
-
+    pub async fn write(&mut self, arrow_batch: &RecordBatch) -> error::Result<()> {
         self.inner
-            .write(&arrow_batch)
+            .write(arrow_batch)
             .await
             .context(error::WriteBufferSnafu)?;
         self.inner
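
For illustration, a minimal sketch of driving the reworked `BufferedWriter`, which now takes a prepared arrow `SchemaRef` and `RecordBatch` instead of converting a `Batch` itself. The buffer threshold and the `None` writer properties are placeholder assumptions:

```rust
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
use object_store::ObjectStore;

use crate::error::Result;
use crate::sst::stream_writer::BufferedWriter;

/// Hypothetical caller: write pre-converted arrow batches to one SST file.
async fn write_batches(
    path: String,
    store: ObjectStore,
    arrow_schema: SchemaRef,
    batches: &[RecordBatch],
) -> Result<()> {
    let mut writer =
        BufferedWriter::try_new(path, store, arrow_schema, None, 8 * 1024 * 1024).await?;
    for batch in batches {
        // The Batch -> RecordBatch conversion now happens in ParquetWriter,
        // so the stream writer only deals with arrow batches.
        writer.write(batch).await?;
    }
    Ok(())
}
```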