Skip to content

Commit

Permalink
feat: support querying metric engine from frontend (#2987)
Browse files Browse the repository at this point in the history
* query one logical table

Signed-off-by: Ruihang Xia <[email protected]>

* map column id

Signed-off-by: Ruihang Xia <[email protected]>

* remove deadcode

Signed-off-by: Ruihang Xia <[email protected]>

* fix typo

Signed-off-by: Ruihang Xia <[email protected]>

* remove redundent column name

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Dec 25, 2023
1 parent 0d42651 commit 48cd22d
Show file tree
Hide file tree
Showing 11 changed files with 247 additions and 123 deletions.
9 changes: 6 additions & 3 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ impl RegionEngine for MetricEngine {

/// Retrieves region's metadata.
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
todo!()
self.inner
.load_region_metadata(region_id)
.await
.map_err(BoxedError::new)
}

/// Retrieves region's disk usage.
Expand Down Expand Up @@ -261,7 +264,7 @@ mod test {
.await
.unwrap_err();

// open nonexistent region
// open nonexistent region won't report error
let invalid_open_request = RegionOpenRequest {
engine: METRIC_ENGINE_NAME.to_string(),
region_dir: env.default_region_dir(),
Expand All @@ -274,6 +277,6 @@ mod test {
RegionRequest::Open(invalid_open_request),
)
.await
.unwrap_err();
.unwrap();
}
}
7 changes: 1 addition & 6 deletions src/metric-engine/src/engine/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,7 @@ impl MetricEngineInner {
// register columns to logical region
for col in columns {
self.metadata_region
.add_column(
metadata_region_id,
region_id,
&col.column_metadata.column_schema.name,
col.column_metadata.semantic_type,
)
.add_column(metadata_region_id, region_id, &col.column_metadata)
.await?;
}

Expand Down
14 changes: 2 additions & 12 deletions src/metric-engine/src/engine/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,7 @@ impl MetricEngineInner {
.await?;
for col in &request.column_metadatas {
self.metadata_region
.add_column(
metadata_region_id,
logical_region_id,
&col.column_schema.name,
col.semantic_type,
)
.add_column(metadata_region_id, logical_region_id, col)
.await?;
}

Expand Down Expand Up @@ -221,12 +216,7 @@ impl MetricEngineInner {
// register columns to metadata region
for col in &new_columns {
self.metadata_region
.add_column(
metadata_region_id,
logical_region_id,
&col.column_schema.name,
col.semantic_type,
)
.add_column(metadata_region_id, logical_region_id, col)
.await?;
}

Expand Down
16 changes: 5 additions & 11 deletions src/metric-engine/src/engine/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,13 @@ impl MetricEngineInner {
self.open_physical_region(region_id, request).await?;
self.recover_states(region_id).await?;

Ok(0)
} else if self
.state
.read()
.await
.logical_regions()
.contains_key(&region_id)
{
// if the logical region is already open, do nothing
Ok(0)
} else {
// throw RegionNotFound error
Err(LogicalRegionNotFoundSnafu { region_id }.build())
// Don't check if the logical region exist. Because a logical region cannot be opened
// individually, it is always "open" if its physical region is open. But the engine
// can't tell if the logical region is not exist or the physical region is not opened
// yet. Thus simply return `Ok` here to ignore all those errors.
Ok(0)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/metric-engine/src/engine/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ mod tests {
// add columns
let logical_region_id = env.default_logical_region_id();
let columns = &["odd", "even", "Ev_En"];
let alter_request = test_util::alter_logical_region_add_tag_columns(columns);
let alter_request = test_util::alter_logical_region_add_tag_columns(123456, columns);
engine
.handle_request(logical_region_id, RegionRequest::Alter(alter_request))
.await
Expand Down
112 changes: 89 additions & 23 deletions src/metric-engine/src/engine/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::v1::SemanticType;
use common_query::logical_plan::Expr;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{error, info, tracing};
use datafusion::logical_expr;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::region_engine::RegionEngine;
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{RegionId, ScanRequest};

use crate::engine::MetricEngineInner;
use crate::error::{LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, Result};
use crate::error::{
InvalidMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, Result,
};
use crate::utils;

impl MetricEngineInner {
Expand Down Expand Up @@ -67,17 +73,7 @@ impl MetricEngineInner {
logical_region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream> {
let physical_region_id = {
let state = &self.state.read().await;
state
.get_physical_region_id(logical_region_id)
.with_context(|| {
error!("Trying to read an nonexistent region {logical_region_id}");
LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
}
})?
};
let physical_region_id = self.get_physical_region_id(logical_region_id).await?;
let data_region_id = utils::to_data_region_id(physical_region_id);
let request = self
.transform_request(physical_region_id, logical_region_id, request)
Expand All @@ -88,6 +84,38 @@ impl MetricEngineInner {
.context(MitoReadOperationSnafu)
}

pub async fn load_region_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
let is_reading_physical_region = self
.state
.read()
.await
.physical_regions()
.contains_key(&region_id);

if is_reading_physical_region {
self.mito
.get_metadata(region_id)
.await
.context(MitoReadOperationSnafu)
} else {
let physical_region_id = self.get_physical_region_id(region_id).await?;
self.logical_region_metadata(physical_region_id, region_id)
.await
}
}

async fn get_physical_region_id(&self, logical_region_id: RegionId) -> Result<RegionId> {
let state = &self.state.read().await;
state
.get_physical_region_id(logical_region_id)
.with_context(|| {
error!("Trying to read an nonexistent region {logical_region_id}");
LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
}
})
}

/// Transform the [ScanRequest] from logical region to physical data region.
async fn transform_request(
&self,
Expand Down Expand Up @@ -130,9 +158,13 @@ impl MetricEngineInner {
origin_projection: &[usize],
) -> Result<Vec<usize>> {
// project on logical columns
let logical_columns = self
let all_logical_columns = self
.load_logical_columns(physical_region_id, logical_region_id)
.await?;
let projected_logical_names = origin_projection
.iter()
.map(|i| all_logical_columns[*i].column_schema.name.clone())
.collect::<Vec<_>>();

// generate physical projection
let mut physical_projection = Vec::with_capacity(origin_projection.len());
Expand All @@ -142,10 +174,9 @@ impl MetricEngineInner {
.get_metadata(data_region_id)
.await
.context(MitoReadOperationSnafu)?;
for logical_proj in origin_projection {
let column_id = logical_columns[*logical_proj].column_id;
for name in projected_logical_names {
// Safety: logical columns is a strict subset of physical columns
physical_projection.push(physical_metadata.column_index_by_id(column_id).unwrap());
physical_projection.push(physical_metadata.column_index_by_name(&name).unwrap());
}

Ok(physical_projection)
Expand All @@ -159,21 +190,55 @@ impl MetricEngineInner {
) -> Result<Vec<usize>> {
let logical_columns = self
.load_logical_columns(physical_region_id, logical_region_id)
.await?;
.await?
.into_iter()
.map(|col| col.column_schema.name);
let mut projection = Vec::with_capacity(logical_columns.len());
let data_region_id = utils::to_data_region_id(physical_region_id);
let physical_metadata = self
.mito
.get_metadata(data_region_id)
.await
.context(MitoReadOperationSnafu)?;
for logical_col in logical_columns {
let column_id = logical_col.column_id;
for name in logical_columns {
// Safety: logical columns is a strict subset of physical columns
projection.push(physical_metadata.column_index_by_id(column_id).unwrap());
projection.push(physical_metadata.column_index_by_name(&name).unwrap());
}

Ok(projection)
}

pub async fn logical_region_metadata(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<RegionMetadataRef> {
let logical_columns = self
.load_logical_columns(physical_region_id, logical_region_id)
.await?;

let primary_keys = logical_columns
.iter()
.filter_map(|col| {
if col.semantic_type == SemanticType::Tag {
Some(col.column_id)
} else {
None
}
})
.collect::<Vec<_>>();

let mut logical_metadata_builder = RegionMetadataBuilder::new(logical_region_id);
for col in logical_columns {
logical_metadata_builder.push_column_metadata(col);
}
logical_metadata_builder.primary_key(primary_keys);
let logical_metadata = logical_metadata_builder
.build()
.context(InvalidMetadataSnafu)?;

Ok(Arc::new(logical_metadata))
}
}

#[cfg(test)]
Expand Down Expand Up @@ -205,7 +270,8 @@ mod test {
.unwrap();

// add columns to the first logical region
let alter_request = alter_logical_region_add_tag_columns(&["987", "789", "654", "321"]);
let alter_request =
alter_logical_region_add_tag_columns(123456, &["987", "798", "654", "321"]);
env.metric()
.handle_request(logical_region_id, RegionRequest::Alter(alter_request))
.await
Expand All @@ -225,7 +291,7 @@ mod test {
.await
.unwrap();

assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 7, 8, 9, 10]);
assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 8, 9, 10, 11]);
assert_eq!(scan_req.filters.len(), 1);
assert_eq!(
scan_req.filters[0],
Expand All @@ -242,6 +308,6 @@ mod test {
.transform_request(physical_region_id, logical_region_id, scan_req)
.await
.unwrap();
assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 7, 8, 9, 10]);
assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 8, 9, 10, 11]);
}
}
21 changes: 3 additions & 18 deletions src/metric-engine/src/engine/region_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,19 @@ use crate::error::Result;
impl MetricEngineInner {
/// Load column metadata of a logical region.
///
/// The return value is ordered.
/// The return value is ordered on [ColumnId].
pub async fn load_logical_columns(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<Vec<ColumnMetadata>> {
// load logical and physical columns, and intersect them to get logical column metadata
let logical_columns = self
let mut logical_column_metadata = self
.metadata_region
.logical_columns(physical_region_id, logical_region_id)
.await?
.into_iter()
.collect::<HashMap<String, SemanticType>>();
let physical_columns = self
.data_region
.physical_columns(physical_region_id)
.await?;
let mut logical_column_metadata = physical_columns
.into_iter()
.filter_map(|mut col| {
// recover the semantic type of logical columns
logical_columns
.get(&col.column_schema.name)
.map(|semantic_type| {
col.semantic_type = *semantic_type;
col
})
})
.map(|(_, column_metadata)| column_metadata)
.collect::<Vec<_>>();

// sort columns on column id to ensure the order
Expand Down
18 changes: 17 additions & 1 deletion src/metric-engine/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to deserialize column metadata from {}", raw))]
DeserializeColumnMetadata {
raw: String,
#[snafu(source)]
error: serde_json::Error,
location: Location,
},

#[snafu(display("Failed to decode base64 column value"))]
DecodeColumnValue {
#[snafu(source)]
Expand Down Expand Up @@ -132,6 +140,12 @@ pub enum Error {

#[snafu(display("Alter request to physical region is forbidden"))]
ForbiddenPhysicalAlter { location: Location },

#[snafu(display("Invalid region metadata"))]
InvalidMetadata {
source: store_api::metadata::MetadataError,
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand All @@ -150,8 +164,10 @@ impl ErrorExt for Error {

MissingInternalColumn { .. }
| DeserializeSemanticType { .. }
| DeserializeColumnMetadata { .. }
| DecodeColumnValue { .. }
| ParseRegionId { .. } => StatusCode::Unexpected,
| ParseRegionId { .. }
| InvalidMetadata { .. } => StatusCode::Unexpected,

PhysicalRegionNotFound { .. } | LogicalRegionNotFound { .. } => {
StatusCode::RegionNotFound
Expand Down
Loading

0 comments on commit 48cd22d

Please sign in to comment.