diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index fe9791ee60b4..927510408703 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -371,7 +371,7 @@ impl Default for DatanodeOptions { meta_client_options: None, wal: WalConfig::default(), storage: StorageConfig::default(), - region_engine: vec![], + region_engine: vec![RegionEngineConfig::Mito(MitoConfig::default())], logging: LoggingOptions::default(), heartbeat: HeartbeatOptions::default(), enable_telemetry: true, @@ -497,6 +497,7 @@ impl Datanode { .map_err(BoxedError::new) .context(ShutdownInstanceSnafu)?; } + self.region_server.stop().await?; Ok(()) } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index fc2d3c447f29..d4b6865d923c 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -560,6 +560,18 @@ pub enum Error { location: Location, source: store_api::metadata::MetadataError, }, + + #[snafu(display( + "Failed to stop region engine {}, location:{}, source: {}", + name, + location, + source + ))] + StopRegionEngine { + name: String, + location: Location, + source: BoxedError, + }, } pub type Result = std::result::Result; @@ -670,6 +682,7 @@ impl ErrorExt for Error { source.status_code() } HandleRegionRequest { source, .. } => source.status_code(), + StopRegionEngine { source, .. } => source.status_code(), } } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 0aea68307d7b..969d49b088b1 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -57,7 +57,7 @@ use tonic::{Request, Response, Result as TonicResult}; use crate::error::{ BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, GetRegionMetadataSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu, - RegionNotFoundSnafu, Result, UnsupportedOutputSnafu, + RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnsupportedOutputSnafu, }; #[derive(Clone)] @@ -95,6 +95,11 @@ impl RegionServer { pub fn runtime(&self) -> Arc { self.inner.runtime.clone() } + + /// Stop the region server. + pub async fn stop(&self) -> Result<()> { + self.inner.stop().await + } } #[async_trait] @@ -272,6 +277,24 @@ impl RegionServerInner { Output::Stream(stream) => Ok(stream), } } + + async fn stop(&self) -> Result<()> { + let region_ids = self.region_map.iter().map(|x| *x.key()).collect::>(); + info!("Stopping region server with regions: {:?}", region_ids); + self.region_map.clear(); + + let engines = self.engines.write().unwrap().drain().collect::>(); + // The underlying log store will automatically stop when all regions have been removed. + for (engine_name, engine) in engines { + engine + .stop() + .await + .context(StopRegionEngineSnafu { name: &engine_name })?; + info!("Region engine {engine_name} is stopped"); + } + + Ok(()) + } } enum RegionChange { diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index aed6f399a524..5c52466f4114 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -190,6 +190,10 @@ impl RegionEngine for MitoEngine { ) -> std::result::Result { self.inner.get_metadata(region_id).map_err(BoxedError::new) } + + async fn stop(&self) -> std::result::Result<(), BoxedError> { + self.stop().await.map_err(BoxedError::new) + } } // Tests methods. diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 4f2b54c0398b..c23282a13024 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -48,6 +48,9 @@ pub trait RegionEngine: Send + Sync { /// Retrieve region's metadata. async fn get_metadata(&self, region_id: RegionId) -> Result; + + /// Stop the engine + async fn stop(&self) -> Result<(), BoxedError>; } pub type RegionEngineRef = Arc;