Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(mito): incorrect field index in ProjectionMapper #2388

Merged
merged 4 commits
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub(crate) mod listener;
#[cfg(test)]
mod open_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod tests;

use std::sync::Arc;
Expand Down
94 changes: 94 additions & 0 deletions src/mito2/src/engine/projection_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::value::ValueData;
use api::v1::{Row, Rows};
use common_recordbatch::RecordBatches;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::test_util::{put_rows, rows_schema, CreateRequestBuilder, TestEnv};

/// Builds rows covering `ts_range` with the given tag values and per-field start offsets.
///
/// Row layout matches the mocked create-request schema: all tags first, then all
/// fields, then the millisecond timestamp. Field `i` of the row at offset `idx`
/// holds `field_starts[i] + idx` as an `f64`.
fn build_rows_multi_tags_fields(
    tags: &[&str],
    field_starts: &[usize],
    ts_range: (usize, usize),
) -> Vec<Row> {
    let (start, end) = ts_range;
    let mut rows = Vec::with_capacity(end.saturating_sub(start));
    for (idx, ts) in (start..end).enumerate() {
        let tag_values = tags.iter().map(|tag| api::v1::Value {
            value_data: Some(ValueData::StringValue(tag.to_string())),
        });
        let field_values = field_starts.iter().map(|field_start| api::v1::Value {
            value_data: Some(ValueData::F64Value((field_start + idx) as f64)),
        });
        let ts_value = api::v1::Value {
            value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)),
        };

        let mut values: Vec<_> = tag_values.chain(field_values).collect();
        values.push(ts_value);
        rows.push(api::v1::Row { values });
    }
    rows
}

/// Verifies that scanning with a projection returns exactly the projected
/// columns, in the requested order, with correct field values.
#[tokio::test]
async fn test_scan_projection() {
    let mut env = TestEnv::new();
    let engine = env.create_engine(MitoConfig::default()).await;

    // Region schema is [tag_0, tag_1, field_0, field_1, ts].
    let region_id = RegionId::new(1, 1);
    let create_request = CreateRequestBuilder::new().tag_num(2).field_num(2).build();
    let column_schemas = rows_schema(&create_request);
    engine
        .handle_request(region_id, RegionRequest::Create(create_request))
        .await
        .unwrap();

    // Three rows: tags ("a", "b"), fields starting at 0 and 10, timestamps 0..3s.
    put_rows(
        &engine,
        region_id,
        Rows {
            schema: column_schemas,
            rows: build_rows_multi_tags_fields(&["a", "b"], &[0, 10], (0, 3)),
        },
    )
    .await;

    // Projection indices 1, 3, 4 select tag_1, field_1 and ts.
    let scan_request = ScanRequest {
        sequence: None,
        projection: Some(vec![1, 3, 4]),
        filters: Vec::new(),
        output_ordering: None,
        limit: None,
    };
    let stream = engine.handle_query(region_id, scan_request).await.unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    let expected = "\
+-------+---------+---------------------+
| tag_1 | field_1 | ts |
+-------+---------+---------------------+
| b | 10.0 | 1970-01-01T00:00:00 |
| b | 11.0 | 1970-01-01T00:00:01 |
| b | 12.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
    assert_eq!(expected, batches.pretty_print().unwrap());
}
57 changes: 36 additions & 21 deletions src/mito2/src/read/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

//! Utilities for projection.

use std::collections::HashMap;
use std::sync::Arc;

use api::v1::SemanticType;
Expand Down Expand Up @@ -55,50 +56,64 @@ impl ProjectionMapper {
metadata: &RegionMetadataRef,
projection: impl Iterator<Item = usize>,
) -> Result<ProjectionMapper> {
let projection_len = projection.size_hint().0;
let mut batch_indices = Vec::with_capacity(projection_len);
let mut column_schemas = Vec::with_capacity(projection_len);
let mut column_ids = Vec::with_capacity(projection_len);
for idx in projection {
let projection: Vec<_> = projection.collect();
let mut column_schemas = Vec::with_capacity(projection.len());
let mut column_ids = Vec::with_capacity(projection.len());
for idx in &projection {
// For each projection index, we get the column id for projection.
let column = metadata
.column_metadatas
.get(idx)
.get(*idx)
.context(InvalidRequestSnafu {
region_id: metadata.region_id,
reason: format!("projection index {} is out of bound", idx),
})?;

column_ids.push(column.column_id);
// Safety: idx is valid.
column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
}
let codec = McmpRowCodec::new(
metadata
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect(),
);
// Safety: Columns come from existing schema.
let output_schema = Arc::new(Schema::new(column_schemas));
// Get fields in each batch.
let batch_fields = Batch::projected_fields(metadata, &column_ids);

// Field column id to its index in batch.
let field_id_to_index: HashMap<_, _> = batch_fields
.iter()
.enumerate()
.map(|(index, column_id)| (*column_id, index))
.collect();
// For each projected column, compute its index in batches.
let mut batch_indices = Vec::with_capacity(projection.len());
for idx in &projection {
// Safety: idx is valid.
let column = &metadata.column_metadatas[*idx];
// Get column index in a batch by its semantic type and column id.
let batch_index = match column.semantic_type {
SemanticType::Tag => {
// Safety: It is a primary key column.
let index = metadata.primary_key_index(column.column_id).unwrap();
// We always read all primary key so the column always exists and the tag
// index is always valid.
BatchIndex::Tag(index)
}
SemanticType::Timestamp => BatchIndex::Timestamp,
SemanticType::Field => {
// Safety: It is a field column.
let index = metadata.field_index(column.column_id).unwrap();
// Safety: It is a field column so it should be in `field_id_to_index`.
let index = field_id_to_index[&column.column_id];
BatchIndex::Field(index)
}
};
batch_indices.push(batch_index);
column_ids.push(column.column_id);
// Safety: idx is valid.
column_schemas.push(metadata.schema.column_schemas()[idx].clone());
}

let codec = McmpRowCodec::new(
metadata
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect(),
);
// Safety: Columns come from existing schema.
let output_schema = Arc::new(Schema::new(column_schemas));
let batch_fields = Batch::projected_fields(metadata, &column_ids);

Ok(ProjectionMapper {
metadata: metadata.clone(),
batch_indices,
Expand Down
4 changes: 3 additions & 1 deletion src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ impl TestEnv {
}

/// Builder to mock a [RegionCreateRequest].
///
/// It builds schema like `[tag_0, tag_1, ..., field_0, field_1, ..., ts]`.
pub struct CreateRequestBuilder {
region_dir: String,
tag_num: usize,
Expand Down Expand Up @@ -232,7 +234,7 @@ impl CreateRequestBuilder {
}

/// Sets the number of field columns the mocked create request declares.
pub fn field_num(mut self, value: usize) -> Self {
    // Must assign `field_num`, not `tag_num` — the stale `self.tag_num = value;`
    // line was a copy-paste bug from the `tag_num` setter that made this builder
    // silently override the tag count instead of configuring fields.
    self.field_num = value;
    self
}

Expand Down
8 changes: 0 additions & 8 deletions src/store-api/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,6 @@ impl RegionMetadata {
self.primary_key.iter().position(|id| *id == column_id)
}

/// Looks up the position of `column_id` among this region's field columns.
///
/// Performs a linear scan; returns `None` when the id does not belong to a
/// field column.
pub fn field_index(&self, column_id: ColumnId) -> Option<usize> {
    self.field_columns()
        .position(|field| field.column_id == column_id)
}

/// Checks whether the metadata is valid.
fn validate(&self) -> Result<()> {
// Id to name.
Expand Down
Loading