diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index fe9791ee60b4..927510408703 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -371,7 +371,7 @@ impl Default for DatanodeOptions { meta_client_options: None, wal: WalConfig::default(), storage: StorageConfig::default(), - region_engine: vec![], + region_engine: vec![RegionEngineConfig::Mito(MitoConfig::default())], logging: LoggingOptions::default(), heartbeat: HeartbeatOptions::default(), enable_telemetry: true, @@ -497,6 +497,7 @@ impl Datanode { .map_err(BoxedError::new) .context(ShutdownInstanceSnafu)?; } + self.region_server.stop().await?; Ok(()) } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index fc2d3c447f29..d4b6865d923c 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -560,6 +560,18 @@ pub enum Error { location: Location, source: store_api::metadata::MetadataError, }, + + #[snafu(display( + "Failed to stop region engine {}, location:{}, source: {}", + name, + location, + source + ))] + StopRegionEngine { + name: String, + location: Location, + source: BoxedError, + }, } pub type Result = std::result::Result; @@ -670,6 +682,7 @@ impl ErrorExt for Error { source.status_code() } HandleRegionRequest { source, .. } => source.status_code(), + StopRegionEngine { source, .. } => source.status_code(), } } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 0aea68307d7b..7580a8307ffc 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -28,7 +28,7 @@ use common_query::physical_plan::DfPhysicalPlanAdapter; use common_query::{DfPhysicalPlan, Output}; use common_recordbatch::SendableRecordBatchStream; use common_runtime::Runtime; -use common_telemetry::info; +use common_telemetry::{info, warn}; use dashmap::DashMap; use datafusion::catalog::schema::SchemaProvider; use datafusion::catalog::{CatalogList, CatalogProvider}; @@ -48,7 +48,7 @@ use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::RegionEngineRef; -use store_api::region_request::RegionRequest; +use store_api::region_request::{RegionCloseRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::table::scan::StreamScanAdapter; @@ -57,7 +57,7 @@ use tonic::{Request, Response, Result as TonicResult}; use crate::error::{ BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, GetRegionMetadataSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu, - RegionNotFoundSnafu, Result, UnsupportedOutputSnafu, + RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnsupportedOutputSnafu, }; #[derive(Clone)] @@ -95,6 +95,11 @@ impl RegionServer { pub fn runtime(&self) -> Arc { self.inner.runtime.clone() } + + /// Stop the region server. + pub async fn stop(&self) -> Result<()> { + self.inner.stop().await + } } #[async_trait] @@ -272,6 +277,32 @@ impl RegionServerInner { Output::Stream(stream) => Ok(stream), } } + + async fn stop(&self) -> Result<()> { + for region in self.region_map.iter() { + let region_id = *region.key(); + let engine = region.value(); + let closed = engine + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await; + match closed { + Ok(_) => info!("Region {region_id} is closed"), + Err(e) => warn!("Failed to close region {region_id}, err: {e}"), + } + } + self.region_map.clear(); + + let engines = self.engines.write().unwrap().drain().collect::>(); + for (engine_name, engine) in engines { + engine + .stop() + .await + .context(StopRegionEngineSnafu { name: &engine_name })?; + info!("Region engine {engine_name} is stopped"); + } + + Ok(()) + } } enum RegionChange { diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index aed6f399a524..5a6100f64d8b 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -73,14 +73,6 @@ impl MitoEngine { } } - /// Stop the engine. - /// - /// Stopping the engine doesn't stop the underlying log store as other components might - /// still use it. - pub async fn stop(&self) -> Result<()> { - self.inner.stop().await - } - /// Returns true if the specific region exists. pub fn is_region_exists(&self, region_id: RegionId) -> bool { self.inner.workers.is_region_exists(region_id) @@ -190,6 +182,15 @@ impl RegionEngine for MitoEngine { ) -> std::result::Result { self.inner.get_metadata(region_id).map_err(BoxedError::new) } + + /// Stop the engine. + /// + /// Stopping the engine doesn't stop the underlying log store as other components might + /// still use it. (When no other components are referencing the log store, it will + /// automatically shutdown.) + async fn stop(&self) -> std::result::Result<(), BoxedError> { + self.inner.stop().await.map_err(BoxedError::new) + } } // Tests methods. diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 4f2b54c0398b..c23282a13024 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -48,6 +48,9 @@ pub trait RegionEngine: Send + Sync { /// Retrieve region's metadata. async fn get_metadata(&self, region_id: RegionId) -> Result; + + /// Stop the engine + async fn stop(&self) -> Result<(), BoxedError>; } pub type RegionEngineRef = Arc;