Skip to content

Commit

Permalink
feat: register logical regions
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelScofield committed Jan 3, 2024
1 parent 7f16931 commit 2d55f18
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 5 deletions.
12 changes: 12 additions & 0 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,16 @@ pub enum Error {
location: Location,
source: BoxedError,
},

#[snafu(display(
"Failed to find logical regions in physical region {}",
physical_region_id
))]
FindLogicalRegions {
physical_region_id: RegionId,
source: metric_engine::error::Error,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -340,6 +350,8 @@ impl ErrorExt for Error {
}
HandleRegionRequest { source, .. } => source.status_code(),
StopRegionEngine { source, .. } => source.status_code(),

FindLogicalRegions { source, .. } => source.status_code(),
}
}

Expand Down
49 changes: 44 additions & 5 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use datafusion_common::DataFusionError;
use datafusion_expr::{Expr as DfExpr, TableProviderFilterPushDown, TableType};
use datatypes::arrow::datatypes::SchemaRef;
use futures_util::future::try_join_all;
use metric_engine::engine::MetricEngine;
use prost::Message;
use query::QueryEngineRef;
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
Expand All @@ -51,6 +52,7 @@ use servers::grpc::region_server::RegionServerHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
Expand All @@ -60,8 +62,9 @@ use tonic::{Request, Response, Result as TonicResult};

use crate::error::{
self, BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu,
GetRegionMetadataSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu,
RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnsupportedOutputSnafu,
FindLogicalRegionsSnafu, GetRegionMetadataSnafu, HandleRegionRequestSnafu,
RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu,
UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;

Expand Down Expand Up @@ -460,7 +463,8 @@ impl RegionServerInner {
{
Ok(result) => {
// Sets corresponding region status to ready.
self.set_region_status_ready(region_id, engine, region_change);
self.set_region_status_ready(region_id, engine, region_change)
.await?;
Ok(result)
}
Err(err) => {
Expand Down Expand Up @@ -505,16 +509,20 @@ impl RegionServerInner {
}
}

fn set_region_status_ready(
async fn set_region_status_ready(
&self,
region_id: RegionId,
engine: RegionEngineRef,
region_change: RegionChange,
) {
) -> Result<()> {
let engine_type = engine.name();
match region_change {
RegionChange::None => {}
RegionChange::Register(_) => {
if engine_type == METRIC_ENGINE_NAME {
self.register_logical_regions(&engine, region_id).await?;
}

info!("Region {region_id} is registered to engine {engine_type}");
self.region_map
.insert(region_id, RegionEngineWithStatus::Ready(engine));
Expand All @@ -528,6 +536,37 @@ impl RegionServerInner {
self.event_listener.on_region_deregistered(region_id);
}
}
Ok(())
}

async fn register_logical_regions(
&self,
engine: &RegionEngineRef,
physical_region_id: RegionId,
) -> Result<()> {
let metric_engine =
engine
.as_any()
.downcast_ref::<MetricEngine>()
.context(UnexpectedSnafu {
violated: format!(
"expecting engine type '{}', actual '{}'",
METRIC_ENGINE_NAME,
engine.name(),
),
})?;

let logical_regions = metric_engine
.logical_regions(physical_region_id)
.await
.context(FindLogicalRegionsSnafu { physical_region_id })?;

for region in logical_regions {
self.region_map
.insert(region, RegionEngineWithStatus::Ready(engine.clone()));
info!("Logical region {} is registered!", region);
}
Ok(())
}

pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
Expand Down
4 changes: 4 additions & 0 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,8 @@ impl RegionEngine for MockRegionEngine {
}
Some(RegionRole::Leader)
}

fn as_any(&self) -> &dyn Any {
self
}
}
5 changes: 5 additions & 0 deletions src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

Expand Down Expand Up @@ -119,6 +120,10 @@ impl RegionEngine for FileRegionEngine {
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.state(region_id)
}

fn as_any(&self) -> &dyn Any {
self
}
}

struct EngineInner {
Expand Down
13 changes: 13 additions & 0 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod read;
mod region_metadata;
mod state;

use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
Expand All @@ -38,6 +39,7 @@ use tokio::sync::RwLock;

use self::state::MetricEngineState;
use crate::data_region::DataRegion;
use crate::error::Result;
use crate::metadata_region::MetadataRegion;
use crate::utils;

Expand Down Expand Up @@ -193,6 +195,10 @@ impl RegionEngine for MetricEngine {
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
todo!()
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl MetricEngine {
Expand All @@ -208,6 +214,13 @@ impl MetricEngine {
}),
}
}

pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
self.inner
.metadata_region
.logical_regions(physical_region_id)
.await
}
}

struct MetricEngineInner {
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ mod set_readonly_test;
#[cfg(test)]
mod truncate_test;

use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
Expand Down Expand Up @@ -303,6 +304,10 @@ impl RegionEngine for MitoEngine {
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.role(region_id)
}

fn as_any(&self) -> &dyn Any {
self
}
}

// Tests methods.
Expand Down
3 changes: 3 additions & 0 deletions src/store-api/src/region_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

//! Region Engine's definition
use std::any::Any;
use std::fmt::Display;
use std::sync::Arc;

Expand Down Expand Up @@ -165,6 +166,8 @@ pub trait RegionEngine: Send + Sync {
///
/// Returns the `None` if the region is not found.
fn role(&self, region_id: RegionId) -> Option<RegionRole>;

fn as_any(&self) -> &dyn Any;
}

pub type RegionEngineRef = Arc<dyn RegionEngine>;

0 comments on commit 2d55f18

Please sign in to comment.