Skip to content

Commit

Permalink
feat: stop region server (#2356)
Browse files Browse the repository at this point in the history
* feat: stop region server

* fix: close region first
  • Loading branch information
fengjiachun authored and waynexia committed Sep 12, 2023
1 parent 4af126e commit 80c5d52
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 12 deletions.
3 changes: 2 additions & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ impl Default for DatanodeOptions {
meta_client_options: None,
wal: WalConfig::default(),
storage: StorageConfig::default(),
region_engine: vec![],
region_engine: vec![RegionEngineConfig::Mito(MitoConfig::default())],
logging: LoggingOptions::default(),
heartbeat: HeartbeatOptions::default(),
enable_telemetry: true,
Expand Down Expand Up @@ -497,6 +497,7 @@ impl Datanode {
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)?;
}
self.region_server.stop().await?;
Ok(())
}

Expand Down
13 changes: 13 additions & 0 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,18 @@ pub enum Error {
location: Location,
source: store_api::metadata::MetadataError,
},

#[snafu(display(
"Failed to stop region engine {}, location:{}, source: {}",
name,
location,
source
))]
StopRegionEngine {
name: String,
location: Location,
source: BoxedError,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -670,6 +682,7 @@ impl ErrorExt for Error {
source.status_code()
}
HandleRegionRequest { source, .. } => source.status_code(),
StopRegionEngine { source, .. } => source.status_code(),
}
}

Expand Down
37 changes: 34 additions & 3 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_query::{DfPhysicalPlan, Output};
use common_recordbatch::SendableRecordBatchStream;
use common_runtime::Runtime;
use common_telemetry::info;
use common_telemetry::{info, warn};
use dashmap::DashMap;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogList, CatalogProvider};
Expand All @@ -48,7 +48,7 @@ use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::RegionEngineRef;
use store_api::region_request::RegionRequest;
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::scan::StreamScanAdapter;
Expand All @@ -57,7 +57,7 @@ use tonic::{Request, Response, Result as TonicResult};
use crate::error::{
BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu,
GetRegionMetadataSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu,
RegionNotFoundSnafu, Result, UnsupportedOutputSnafu,
RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnsupportedOutputSnafu,
};

#[derive(Clone)]
Expand Down Expand Up @@ -95,6 +95,11 @@ impl RegionServer {
/// Returns a new handle to the runtime held by the inner server state.
pub fn runtime(&self) -> Arc<Runtime> {
    Arc::clone(&self.inner.runtime)
}

/// Stops the region server by delegating to the inner state's `stop`.
///
/// Any error reported by the inner `stop` is returned unchanged.
pub async fn stop(&self) -> Result<()> {
    let inner = &self.inner;
    inner.stop().await
}
}

#[async_trait]
Expand Down Expand Up @@ -272,6 +277,32 @@ impl RegionServerInner {
Output::Stream(stream) => Ok(stream),
}
}

/// Closes every registered region, then stops every registered engine.
///
/// A region that fails to close is logged with `warn!` and skipped, so one bad
/// region does not prevent the remaining regions and the engines from being
/// shut down.
///
/// # Errors
///
/// Returns a `StopRegionEngine` error as soon as one engine fails to stop.
async fn stop(&self) -> Result<()> {
    // Snapshot the (region id, engine) pairs before awaiting anything. Iterating the
    // map directly would keep a map guard alive across the `.await` below, which can
    // block (or deadlock) any other task that touches the map in the meantime.
    let regions = self
        .region_map
        .iter()
        .map(|region| (*region.key(), region.value().clone()))
        .collect::<Vec<_>>();

    for (region_id, engine) in regions {
        let closed = engine
            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
            .await;
        match closed {
            Ok(_) => info!("Region {region_id} is closed"),
            Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
        }
    }
    self.region_map.clear();

    // The write guard is a temporary that is released at the end of this statement,
    // so it is not held while the engines are being stopped.
    let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
    // NOTE(review): the engines have already been drained from the map, so an early
    // return on the first failure drops the remaining engines without calling `stop`
    // on them — confirm this is the intended behavior.
    for (engine_name, engine) in engines {
        engine
            .stop()
            .await
            .context(StopRegionEngineSnafu { name: &engine_name })?;
        info!("Region engine {engine_name} is stopped");
    }

    Ok(())
}
}

enum RegionChange {
Expand Down
17 changes: 9 additions & 8 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,6 @@ impl MitoEngine {
}
}

/// Stops the engine by delegating to the inner engine's `stop`.
///
/// The underlying log store is left running, because other components might
/// still be using it.
pub async fn stop(&self) -> Result<()> {
    let inner = &self.inner;
    inner.stop().await
}

/// Returns true if the specific region exists.
pub fn is_region_exists(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_exists(region_id)
Expand Down Expand Up @@ -190,6 +182,15 @@ impl RegionEngine for MitoEngine {
) -> std::result::Result<RegionMetadataRef, BoxedError> {
self.inner.get_metadata(region_id).map_err(BoxedError::new)
}

/// Stops the engine, boxing any error reported by the inner engine.
///
/// The underlying log store is not stopped here, as other components might
/// still use it. (Once no other component references the log store, it will
/// shut down automatically.)
async fn stop(&self) -> std::result::Result<(), BoxedError> {
    match self.inner.stop().await {
        Ok(()) => Ok(()),
        Err(err) => Err(BoxedError::new(err)),
    }
}
}

// Tests methods.
Expand Down
3 changes: 3 additions & 0 deletions src/store-api/src/region_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ pub trait RegionEngine: Send + Sync {

/// Retrieve region's metadata.
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;

/// Stops the engine.
///
/// Returns a [`BoxedError`] if the engine fails to stop.
async fn stop(&self) -> Result<(), BoxedError>;
}

pub type RegionEngineRef = Arc<dyn RegionEngine>;

0 comments on commit 80c5d52

Please sign in to comment.