diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 969d49b088b1..cb6b407dedb6 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -28,7 +28,7 @@ use common_query::physical_plan::DfPhysicalPlanAdapter; use common_query::{DfPhysicalPlan, Output}; use common_recordbatch::SendableRecordBatchStream; use common_runtime::Runtime; -use common_telemetry::info; +use common_telemetry::{info, warn}; use dashmap::DashMap; use datafusion::catalog::schema::SchemaProvider; use datafusion::catalog::{CatalogList, CatalogProvider}; @@ -48,7 +48,7 @@ use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::RegionEngineRef; -use store_api::region_request::RegionRequest; +use store_api::region_request::{RegionCloseRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::table::scan::StreamScanAdapter; @@ -279,12 +279,19 @@ impl RegionServerInner { } async fn stop(&self) -> Result<()> { - let region_ids = self.region_map.iter().map(|x| *x.key()).collect::>(); - info!("Stopping region server with regions: {:?}", region_ids); - self.region_map.clear(); + for region in self.region_map.iter() { + let region_id = *region.key(); + let engine = region.value(); + let closed = engine + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await; + match closed { + Ok(_) => info!("Region {region_id} is closed"), + Err(e) => warn!("Failed to close region {region_id}, err: {e}"), + } + } let engines = self.engines.write().unwrap().drain().collect::>(); - // The underlying log store will automatically stop when all regions have been removed. for (engine_name, engine) in engines { engine .stop() diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 5c52466f4114..5a6100f64d8b 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -73,14 +73,6 @@ impl MitoEngine { } } - /// Stop the engine. - /// - /// Stopping the engine doesn't stop the underlying log store as other components might - /// still use it. - pub async fn stop(&self) -> Result<()> { - self.inner.stop().await - } - /// Returns true if the specific region exists. pub fn is_region_exists(&self, region_id: RegionId) -> bool { self.inner.workers.is_region_exists(region_id) @@ -191,8 +183,13 @@ impl RegionEngine for MitoEngine { self.inner.get_metadata(region_id).map_err(BoxedError::new) } + /// Stop the engine. + /// + /// Stopping the engine doesn't stop the underlying log store as other components might + /// still use it. (When no other components are referencing the log store, it will + /// automatically shutdown.) async fn stop(&self) -> std::result::Result<(), BoxedError> { - self.stop().await.map_err(BoxedError::new) + self.inner.stop().await.map_err(BoxedError::new) } }