Skip to content

Commit

Permalink
fix: close region first
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Sep 12, 2023
1 parent 77efaa2 commit 0fc442b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 14 deletions.
18 changes: 13 additions & 5 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_query::{DfPhysicalPlan, Output};
use common_recordbatch::SendableRecordBatchStream;
use common_runtime::Runtime;
use common_telemetry::info;
use common_telemetry::{info, warn};
use dashmap::DashMap;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogList, CatalogProvider};
Expand All @@ -48,7 +48,7 @@ use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::RegionEngineRef;
use store_api::region_request::RegionRequest;
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::scan::StreamScanAdapter;
Expand Down Expand Up @@ -279,12 +279,20 @@ impl RegionServerInner {
}

async fn stop(&self) -> Result<()> {
let region_ids = self.region_map.iter().map(|x| *x.key()).collect::<Vec<_>>();
info!("Stopping region server with regions: {:?}", region_ids);
for region in self.region_map.iter() {
let region_id = *region.key();
let engine = region.value();
let closed = engine
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await;
match closed {
Ok(_) => info!("Region {region_id} is closed"),
Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
}
}
self.region_map.clear();

let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
// The underlying log store will automatically stop when all regions have been removed.
for (engine_name, engine) in engines {
engine
.stop()
Expand Down
15 changes: 6 additions & 9 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,6 @@ impl MitoEngine {
}
}

/// Stop the engine.
///
/// Stopping the engine doesn't stop the underlying log store as other components might
/// still use it.
pub async fn stop(&self) -> Result<()> {
self.inner.stop().await
}

/// Returns true if the specific region exists.
pub fn is_region_exists(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_exists(region_id)
Expand Down Expand Up @@ -191,8 +183,13 @@ impl RegionEngine for MitoEngine {
self.inner.get_metadata(region_id).map_err(BoxedError::new)
}

/// Stop the engine.
///
/// Stopping the engine doesn't stop the underlying log store as other components might
/// still use it. (When no other components are referencing the log store, it will
/// automatically shutdown.)
async fn stop(&self) -> std::result::Result<(), BoxedError> {
self.stop().await.map_err(BoxedError::new)
self.inner.stop().await.map_err(BoxedError::new)
}
}

Expand Down

0 comments on commit 0fc442b

Please sign in to comment.