add tests for put request
Signed-off-by: Ruihang Xia <[email protected]>
waynexia committed Nov 16, 2023
1 parent 99a961c commit c9fd09e
Showing 3 changed files with 231 additions and 20 deletions.
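For orientation, the put path added here can be exercised roughly as the new test_write_logical_region test below does: build a row batch carrying the default timestamp/value columns plus tag columns, wrap it in a RegionRequest::Put, and hand it to the engine, which forwards logical-region writes to the data region and rejects writes that target a physical region. A condensed sketch, assuming it lives inside the metric-engine crate so the test utilities added in this commit are available:

// A condensed sketch of the write path this commit enables; it mirrors the
// new test_write_logical_region test below (not part of the commit itself).
use api::v1::Rows;
use common_query::Output;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionPutRequest, RegionRequest};

use crate::test_util::{build_rows, row_schema_with_tags, TestEnv};

#[tokio::test]
async fn put_request_sketch() {
    let env = TestEnv::new().await;
    env.init_metric_region().await;
    let engine = env.metric();
    let logical_region_id = env.default_logical_region_id();

    // Rows carry the default timestamp/value columns plus one tag column.
    let schema = row_schema_with_tags(&["host"]);
    let rows = build_rows(1, 10);
    let request = RegionRequest::Put(RegionPutRequest {
        rows: Rows { schema, rows },
    });

    // Logical regions accept the put and report the affected row count; the
    // same request against a physical region fails with ForbiddenPhysicalAlter.
    let Output::AffectedRows(count) = engine
        .handle_request(logical_region_id, request)
        .await
        .unwrap()
    else {
        panic!("expected AffectedRows");
    };
    assert_eq!(count, 10);
}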
158 changes: 143 additions & 15 deletions src/metric-engine/src/engine.rs
@@ -125,7 +125,7 @@ impl RegionEngine for MetricEngine {
request: RegionRequest,
) -> std::result::Result<Output, BoxedError> {
let result = match request {
RegionRequest::Put(_) => todo!(),
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Delete(_) => todo!(),
RegionRequest::Create(create) => self
.inner
@@ -403,6 +403,16 @@ impl MetricEngineInner {
self.metadata_region
.add_logical_region(metadata_region_id, logical_region_id)
.await?;
for col in &request.column_metadatas {
self.metadata_region
.add_column(
metadata_region_id,
logical_region_id,
&col.column_schema.name,
col.semantic_type,
)
.await?;
}

// update the mapping
// Safety: previous steps ensure the physical region exists
@@ -660,7 +670,7 @@ impl MetricEngineInner {

let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
let mut columns_to_add = vec![];
for col in columns {
for col in &columns {
if self
.metadata_region
.column_semantic_type(
@@ -671,10 +681,11 @@
.await?
.is_none()
{
columns_to_add.push(col.column_metadata);
columns_to_add.push(col.column_metadata.clone());
}
}

// alter data region
let data_region_id = utils::to_data_region_id(physical_region_id);
self.add_columns_to_physical_data_region(
data_region_id,
@@ -684,6 +695,18 @@
)
.await?;

// register columns to logical region
for col in columns {
self.metadata_region
.add_column(
metadata_region_id,
region_id,
&col.column_metadata.column_schema.name,
col.column_metadata.semantic_type,
)
.await?;
}

Ok(())
}

@@ -700,10 +723,35 @@
}

impl MetricEngineInner {
pub async fn put_logical_region(
/// Dispatch region put request
pub async fn put_region(
&self,
logical_region_id: RegionId,
region_id: RegionId,
request: RegionPutRequest,
) -> Result<Output> {
let is_putting_physical_region = self
.state
.read()
.await
.physical_regions
.contains_key(&region_id);

if is_putting_physical_region {
info!(
"Metric region received put request {request:?} on physical region {region_id:?}"
);
FORBIDDEN_OPERATION_COUNT.inc();

ForbiddenPhysicalAlterSnafu.fail()
} else {
self.put_logical_region(region_id, request).await
}
}

async fn put_logical_region(
&self,
logical_region_id: RegionId,
mut request: RegionPutRequest,
) -> Result<Output> {
let physical_region_id = *self
.state
@@ -721,6 +769,8 @@
.await?;

// write to data region
// TODO: retrieve table name
self.modify_rows("test".to_string(), &mut request.rows)?;
self.data_region.write_data(data_region_id, request).await
}

@@ -772,7 +822,7 @@ impl MetricEngineInner {
/// - Change the semantic type of tag columns to field
/// - Add table_name column
/// - Generate tsid
fn modify_rows(&self, table_name: String, mut rows: Rows) -> Result<Rows> {
fn modify_rows(&self, table_name: String, rows: &mut Rows) -> Result<()> {
// gather tag column indices
let mut tag_col_indices = rows
.schema
@@ -788,8 +838,9 @@
.collect::<Vec<_>>();

// generate new schema
let mut new_schema = rows
rows.schema = rows
.schema
.clone()
.into_iter()
.map(|mut col| {
if col.semantic_type == SemanticType::Tag as i32 {
@@ -799,30 +850,29 @@
})
.collect::<Vec<_>>();
// add table_name column
new_schema.push(PbColumnSchema {
rows.schema.push(PbColumnSchema {
column_name: DATA_SCHEMA_METRIC_NAME_COLUMN_NAME.to_string(),
datatype: to_column_data_type(&ConcreteDataType::string_datatype())
.unwrap()
.into(),
semantic_type: SemanticType::Tag as _,
});
// add tsid column
new_schema.push(PbColumnSchema {
rows.schema.push(PbColumnSchema {
column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
datatype: to_column_data_type(&ConcreteDataType::uint64_datatype())
.unwrap()
.into(),
semantic_type: SemanticType::Tag as _,
});
rows.schema = new_schema;

// fill internal columns
let mut random_state = ahash::RandomState::with_seeds(1, 2, 3, 4);
for row in &mut rows.rows {
Self::fill_internal_columns(&mut random_state, &table_name, &tag_col_indices, row);
}

Ok(rows)
Ok(())
}

/// Fills internal columns of a row with table name and a hash of tag values.
@@ -836,9 +886,9 @@
for (idx, name) in tag_col_indices {
let tag = row.values[*idx].clone();
name.hash(&mut hasher);
match tag.value_data {
Some(ValueData::StringValue(string)) => string.hash(&mut hasher),
_ => {}
// The type is checked beforehand, so only null values are skipped here.
if let Some(ValueData::StringValue(string)) = tag.value_data {
string.hash(&mut hasher);
}
}
let hash = hasher.finish();
@@ -854,10 +904,11 @@
mod tests {
use std::hash::Hash;

use api::v1::region::alter_request;
use store_api::region_request::AddColumn;

use super::*;
use crate::test_util::TestEnv;
use crate::test_util::{self, TestEnv};

#[test]
fn test_verify_region_create_request() {
@@ -1059,5 +1110,82 @@ mod tests {
.unwrap()
.unwrap();
assert_eq!(semantic_type, SemanticType::Tag);
let timestamp_index = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp")
.await
.unwrap()
.unwrap();
assert_eq!(timestamp_index, SemanticType::Timestamp);
}

#[tokio::test]
async fn test_write_logical_region() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let engine = env.metric();

// add columns
let logical_region_id = env.default_logical_region_id();
let columns = &["odd", "even", "Ev_En"];
let alter_request = test_util::alter_logical_region_add_tag_columns(columns);
engine
.handle_request(logical_region_id, RegionRequest::Alter(alter_request))
.await
.unwrap();

// prepare data
let schema = test_util::row_schema_with_tags(columns);
let rows = test_util::build_rows(3, 100);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
});

// write data
let Output::AffectedRows(count) = engine
.handle_request(logical_region_id, request)
.await
.unwrap()
else {
panic!()
};
assert_eq!(100, count);
}

#[tokio::test]
async fn test_write_physical_region() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let engine = env.metric();

let physical_region_id = env.default_physical_region_id();
let schema = test_util::row_schema_with_tags(&["abc"]);
let rows = test_util::build_rows(1, 100);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
});

engine
.handle_request(physical_region_id, request)
.await
.unwrap_err();
}

#[tokio::test]
async fn test_write_nonexist_logical_region() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let engine = env.metric();

let logical_region_id = RegionId::new(175, 8345);
let schema = test_util::row_schema_with_tags(&["def"]);
let rows = test_util::build_rows(1, 100);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
});

engine
.handle_request(logical_region_id, request)
.await
.unwrap_err();
}
}
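The tsid filled in by fill_internal_columns above is a 64-bit hash over the tag column names and their non-null string values, computed with a seeded ahash hasher so the same label set always maps to the same id. A minimal standalone sketch of that derivation, assuming only the ahash crate (the tag names below are made up):

use std::hash::{BuildHasher, Hash, Hasher};

fn tsid_for(tags: &[(&str, Option<&str>)], state: &ahash::RandomState) -> u64 {
    let mut hasher = state.build_hasher();
    for (name, value) in tags {
        name.hash(&mut hasher);
        // Null tag values are skipped, mirroring fill_internal_columns.
        if let Some(v) = value {
            v.hash(&mut hasher);
        }
    }
    hasher.finish()
}

fn main() {
    // Fixed seeds keep the hash deterministic across processes, matching the
    // engine's ahash::RandomState::with_seeds(1, 2, 3, 4).
    let state = ahash::RandomState::with_seeds(1, 2, 3, 4);
    let id = tsid_for(&[("host", Some("h1")), ("idc", None)], &state);
    println!("tsid = {id}");
}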
90 changes: 87 additions & 3 deletions src/metric-engine/src/test_util.rs
@@ -14,17 +14,20 @@

//! Utilities for testing.
use api::v1::SemanticType;
use api::helper::to_column_data_type;
use api::v1::value::ValueData;
use api::v1::{ColumnSchema as PbColumnSchema, Row, SemanticType, Value};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use mito2::test_util::TestEnv as MitoTestEnv;
pub use mito2::test_util::{build_rows, rows_schema};
use object_store::util::join_dir;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionCreateRequest, RegionRequest};
use store_api::region_request::{
AddColumn, AlterKind, RegionAlterRequest, RegionCreateRequest, RegionRequest,
};
use store_api::storage::RegionId;

use crate::data_region::DataRegion;
@@ -182,6 +185,87 @@ impl TestEnv {
}
}

/// Generate a [RegionAlterRequest] for adding tag columns.
pub fn alter_logical_region_add_tag_columns(new_tags: &[&str]) -> RegionAlterRequest {
let mut new_columns = vec![];
for (i, tag) in new_tags.iter().enumerate() {
new_columns.push(AddColumn {
column_metadata: ColumnMetadata {
column_id: i as u32,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
tag.to_string(),
ConcreteDataType::string_datatype(),
false,
),
},
location: None,
});
}
RegionAlterRequest {
schema_version: 0,
kind: AlterKind::AddColumns {
columns: new_columns,
},
}
}

/// Generate a row schema with given tag columns.
///
/// The result also contains the default timestamp and value columns at the beginning.
pub fn row_schema_with_tags(tags: &[&str]) -> Vec<PbColumnSchema> {
let mut schema = vec![
PbColumnSchema {
column_name: "greptime_timestamp".to_string(),
datatype: to_column_data_type(&ConcreteDataType::timestamp_millisecond_datatype())
.unwrap()
.into(),
semantic_type: SemanticType::Timestamp as _,
},
PbColumnSchema {
column_name: "greptime_value".to_string(),
datatype: to_column_data_type(&ConcreteDataType::float64_datatype())
.unwrap()
.into(),
semantic_type: SemanticType::Field as _,
},
];
for tag in tags {
schema.push(PbColumnSchema {
column_name: tag.to_string(),
datatype: to_column_data_type(&ConcreteDataType::string_datatype())
.unwrap()
.into(),
semantic_type: SemanticType::Tag as _,
});
}
schema
}

/// Build [Rows] for assembling [RegionPutRequest](store_api::region_request::RegionPutRequest).
///
/// The schema is generated by [row_schema_with_tags].
pub fn build_rows(num_tags: usize, num_rows: usize) -> Vec<Row> {
let mut rows = vec![];
for i in 0..num_rows {
let mut values = vec![
Value {
value_data: Some(ValueData::TimestampMillisecondValue(i as _)),
},
Value {
value_data: Some(ValueData::F64Value(i as f64)),
},
];
for j in 0..num_tags {
values.push(Value {
value_data: Some(ValueData::StringValue(format!("tag_{}", j))),
});
}
rows.push(Row { values });
}
rows
}

#[cfg(test)]
mod test {

3 changes: 1 addition & 2 deletions src/mito2/src/request.rs
@@ -25,8 +25,7 @@ use api::helper::{
use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value};
use common_query::Output;
use common_query::Output::AffectedRows;
use common_telemetry::tracing::log::info;
use common_telemetry::warn;
use common_telemetry::{info, warn};
use datatypes::prelude::DataType;
use prometheus::HistogramTimer;
use prost::Message;
