From 2d55f18902ca80ad73cebb0cbfbf386ecd6d96d7 Mon Sep 17 00:00:00 2001 From: luofucong Date: Wed, 3 Jan 2024 21:02:27 +0800 Subject: [PATCH] feat: register logical regions --- src/datanode/src/error.rs | 12 ++++++++ src/datanode/src/region_server.rs | 49 +++++++++++++++++++++++++++--- src/datanode/src/tests.rs | 4 +++ src/file-engine/src/engine.rs | 5 +++ src/metric-engine/src/engine.rs | 13 ++++++++ src/mito2/src/engine.rs | 5 +++ src/store-api/src/region_engine.rs | 3 ++ 7 files changed, 86 insertions(+), 5 deletions(-) diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 95aa0bb22b58..94724ffb95a0 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -272,6 +272,16 @@ pub enum Error { location: Location, source: BoxedError, }, + + #[snafu(display( + "Failed to find logical regions in physical region {}", + physical_region_id + ))] + FindLogicalRegions { + physical_region_id: RegionId, + source: metric_engine::error::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -340,6 +350,8 @@ impl ErrorExt for Error { } HandleRegionRequest { source, .. } => source.status_code(), StopRegionEngine { source, .. } => source.status_code(), + + FindLogicalRegions { source, .. } => source.status_code(), } } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 26a14c997d57..b7c01517e547 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -43,6 +43,7 @@ use datafusion_common::DataFusionError; use datafusion_expr::{Expr as DfExpr, TableProviderFilterPushDown, TableType}; use datatypes::arrow::datatypes::SchemaRef; use futures_util::future::try_join_all; +use metric_engine::engine::MetricEngine; use prost::Message; use query::QueryEngineRef; use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult}; @@ -51,6 +52,7 @@ use servers::grpc::region_server::RegionServerHandler; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; +use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse}; use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; @@ -60,8 +62,9 @@ use tonic::{Request, Response, Result as TonicResult}; use crate::error::{ self, BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, - GetRegionMetadataSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu, - RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnsupportedOutputSnafu, + FindLogicalRegionsSnafu, GetRegionMetadataSnafu, HandleRegionRequestSnafu, + RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, + UnsupportedOutputSnafu, }; use crate::event_listener::RegionServerEventListenerRef; @@ -460,7 +463,8 @@ impl RegionServerInner { { Ok(result) => { // Sets corresponding region status to ready. - self.set_region_status_ready(region_id, engine, region_change); + self.set_region_status_ready(region_id, engine, region_change) + .await?; Ok(result) } Err(err) => { @@ -505,16 +509,20 @@ impl RegionServerInner { } } - fn set_region_status_ready( + async fn set_region_status_ready( &self, region_id: RegionId, engine: RegionEngineRef, region_change: RegionChange, - ) { + ) -> Result<()> { let engine_type = engine.name(); match region_change { RegionChange::None => {} RegionChange::Register(_) => { + if engine_type == METRIC_ENGINE_NAME { + self.register_logical_regions(&engine, region_id).await?; + } + info!("Region {region_id} is registered to engine {engine_type}"); self.region_map .insert(region_id, RegionEngineWithStatus::Ready(engine)); @@ -528,6 +536,37 @@ impl RegionServerInner { self.event_listener.on_region_deregistered(region_id); } } + Ok(()) + } + + async fn register_logical_regions( + &self, + engine: &RegionEngineRef, + physical_region_id: RegionId, + ) -> Result<()> { + let metric_engine = + engine + .as_any() + .downcast_ref::() + .context(UnexpectedSnafu { + violated: format!( + "expecting engine type '{}', actual '{}'", + METRIC_ENGINE_NAME, + engine.name(), + ), + })?; + + let logical_regions = metric_engine + .logical_regions(physical_region_id) + .await + .context(FindLogicalRegionsSnafu { physical_region_id })?; + + for region in logical_regions { + self.region_map + .insert(region, RegionEngineWithStatus::Ready(engine.clone())); + info!("Logical region {} is registered!", region); + } + Ok(()) } pub async fn handle_read(&self, request: QueryRequest) -> Result { diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 80351f55718b..e04dd32907ee 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -207,4 +207,8 @@ impl RegionEngine for MockRegionEngine { } Some(RegionRole::Leader) } + + fn as_any(&self) -> &dyn Any { + self + } } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index c8b9f82992c2..f5a7d0e259f1 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; use std::collections::HashMap; use std::sync::{Arc, RwLock}; @@ -119,6 +120,10 @@ impl RegionEngine for FileRegionEngine { fn role(&self, region_id: RegionId) -> Option { self.inner.state(region_id) } + + fn as_any(&self) -> &dyn Any { + self + } } struct EngineInner { diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 3e4f30adcfcb..83d9444dd950 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -21,6 +21,7 @@ mod read; mod region_metadata; mod state; +use std::any::Any; use std::sync::Arc; use async_trait::async_trait; @@ -38,6 +39,7 @@ use tokio::sync::RwLock; use self::state::MetricEngineState; use crate::data_region::DataRegion; +use crate::error::Result; use crate::metadata_region::MetadataRegion; use crate::utils; @@ -193,6 +195,10 @@ impl RegionEngine for MetricEngine { fn role(&self, region_id: RegionId) -> Option { todo!() } + + fn as_any(&self) -> &dyn Any { + self + } } impl MetricEngine { @@ -208,6 +214,13 @@ impl MetricEngine { }), } } + + pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result> { + self.inner + .metadata_region + .logical_regions(physical_region_id) + .await + } } struct MetricEngineInner { diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 2f758ee2233a..2a718d88eaff 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -45,6 +45,7 @@ mod set_readonly_test; #[cfg(test)] mod truncate_test; +use std::any::Any; use std::sync::Arc; use async_trait::async_trait; @@ -303,6 +304,10 @@ impl RegionEngine for MitoEngine { fn role(&self, region_id: RegionId) -> Option { self.inner.role(region_id) } + + fn as_any(&self) -> &dyn Any { + self + } } // Tests methods. diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index df73efd6ad74..3b9052a16a9d 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -14,6 +14,7 @@ //! Region Engine's definition +use std::any::Any; use std::fmt::Display; use std::sync::Arc; @@ -165,6 +166,8 @@ pub trait RegionEngine: Send + Sync { /// /// Returns the `None` if the region is not found. fn role(&self, region_id: RegionId) -> Option; + + fn as_any(&self) -> &dyn Any; } pub type RegionEngineRef = Arc;