refactor: change BatchConverter to ProjectionMapper
evenyag committed Aug 24, 2023
1 parent be6ecb7 commit 2a8cada
Showing 4 changed files with 88 additions and 93 deletions.
43 changes: 10 additions & 33 deletions src/mito2/src/read/scan_region.rs
@@ -14,17 +14,17 @@

//! Scans a region according to the scan request.
-use common_recordbatch::{SendableRecordBatchStream};
+use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use common_time::range::TimestampRange;
use object_store::ObjectStore;
-use snafu::{OptionExt, ResultExt};
-use store_api::metadata::RegionMetadata;
-use store_api::storage::{ColumnId, ScanRequest};
+use snafu::ResultExt;
+use store_api::storage::ScanRequest;
use table::predicate::{Predicate, TimeRangePredicateBuilder};

-use crate::error::{BuildPredicateSnafu, InvalidRequestSnafu, Result};
+use crate::error::{BuildPredicateSnafu, Result};
use crate::read::seq_scan::SeqScan;
+use crate::read::stream::ProjectionMapper;
use crate::region::version::VersionRef;
use crate::sst::file::FileHandle;

@@ -110,19 +110,17 @@ impl ScanRegion {
self.version.metadata.schema.clone(),
)
.context(BuildPredicateSnafu)?;
-let projection = self
-.request
-.projection
-.as_ref()
-.map(|p| projection_indices_to_ids(&self.version.metadata, p))
-.transpose()?;
+let mapper = match &self.request.projection {
+Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
+None => ProjectionMapper::all(&self.version.metadata)?,
+};

let seq_scan = SeqScan::new(
self.version.metadata.clone(),
&self.file_dir,
self.object_store.clone(),
+mapper,
)
-.with_projection(projection)
.with_time_range(Some(time_range))
.with_predicate(Some(predicate))
.with_memtables(memtables)
@@ -155,24 +153,3 @@ fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
file_ts_range.intersects(predicate)
}

-/// Map projection indices to column ids.
-fn projection_indices_to_ids(
-metadata: &RegionMetadata,
-projection: &[usize],
-) -> Result<Vec<ColumnId>> {
-let mut column_ids = Vec::with_capacity(projection.len());
-// For each projection index, we get the column id.
-for idx in projection {
-let column_id = metadata
-.column_metadatas
-.get(*idx)
-.context(InvalidRequestSnafu {
-region_id: metadata.region_id,
-reason: format!("Index {} out of bound", idx),
-})?
-.column_id;
-column_ids.push(column_id);
-}
-Ok(column_ids)
-}
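
The hunk above folds the deleted `projection_indices_to_ids` helper into `ProjectionMapper` construction, so an out-of-bound projection index now surfaces as an `InvalidRequestSnafu` error when the mapper is built, rather than as a panic during conversion. A minimal, self-contained sketch of that selection logic, using hypothetical `Metadata` and `Mapper` stand-ins rather than the crate's real types:

```rust
// Illustrative sketch only: `Metadata` and `Mapper` are simplified stand-ins
// for the crate's RegionMetadata and ProjectionMapper.
#[derive(Debug)]
struct Metadata {
    column_names: Vec<String>,
}

#[derive(Debug)]
struct Mapper {
    /// Projected column names, in output order.
    columns: Vec<String>,
}

impl Mapper {
    /// Builds a mapper from explicit projection indices, rejecting an
    /// out-of-bound index with an error instead of panicking.
    fn new(metadata: &Metadata, projection: impl Iterator<Item = usize>) -> Result<Mapper, String> {
        let mut columns = Vec::new();
        for idx in projection {
            let name = metadata
                .column_names
                .get(idx)
                .ok_or_else(|| format!("projection index {} is out of bound", idx))?;
            columns.push(name.clone());
        }
        Ok(Mapper { columns })
    }

    /// Builds a mapper that keeps every column.
    fn all(metadata: &Metadata) -> Result<Mapper, String> {
        Mapper::new(metadata, 0..metadata.column_names.len())
    }
}

fn main() {
    let metadata = Metadata {
        column_names: vec!["host".into(), "ts".into(), "cpu".into()],
    };
    // Mirrors the match in scan_region.rs: explicit projection or all columns.
    let projection: Option<Vec<usize>> = Some(vec![2, 0]);
    let mapper = match &projection {
        Some(p) => Mapper::new(&metadata, p.iter().copied()),
        None => Mapper::all(&metadata),
    };
    println!("{:?}", mapper); // Ok(Mapper { columns: ["cpu", "host"] })
}
```

Here `Mapper::all` reuses `Mapper::new` over the full index range, mirroring how `ProjectionMapper::all` delegates to `ProjectionMapper::new`.
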
30 changes: 14 additions & 16 deletions src/mito2/src/read/seq_scan.rs
@@ -18,10 +18,10 @@ use common_recordbatch::SendableRecordBatchStream
use common_time::range::TimestampRange;
use object_store::ObjectStore;
use store_api::metadata::RegionMetadataRef;
-use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::memtable::MemtableRef;
+use crate::read::stream::ProjectionMapper;
use crate::sst::file::FileHandle;

/// Scans a region and returns rows in a sorted sequence.
Expand All @@ -34,9 +34,9 @@ pub struct SeqScan {
file_dir: String,
/// Object store that stores SST files.
object_store: ObjectStore,
+/// Maps projected Batches to RecordBatches.
+mapper: ProjectionMapper,

-/// Projection to push down.
-projection: Option<Vec<ColumnId>>,
/// Time range filter for time index.
time_range: Option<TimestampRange>,
/// Predicate to push down.
@@ -50,50 +50,48 @@ pub struct SeqScan {
impl SeqScan {
/// Creates a new [SeqScan].
#[must_use]
-pub fn new(metadata: RegionMetadataRef, file_dir: &str, object_store: ObjectStore) -> SeqScan {
+pub(crate) fn new(
+metadata: RegionMetadataRef,
+file_dir: &str,
+object_store: ObjectStore,
+mapper: ProjectionMapper,
+) -> SeqScan {
SeqScan {
metadata,
file_dir: file_dir.to_string(),
object_store,
-projection: None,
+mapper,
time_range: None,
predicate: None,
memtables: Vec::new(),
files: Vec::new(),
}
}

-/// Set projection.
-#[must_use]
-pub fn with_projection(mut self, projection: Option<Vec<ColumnId>>) -> Self {
-self.projection = projection;
-self
-}
-
/// Set time range filter for time index.
#[must_use]
-pub fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
+pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
self.time_range = time_range;
self
}

/// Set predicate to push down.
#[must_use]
-pub fn with_predicate(mut self, predicate: Option<Predicate>) -> Self {
+pub(crate) fn with_predicate(mut self, predicate: Option<Predicate>) -> Self {
self.predicate = predicate;
self
}

/// Set memtables to read.
#[must_use]
-pub fn with_memtables(mut self, memtables: Vec<MemtableRef>) -> Self {
+pub(crate) fn with_memtables(mut self, memtables: Vec<MemtableRef>) -> Self {
self.memtables = memtables;
self
}

/// Set files to read.
#[must_use]
-pub fn with_files(mut self, files: Vec<FileHandle>) -> Self {
+pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
self.files = files;
self
}
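
`SeqScan` keeps a consuming-builder style here: the now-required `mapper` travels through `new`, while optional inputs stay as chained `with_*` setters. A rough standalone sketch of the pattern, with a hypothetical simplified `Scan` type standing in for `SeqScan`:

```rust
// Illustrative sketch of the consuming-builder style used by SeqScan;
// `Scan` and its field types are simplified stand-ins, not the crate's API.
#[derive(Debug, Default)]
struct Scan {
    time_range: Option<(i64, i64)>,
    files: Vec<String>,
}

impl Scan {
    fn new() -> Scan {
        Scan::default()
    }

    /// Each setter takes `self` by value and returns it,
    /// so calls chain without intermediate bindings.
    #[must_use]
    fn with_time_range(mut self, time_range: Option<(i64, i64)>) -> Self {
        self.time_range = time_range;
        self
    }

    #[must_use]
    fn with_files(mut self, files: Vec<String>) -> Self {
        self.files = files;
        self
    }
}

fn main() {
    let scan = Scan::new()
        .with_time_range(Some((0, 1000)))
        .with_files(vec!["a.parquet".into()]);
    println!("{:?}", scan);
}
```
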
105 changes: 62 additions & 43 deletions src/mito2/src/read/stream.rs
@@ -14,42 +14,38 @@

//! Record batch stream.
-use std::sync::Arc;
use std::pin::Pin;
-use std::task::Context;
-use std::task::Poll;
+use std::sync::Arc;
+use std::task::{Context, Poll};

use api::v1::SemanticType;
use common_error::ext::BoxedError;
-use common_recordbatch::RecordBatchStream;
use common_recordbatch::error::ExternalSnafu;
-use common_recordbatch::{RecordBatch};
+use common_recordbatch::{RecordBatch, RecordBatchStream};
use datatypes::prelude::{ConcreteDataType, DataType};
-use datatypes::schema::{SchemaRef, Schema};
+use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::ValueRef;
use datatypes::vectors::VectorRef;
use futures::Stream;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;

-use crate::error::Result;
+use crate::error::{InvalidRequestSnafu, Result};
use crate::read::Batch;
-use crate::row_converter::{McmpRowCodec, SortField, RowCodec};
+use crate::row_converter::{McmpRowCodec, RowCodec, SortField};

/// Record batch stream implementation.
pub(crate) struct StreamImpl<S> {
/// [Batch] stream.
stream: S,
/// Converts [Batch]es from the `stream` to [RecordBatch].
-converter: BatchConverter,
+mapper: ProjectionMapper,
}

impl<S> StreamImpl<S> {
/// Returns a new stream from a batch stream.
-pub(crate) fn new(stream: S, converter: BatchConverter) -> StreamImpl<S> {
-StreamImpl {
-stream,
-converter,
-}
+pub(crate) fn new(stream: S, mapper: ProjectionMapper) -> StreamImpl<S> {
+StreamImpl { stream, mapper }
}
}

@@ -59,11 +55,12 @@ impl<S: Stream<Item = Result<Batch>> + Unpin> Stream for StreamImpl<S> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(res)) => {
-let record_batch = res.map_err(BoxedError::new).context(ExternalSnafu).and_then(|batch| {
-self.converter.convert(&batch)
-});
+let record_batch = res
+.map_err(BoxedError::new)
+.context(ExternalSnafu)
+.and_then(|batch| self.mapper.convert(&batch));
Poll::Ready(Some(record_batch))
-},
+}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
@@ -72,12 +69,12 @@ impl<S: Stream<Item = Result<Batch>> + Unpin> RecordBatchStream for StreamImpl<S> {

impl<S: Stream<Item = Result<Batch>> + Unpin> RecordBatchStream for StreamImpl<S> {
fn schema(&self) -> SchemaRef {
-self.converter.output_schema.clone()
+self.mapper.output_schema.clone()
}
}

-/// Converts a [Batch] to a [RecordBatch].
-pub(crate) struct BatchConverter {
+/// Handles projection and converts a projected [Batch] to a projected [RecordBatch].
+pub(crate) struct ProjectionMapper {
/// Maps column in [RecordBatch] to index in [Batch].
batch_indices: Vec<BatchIndex>,
/// Decoder for primary key.
@@ -86,25 +83,31 @@ pub(crate) struct BatchConverter {
output_schema: SchemaRef,
}

-impl BatchConverter {
-/// Returns a new converter with projection.
-///
-/// # Panics
-/// Panics if any index in `projection` is out of bound.
-pub(crate) fn new(metadata: &RegionMetadata, projection: impl Iterator<Item = usize>) -> BatchConverter {
+impl ProjectionMapper {
+/// Returns a new mapper with projection.
+pub(crate) fn new(
+metadata: &RegionMetadata,
+projection: impl Iterator<Item = usize>,
+) -> Result<ProjectionMapper> {
let mut batch_indices = Vec::with_capacity(projection.size_hint().0);
let mut column_schemas = Vec::with_capacity(projection.size_hint().0);
for idx in projection {
// For each projection index, we get the column id for projection.
-let column = &metadata.column_metadatas[idx];
+let column = metadata
+.column_metadatas
+.get(idx)
+.context(InvalidRequestSnafu {
+region_id: metadata.region_id,
+reason: format!("projection index {} is out of bound", idx),
+})?;

// Get column index in a batch by its semantic type and column id.
let batch_index = match column.semantic_type {
SemanticType::Tag => {
// Safety: It is a primary key column.
let index = metadata.primary_key_index(column.column_id).unwrap();
BatchIndex::Tag(index)
-},
+}
SemanticType::Timestamp => BatchIndex::Timestamp,
SemanticType::Field => {
// Safety: It is a field column.
Expand All @@ -114,6 +117,7 @@ impl BatchConverter {
};
batch_indices.push(batch_index);

+// Safety: idx is valid.
column_schemas.push(metadata.schema.column_schemas()[idx].clone());
}

@@ -126,39 +130,47 @@ impl BatchConverter {
// Safety: Columns come from existing schema.
let output_schema = Arc::new(Schema::new(column_schemas));

-BatchConverter {
+Ok(ProjectionMapper {
batch_indices,
codec,
output_schema,
-}
+})
}

-/// Returns a new converter without projection.
-pub(crate) fn all(metadata: &RegionMetadata) -> BatchConverter {
-BatchConverter::new(metadata, 0..metadata.column_metadatas.len())
+/// Returns a new mapper without projection.
+pub(crate) fn all(metadata: &RegionMetadata) -> Result<ProjectionMapper> {
+ProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
}

/// Converts a [Batch] to a [RecordBatch].
///
-/// The batch must match the `projection` using to build the converter.
+/// The batch must match the `projection` used to build the mapper.
pub(crate) fn convert(&self, batch: &Batch) -> common_recordbatch::error::Result<RecordBatch> {
-let pk_values = self.codec.decode(batch.primary_key()).map_err(BoxedError::new).context(ExternalSnafu)?;
+let pk_values = self
+.codec
+.decode(batch.primary_key())
+.map_err(BoxedError::new)
+.context(ExternalSnafu)?;

let mut columns = Vec::with_capacity(self.output_schema.num_columns());
let num_rows = batch.num_rows();
-for (index, column_schema) in self.batch_indices.iter().zip(self.output_schema.column_schemas()) {
+for (index, column_schema) in self
+.batch_indices
+.iter()
+.zip(self.output_schema.column_schemas())
+{
match index {
BatchIndex::Tag(idx) => {
let value = pk_values[*idx].as_value_ref();
let vector = new_repeated_vector(&column_schema.data_type, value, num_rows)?;
columns.push(vector);
-},
+}
BatchIndex::Timestamp => {
columns.push(batch.timestamps().clone());
-},
+}
BatchIndex::Field(idx) => {
columns.push(batch.fields()[*idx].data.clone());
-},
+}
}
}

@@ -178,9 +190,16 @@ enum BatchIndex {
}

/// Returns a vector with repeated values.
-fn new_repeated_vector(data_type: &ConcreteDataType, value: ValueRef, num_rows: usize) -> common_recordbatch::error::Result<VectorRef> {
+fn new_repeated_vector(
+data_type: &ConcreteDataType,
+value: ValueRef,
+num_rows: usize,
+) -> common_recordbatch::error::Result<VectorRef> {
let mut mutable_vector = data_type.create_mutable_vector(1);
-mutable_vector.try_push_value_ref(value).map_err(BoxedError::new).context(ExternalSnafu)?;
+mutable_vector
+.try_push_value_ref(value)
+.map_err(BoxedError::new)
+.context(ExternalSnafu)?;
// This requires an additional allocation. TODO(yingwen): Add a way to create repeated vector to data type.
let base_vector = mutable_vector.to_vector();
Ok(base_vector.replicate(&[num_rows]))
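
In `ProjectionMapper::convert`, each tag column is materialized by decoding its value from the primary key once and repeating it across all rows, which `new_repeated_vector` does through `replicate`. A simplified, self-contained sketch of that replication step, with a hypothetical `Value` enum standing in for the crate's `ValueRef`/`VectorRef` machinery:

```rust
// Standalone sketch of the "repeated vector" idea in new_repeated_vector:
// a tag value is decoded once from the primary key, then replicated across
// every row of the output batch. `Value` and `repeat_value` are hypothetical
// simplifications, not the crate's API.
#[derive(Clone, Debug, PartialEq)]
enum Value {
    String(String),
    Int64(i64),
}

/// Builds a column of `num_rows` copies of one decoded value.
fn repeat_value(value: &Value, num_rows: usize) -> Vec<Value> {
    vec![value.clone(); num_rows]
}

fn main() {
    let tag = Value::String("host-1".into());
    let column = repeat_value(&tag, 3);
    assert_eq!(column.len(), 3);
    assert!(column.iter().all(|v| *v == tag));
    println!("{:?}", column);
}
```
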
3 changes: 2 additions & 1 deletion src/store-api/src/metadata.rs
@@ -195,7 +195,8 @@ impl RegionMetadata {
///
/// This does a linear search.
pub fn field_index(&self, column_id: ColumnId) -> Option<usize> {
-self.field_columns().position(|column| column.column_id == column_id)
+self.field_columns()
+.position(|column| column.column_id == column_id)
}

/// Checks whether the metadata is valid.
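
`field_index` resolves a column id to its position among the field columns with a linear scan, as its doc comment notes. A tiny standalone sketch of the same lookup, with a hypothetical `Column` struct in place of the crate's column metadata:

```rust
// Sketch of the linear-search lookup that field_index performs; `Column`
// and the free function are simplified stand-ins for the crate's types.
struct Column {
    column_id: u32,
}

/// Returns the position of `column_id` among the field columns,
/// scanning linearly.
fn field_index(field_columns: &[Column], column_id: u32) -> Option<usize> {
    field_columns
        .iter()
        .position(|column| column.column_id == column_id)
}

fn main() {
    let fields = vec![Column { column_id: 10 }, Column { column_id: 42 }];
    assert_eq!(field_index(&fields, 42), Some(1));
    assert_eq!(field_index(&fields, 7), None);
}
```
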
