Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stop region server #2356

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ impl Default for DatanodeOptions {
meta_client_options: None,
wal: WalConfig::default(),
storage: StorageConfig::default(),
region_engine: vec![],
region_engine: vec![RegionEngineConfig::Mito(MitoConfig::default())],
logging: LoggingOptions::default(),
heartbeat: HeartbeatOptions::default(),
enable_telemetry: true,
Expand Down Expand Up @@ -497,6 +497,7 @@ impl Datanode {
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)?;
}
self.region_server.stop().await?;
Ok(())
}

Expand Down
13 changes: 13 additions & 0 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,18 @@ pub enum Error {
location: Location,
source: store_api::metadata::MetadataError,
},

#[snafu(display(
"Failed to stop region engine {}, location:{}, source: {}",
name,
location,
source
))]
StopRegionEngine {
name: String,
location: Location,
source: BoxedError,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -670,6 +682,7 @@ impl ErrorExt for Error {
source.status_code()
}
HandleRegionRequest { source, .. } => source.status_code(),
StopRegionEngine { source, .. } => source.status_code(),
}
}

Expand Down
37 changes: 34 additions & 3 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_query::{DfPhysicalPlan, Output};
use common_recordbatch::SendableRecordBatchStream;
use common_runtime::Runtime;
use common_telemetry::info;
use common_telemetry::{info, warn};
use dashmap::DashMap;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogList, CatalogProvider};
Expand All @@ -48,7 +48,7 @@ use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::RegionEngineRef;
use store_api::region_request::RegionRequest;
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::scan::StreamScanAdapter;
Expand All @@ -57,7 +57,7 @@ use tonic::{Request, Response, Result as TonicResult};
use crate::error::{
BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu,
GetRegionMetadataSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu,
RegionNotFoundSnafu, Result, UnsupportedOutputSnafu,
RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnsupportedOutputSnafu,
};

#[derive(Clone)]
Expand Down Expand Up @@ -95,6 +95,11 @@ impl RegionServer {
pub fn runtime(&self) -> Arc<Runtime> {
self.inner.runtime.clone()
}

/// Stop the region server.
pub async fn stop(&self) -> Result<()> {
self.inner.stop().await
}
}

#[async_trait]
Expand Down Expand Up @@ -272,6 +277,32 @@ impl RegionServerInner {
Output::Stream(stream) => Ok(stream),
}
}

async fn stop(&self) -> Result<()> {
for region in self.region_map.iter() {
let region_id = *region.key();
let engine = region.value();
let closed = engine
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await;
match closed {
Ok(_) => info!("Region {region_id} is closed"),
Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
}
}
self.region_map.clear();

let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
for (engine_name, engine) in engines {
engine
.stop()
.await
.context(StopRegionEngineSnafu { name: &engine_name })?;
info!("Region engine {engine_name} is stopped");
}

Ok(())
}
}

enum RegionChange {
Expand Down
17 changes: 9 additions & 8 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,6 @@ impl MitoEngine {
}
}

/// Stop the engine.
///
/// Stopping the engine doesn't stop the underlying log store as other components might
/// still use it.
pub async fn stop(&self) -> Result<()> {
self.inner.stop().await
}

/// Returns true if the specific region exists.
pub fn is_region_exists(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_exists(region_id)
Expand Down Expand Up @@ -190,6 +182,15 @@ impl RegionEngine for MitoEngine {
) -> std::result::Result<RegionMetadataRef, BoxedError> {
self.inner.get_metadata(region_id).map_err(BoxedError::new)
}

/// Stop the engine.
///
/// Stopping the engine doesn't stop the underlying log store as other components might
/// still use it. (When no other components are referencing the log store, it will
/// automatically shutdown.)
async fn stop(&self) -> std::result::Result<(), BoxedError> {
fengjiachun marked this conversation as resolved.
Show resolved Hide resolved
self.inner.stop().await.map_err(BoxedError::new)
}
}

// Tests methods.
Expand Down
3 changes: 3 additions & 0 deletions src/store-api/src/region_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ pub trait RegionEngine: Send + Sync {

/// Retrieve region's metadata.
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;

/// Stop the engine
async fn stop(&self) -> Result<(), BoxedError>;
}

pub type RegionEngineRef = Arc<dyn RegionEngine>;
Loading