Skip to content

Commit

Permalink
feat: stop region server
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Sep 12, 2023
1 parent 5f7d118 commit 77efaa2
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 2 deletions.
3 changes: 2 additions & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ impl Default for DatanodeOptions {
meta_client_options: None,
wal: WalConfig::default(),
storage: StorageConfig::default(),
region_engine: vec![],
region_engine: vec![RegionEngineConfig::Mito(MitoConfig::default())],
logging: LoggingOptions::default(),
heartbeat: HeartbeatOptions::default(),
enable_telemetry: true,
Expand Down Expand Up @@ -497,6 +497,7 @@ impl Datanode {
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)?;
}
self.region_server.stop().await?;
Ok(())
}

Expand Down
13 changes: 13 additions & 0 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,18 @@ pub enum Error {
location: Location,
source: store_api::metadata::MetadataError,
},

#[snafu(display(
"Failed to stop region engine {}, location:{}, source: {}",
name,
location,
source
))]
StopRegionEngine {
name: String,
location: Location,
source: BoxedError,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -670,6 +682,7 @@ impl ErrorExt for Error {
source.status_code()
}
HandleRegionRequest { source, .. } => source.status_code(),
StopRegionEngine { source, .. } => source.status_code(),
}
}

Expand Down
25 changes: 24 additions & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use tonic::{Request, Response, Result as TonicResult};
use crate::error::{
BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu,
GetRegionMetadataSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu,
RegionNotFoundSnafu, Result, UnsupportedOutputSnafu,
RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnsupportedOutputSnafu,
};

#[derive(Clone)]
Expand Down Expand Up @@ -95,6 +95,11 @@ impl RegionServer {
pub fn runtime(&self) -> Arc<Runtime> {
self.inner.runtime.clone()
}

/// Stop the region server.
pub async fn stop(&self) -> Result<()> {
self.inner.stop().await
}
}

#[async_trait]
Expand Down Expand Up @@ -272,6 +277,24 @@ impl RegionServerInner {
Output::Stream(stream) => Ok(stream),
}
}

async fn stop(&self) -> Result<()> {
let region_ids = self.region_map.iter().map(|x| *x.key()).collect::<Vec<_>>();
info!("Stopping region server with regions: {:?}", region_ids);
self.region_map.clear();

let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
// The underlying log store will automatically stop when all regions have been removed.
for (engine_name, engine) in engines {
engine
.stop()
.await
.context(StopRegionEngineSnafu { name: &engine_name })?;
info!("Region engine {engine_name} is stopped");
}

Ok(())
}
}

enum RegionChange {
Expand Down
4 changes: 4 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ impl RegionEngine for MitoEngine {
) -> std::result::Result<RegionMetadataRef, BoxedError> {
self.inner.get_metadata(region_id).map_err(BoxedError::new)
}

async fn stop(&self) -> std::result::Result<(), BoxedError> {
self.stop().await.map_err(BoxedError::new)
}
}

// Tests methods.
Expand Down
3 changes: 3 additions & 0 deletions src/store-api/src/region_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ pub trait RegionEngine: Send + Sync {

/// Retrieve region's metadata.
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;

/// Stop the engine
async fn stop(&self) -> Result<(), BoxedError>;
}

pub type RegionEngineRef = Arc<dyn RegionEngine>;

0 comments on commit 77efaa2

Please sign in to comment.