Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support querying metric engine from frontend #2987

Merged
merged 5 commits into from
Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ impl RegionEngine for MetricEngine {

/// Retrieves region's metadata.
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
todo!()
self.inner
.load_region_metadata(region_id)
.await
.map_err(BoxedError::new)
}

/// Retrieves region's disk usage.
Expand Down Expand Up @@ -261,7 +264,7 @@ mod test {
.await
.unwrap_err();

// open nonexistent region
// opening a nonexistent region won't report an error
let invalid_open_request = RegionOpenRequest {
engine: METRIC_ENGINE_NAME.to_string(),
region_dir: env.default_region_dir(),
Expand All @@ -274,6 +277,6 @@ mod test {
RegionRequest::Open(invalid_open_request),
)
.await
.unwrap_err();
.unwrap();
}
}
7 changes: 1 addition & 6 deletions src/metric-engine/src/engine/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,7 @@ impl MetricEngineInner {
// register columns to logical region
for col in columns {
self.metadata_region
.add_column(
metadata_region_id,
region_id,
&col.column_metadata.column_schema.name,
col.column_metadata.semantic_type,
)
.add_column(metadata_region_id, region_id, &col.column_metadata)
.await?;
}

Expand Down
14 changes: 2 additions & 12 deletions src/metric-engine/src/engine/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,7 @@ impl MetricEngineInner {
.await?;
for col in &request.column_metadatas {
self.metadata_region
.add_column(
metadata_region_id,
logical_region_id,
&col.column_schema.name,
col.semantic_type,
)
.add_column(metadata_region_id, logical_region_id, col)
.await?;
}

Expand Down Expand Up @@ -221,12 +216,7 @@ impl MetricEngineInner {
// register columns to metadata region
for col in &new_columns {
self.metadata_region
.add_column(
metadata_region_id,
logical_region_id,
&col.column_schema.name,
col.semantic_type,
)
.add_column(metadata_region_id, logical_region_id, col)
.await?;
}

Expand Down
16 changes: 5 additions & 11 deletions src/metric-engine/src/engine/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,13 @@ impl MetricEngineInner {
self.open_physical_region(region_id, request).await?;
self.recover_states(region_id).await?;

Ok(0)
} else if self
.state
.read()
.await
.logical_regions()
.contains_key(&region_id)
{
// if the logical region is already open, do nothing
Ok(0)
} else {
// throw RegionNotFound error
Err(LogicalRegionNotFoundSnafu { region_id }.build())
// Don't check whether the logical region exists. A logical region cannot be opened
// individually; it is always "open" if its physical region is open. But the engine
// can't tell whether the logical region does not exist or the physical region is not
// opened yet. Thus simply return `Ok` here to ignore all those errors.
Ok(0)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/metric-engine/src/engine/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ mod tests {
// add columns
let logical_region_id = env.default_logical_region_id();
let columns = &["odd", "even", "Ev_En"];
let alter_request = test_util::alter_logical_region_add_tag_columns(columns);
let alter_request = test_util::alter_logical_region_add_tag_columns(123456, columns);
engine
.handle_request(logical_region_id, RegionRequest::Alter(alter_request))
.await
Expand Down
112 changes: 89 additions & 23 deletions src/metric-engine/src/engine/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::v1::SemanticType;
use common_query::logical_plan::Expr;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{error, info, tracing};
use datafusion::logical_expr;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::region_engine::RegionEngine;
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{RegionId, ScanRequest};

use crate::engine::MetricEngineInner;
use crate::error::{LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, Result};
use crate::error::{
InvalidMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, Result,
};
use crate::utils;

impl MetricEngineInner {
Expand Down Expand Up @@ -67,17 +73,7 @@ impl MetricEngineInner {
logical_region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream> {
let physical_region_id = {
let state = &self.state.read().await;
state
.get_physical_region_id(logical_region_id)
.with_context(|| {
error!("Trying to read an nonexistent region {logical_region_id}");
LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
}
})?
};
let physical_region_id = self.get_physical_region_id(logical_region_id).await?;
let data_region_id = utils::to_data_region_id(physical_region_id);
let request = self
.transform_request(physical_region_id, logical_region_id, request)
Expand All @@ -88,6 +84,38 @@ impl MetricEngineInner {
.context(MitoReadOperationSnafu)
}

/// Loads the metadata of `region_id`, which may name either a physical or a logical region.
///
/// If `region_id` is registered as a physical region in the engine state, the request is
/// forwarded to mito as-is. Otherwise the id is treated as a logical region: its physical
/// region is looked up and the logical metadata is assembled from it.
///
/// # Errors
///
/// Returns a mito read error when mito fails to provide the metadata, or whatever error
/// the logical-region lookup / metadata assembly reports.
pub async fn load_region_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
    // Keep the read guard in its own scope so it is released before any further
    // state access below.
    let targets_physical_region = {
        let state = self.state.read().await;
        state.physical_regions().contains_key(&region_id)
    };

    if !targets_physical_region {
        let physical_region_id = self.get_physical_region_id(region_id).await?;
        return self
            .logical_region_metadata(physical_region_id, region_id)
            .await;
    }

    self.mito
        .get_metadata(region_id)
        .await
        .context(MitoReadOperationSnafu)
}

/// Resolves the physical region that backs `logical_region_id`.
///
/// # Errors
///
/// Logs an error and returns `LogicalRegionNotFound` when the engine state has no
/// physical region recorded for the given logical region.
async fn get_physical_region_id(&self, logical_region_id: RegionId) -> Result<RegionId> {
    let guard = self.state.read().await;
    match guard.get_physical_region_id(logical_region_id) {
        Some(physical_region_id) => Ok(physical_region_id),
        None => {
            error!("Trying to read an nonexistent region {logical_region_id}");
            LogicalRegionNotFoundSnafu {
                region_id: logical_region_id,
            }
            .fail()
        }
    }
}

/// Transform the [ScanRequest] from logical region to physical data region.
async fn transform_request(
&self,
Expand Down Expand Up @@ -130,9 +158,13 @@ impl MetricEngineInner {
origin_projection: &[usize],
) -> Result<Vec<usize>> {
// project on logical columns
let logical_columns = self
let all_logical_columns = self
.load_logical_columns(physical_region_id, logical_region_id)
.await?;
let projected_logical_names = origin_projection
.iter()
.map(|i| all_logical_columns[*i].column_schema.name.clone())
.collect::<Vec<_>>();

// generate physical projection
let mut physical_projection = Vec::with_capacity(origin_projection.len());
Expand All @@ -142,10 +174,9 @@ impl MetricEngineInner {
.get_metadata(data_region_id)
.await
.context(MitoReadOperationSnafu)?;
for logical_proj in origin_projection {
let column_id = logical_columns[*logical_proj].column_id;
for name in projected_logical_names {
// Safety: logical columns is a strict subset of physical columns
physical_projection.push(physical_metadata.column_index_by_id(column_id).unwrap());
physical_projection.push(physical_metadata.column_index_by_name(&name).unwrap());
}

Ok(physical_projection)
Expand All @@ -159,21 +190,55 @@ impl MetricEngineInner {
) -> Result<Vec<usize>> {
let logical_columns = self
.load_logical_columns(physical_region_id, logical_region_id)
.await?;
.await?
.into_iter()
.map(|col| col.column_schema.name);
let mut projection = Vec::with_capacity(logical_columns.len());
let data_region_id = utils::to_data_region_id(physical_region_id);
let physical_metadata = self
.mito
.get_metadata(data_region_id)
.await
.context(MitoReadOperationSnafu)?;
for logical_col in logical_columns {
let column_id = logical_col.column_id;
for name in logical_columns {
// Safety: logical columns is a strict subset of physical columns
projection.push(physical_metadata.column_index_by_id(column_id).unwrap());
projection.push(physical_metadata.column_index_by_name(&name).unwrap());
}

Ok(projection)
}

/// Assembles the region metadata of a logical region from its logical columns.
///
/// Every column returned by `load_logical_columns` is pushed into the metadata builder
/// in the order it was returned. The ids of the columns whose semantic type is
/// [SemanticType::Tag] form the primary key, in that same order.
///
/// # Errors
///
/// Propagates failures from loading the logical columns, and returns `InvalidMetadata`
/// when the builder rejects the assembled metadata.
pub async fn logical_region_metadata(
    &self,
    physical_region_id: RegionId,
    logical_region_id: RegionId,
) -> Result<RegionMetadataRef> {
    let columns = self
        .load_logical_columns(physical_region_id, logical_region_id)
        .await?;

    // Collect the tag column ids before the columns are moved into the builder.
    let tag_column_ids: Vec<_> = columns
        .iter()
        .filter(|column| column.semantic_type == SemanticType::Tag)
        .map(|column| column.column_id)
        .collect();

    let mut builder = RegionMetadataBuilder::new(logical_region_id);
    columns.into_iter().for_each(|column| {
        builder.push_column_metadata(column);
    });
    builder.primary_key(tag_column_ids);

    builder
        .build()
        .map(Arc::new)
        .context(InvalidMetadataSnafu)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -205,7 +270,8 @@ mod test {
.unwrap();

// add columns to the first logical region
let alter_request = alter_logical_region_add_tag_columns(&["987", "789", "654", "321"]);
let alter_request =
alter_logical_region_add_tag_columns(123456, &["987", "798", "654", "321"]);
env.metric()
.handle_request(logical_region_id, RegionRequest::Alter(alter_request))
.await
Expand All @@ -225,7 +291,7 @@ mod test {
.await
.unwrap();

assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 7, 8, 9, 10]);
assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 8, 9, 10, 11]);
assert_eq!(scan_req.filters.len(), 1);
assert_eq!(
scan_req.filters[0],
Expand All @@ -242,6 +308,6 @@ mod test {
.transform_request(physical_region_id, logical_region_id, scan_req)
.await
.unwrap();
assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 7, 8, 9, 10]);
assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 8, 9, 10, 11]);
}
}
21 changes: 3 additions & 18 deletions src/metric-engine/src/engine/region_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,19 @@ use crate::error::Result;
impl MetricEngineInner {
/// Load column metadata of a logical region.
///
/// The return value is ordered.
/// The return value is ordered on [ColumnId].
pub async fn load_logical_columns(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<Vec<ColumnMetadata>> {
// load logical and physical columns, and intersect them to get logical column metadata
let logical_columns = self
let mut logical_column_metadata = self
.metadata_region
.logical_columns(physical_region_id, logical_region_id)
.await?
.into_iter()
.collect::<HashMap<String, SemanticType>>();
let physical_columns = self
.data_region
.physical_columns(physical_region_id)
.await?;
let mut logical_column_metadata = physical_columns
.into_iter()
.filter_map(|mut col| {
// recover the semantic type of logical columns
logical_columns
.get(&col.column_schema.name)
.map(|semantic_type| {
col.semantic_type = *semantic_type;
col
})
})
.map(|(_, column_metadata)| column_metadata)
.collect::<Vec<_>>();

// sort columns on column id to ensure the order
Expand Down
18 changes: 17 additions & 1 deletion src/metric-engine/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to deserialize column metadata from {}", raw))]
DeserializeColumnMetadata {
raw: String,
#[snafu(source)]
error: serde_json::Error,
location: Location,
},

#[snafu(display("Failed to decode base64 column value"))]
DecodeColumnValue {
#[snafu(source)]
Expand Down Expand Up @@ -132,6 +140,12 @@ pub enum Error {

#[snafu(display("Alter request to physical region is forbidden"))]
ForbiddenPhysicalAlter { location: Location },

#[snafu(display("Invalid region metadata"))]
InvalidMetadata {
source: store_api::metadata::MetadataError,
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand All @@ -150,8 +164,10 @@ impl ErrorExt for Error {

MissingInternalColumn { .. }
| DeserializeSemanticType { .. }
| DeserializeColumnMetadata { .. }
| DecodeColumnValue { .. }
| ParseRegionId { .. } => StatusCode::Unexpected,
| ParseRegionId { .. }
| InvalidMetadata { .. } => StatusCode::Unexpected,

PhysicalRegionNotFound { .. } | LogicalRegionNotFound { .. } => {
StatusCode::RegionNotFound
Expand Down
Loading
Loading