Skip to content

Commit

Permalink
feat: allow initializing regions in background (#2930)
Browse files Browse the repository at this point in the history
* refactor: open regions in background

* feat: add status code of RegionNotReady

* feat: use RegionNotReady instead of RegionNotFound for a registering region

* chore: apply suggestions from CR

* feat: add status code of RegionBusy

* feat: return RegionBusy for mutually exclusive operations
  • Loading branch information
WenyXu authored Dec 20, 2023
1 parent 9da1f23 commit 6c1c7d8
Show file tree
Hide file tree
Showing 9 changed files with 696 additions and 88 deletions.
4 changes: 4 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ rpc_runtime_size = 8
# It will block the datanode start if it can't receive leases in the heartbeat from metasrv.
require_lease_before_startup = false

# Initialize all regions in the background during the startup.
# By default, it provides services after all regions have been initialized.
initialize_region_in_background = false

[heartbeat]
# Interval for sending heartbeat messages to the Metasrv, 3 seconds by default.
interval = "3s"
Expand Down
12 changes: 11 additions & 1 deletion src/common/error/src/status_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ pub enum StatusCode {
RegionNotFound = 4005,
RegionAlreadyExists = 4006,
RegionReadonly = 4007,
RegionNotReady = 4008,
// If mutually exclusive operations are reached at the same time,
// only one can be executed, another one will get region busy.
RegionBusy = 4009,
// ====== End of catalog related status code =======

// ====== Begin of storage related status code =====
Expand Down Expand Up @@ -103,7 +107,9 @@ impl StatusCode {
match self {
StatusCode::StorageUnavailable
| StatusCode::RuntimeResourcesExhausted
| StatusCode::Internal => true,
| StatusCode::Internal
| StatusCode::RegionNotReady
| StatusCode::RegionBusy => true,

StatusCode::Success
| StatusCode::Unknown
Expand Down Expand Up @@ -152,6 +158,8 @@ impl StatusCode {
| StatusCode::TableAlreadyExists
| StatusCode::TableNotFound
| StatusCode::RegionNotFound
| StatusCode::RegionNotReady
| StatusCode::RegionBusy
| StatusCode::RegionAlreadyExists
| StatusCode::RegionReadonly
| StatusCode::TableColumnNotFound
Expand Down Expand Up @@ -183,6 +191,8 @@ impl StatusCode {
v if v == StatusCode::TableAlreadyExists as u32 => Some(StatusCode::TableAlreadyExists),
v if v == StatusCode::TableNotFound as u32 => Some(StatusCode::TableNotFound),
v if v == StatusCode::RegionNotFound as u32 => Some(StatusCode::RegionNotFound),
v if v == StatusCode::RegionNotReady as u32 => Some(StatusCode::RegionNotReady),
v if v == StatusCode::RegionBusy as u32 => Some(StatusCode::RegionBusy),
v if v == StatusCode::RegionAlreadyExists as u32 => {
Some(StatusCode::RegionAlreadyExists)
}
Expand Down
2 changes: 2 additions & 0 deletions src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ pub struct DatanodeOptions {
pub mode: Mode,
pub node_id: Option<u64>,
pub require_lease_before_startup: bool,
pub initialize_region_in_background: bool,
pub rpc_addr: String,
pub rpc_hostname: Option<String>,
pub rpc_runtime_size: usize,
Expand All @@ -249,6 +250,7 @@ impl Default for DatanodeOptions {
mode: Mode::Standalone,
node_id: None,
require_lease_before_startup: false,
initialize_region_in_background: false,
rpc_addr: "127.0.0.1:3001".to_string(),
rpc_hostname: None,
rpc_runtime_size: 8,
Expand Down
159 changes: 97 additions & 62 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ use common_config::wal::{KafkaConfig, RaftEngineConfig};
use common_config::{WalConfig, WAL_OPTIONS_KEY};
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::key::datanode_table::DatanodeTableManager;
use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
use common_meta::kv_backend::KvBackendRef;
pub use common_procedure::options::ProcedureConfig;
use common_runtime::Runtime;
use common_telemetry::{error, info, warn};
use file_engine::engine::FileRegionEngine;
use futures::future;
use futures_util::future::try_join_all;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::client::MetaClient;
Expand Down Expand Up @@ -213,6 +213,7 @@ impl DatanodeBuilder {

pub async fn build(mut self) -> Result<Datanode> {
let mode = &self.opts.mode;
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

let meta_client = self.meta_client.take();

Expand All @@ -233,8 +234,26 @@ impl DatanodeBuilder {

let region_server = self.new_region_server(region_event_listener).await?;

self.initialize_region_server(&region_server, kv_backend, !controlled_by_metasrv)
.await?;
let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
let table_values = datanode_table_manager
.tables(node_id)
.try_collect::<Vec<_>>()
.await
.context(GetMetadataSnafu)?;

let open_all_regions =
open_all_regions(region_server.clone(), table_values, !controlled_by_metasrv);

if self.opts.initialize_region_in_background {
// Opens regions in background.
common_runtime::spawn_bg(async move {
if let Err(err) = open_all_regions.await {
error!(err; "Failed to open regions during the startup.");
}
});
} else {
open_all_regions.await?;
}

let heartbeat_task = if let Some(meta_client) = meta_client {
Some(HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?)
Expand Down Expand Up @@ -331,74 +350,24 @@ impl DatanodeBuilder {
Ok((server, addr))
}

#[cfg(test)]
/// Open all regions belong to this datanode.
async fn initialize_region_server(
&self,
region_server: &RegionServer,
kv_backend: KvBackendRef,
open_with_writable: bool,
) -> Result<()> {
let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
let mut regions = vec![];
let mut table_values = datanode_table_manager.tables(node_id);

while let Some(table_value) = table_values.next().await {
let table_value = table_value.context(GetMetadataSnafu)?;
for region_number in table_value.regions {
// Augments region options with wal options if a wal options is provided.
let mut region_options = table_value.region_info.region_options.clone();
table_value
.region_info
.region_wal_options
.get(&region_number.to_string())
.and_then(|wal_options| {
region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone())
});

regions.push((
RegionId::new(table_value.table_id, region_number),
table_value.region_info.engine.clone(),
table_value.region_info.region_storage_path.clone(),
region_options,
));
}
}
info!("going to open {} regions", regions.len());
let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM));
let mut tasks = vec![];

for (region_id, engine, store_path, options) in regions {
let region_dir = region_dir(&store_path, region_id);
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await;
region_server
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: engine.clone(),
region_dir,
options,
skip_wal_replay: false,
}),
)
.await?;
if open_with_writable {
if let Err(e) = region_server.set_writable(region_id, true) {
error!(
e; "failed to set writable for region {region_id}"
);
}
}
Ok(())
});
}
let _ = try_join_all(tasks).await?;

info!("region server is initialized");
let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
let table_values = datanode_table_manager
.tables(node_id)
.try_collect::<Vec<_>>()
.await
.context(GetMetadataSnafu)?;

Ok(())
open_all_regions(region_server.clone(), table_values, open_with_writable).await
}

async fn new_region_server(
Expand Down Expand Up @@ -544,6 +513,72 @@ impl DatanodeBuilder {
}
}

/// Open all regions belong to this datanode.
async fn open_all_regions(
region_server: RegionServer,
table_values: Vec<DatanodeTableValue>,
open_with_writable: bool,
) -> Result<()> {
let mut regions = vec![];
for table_value in table_values {
for region_number in table_value.regions {
// Augments region options with wal options if a wal options is provided.
let mut region_options = table_value.region_info.region_options.clone();
table_value
.region_info
.region_wal_options
.get(&region_number.to_string())
.and_then(|wal_options| {
region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone())
});

regions.push((
RegionId::new(table_value.table_id, region_number),
table_value.region_info.engine.clone(),
table_value.region_info.region_storage_path.clone(),
region_options,
));
}
}
info!("going to open {} regions", regions.len());
let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM));
let mut tasks = vec![];

let region_server_ref = &region_server;
for (region_id, engine, store_path, options) in regions {
let region_dir = region_dir(&store_path, region_id);
let semaphore_moved = semaphore.clone();

tasks.push(async move {
let _permit = semaphore_moved.acquire().await;
region_server_ref
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: engine.clone(),
region_dir,
options,
skip_wal_replay: false,
}),
)
.await?;
if open_with_writable {
if let Err(e) = region_server_ref.set_writable(region_id, true) {
error!(
e; "failed to set writable for region {region_id}"
);
}
}
Ok(())
});
}
let _ = try_join_all(tasks).await?;

info!("all regions are opened");

Ok(())
}

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
Expand Down
14 changes: 14 additions & 0 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,18 @@ pub enum Error {
location: Location,
},

#[snafu(display("Region {} not ready", region_id))]
RegionNotReady {
region_id: RegionId,
location: Location,
},

#[snafu(display("Region {} is busy", region_id))]
RegionBusy {
region_id: RegionId,
location: Location,
},

#[snafu(display("Region engine {} is not registered", name))]
RegionEngineNotFound { name: String, location: Location },

Expand Down Expand Up @@ -295,6 +307,8 @@ impl ErrorExt for Error {
| GetRegionMetadata { .. } => StatusCode::Internal,

RegionNotFound { .. } => StatusCode::RegionNotFound,
RegionNotReady { .. } => StatusCode::RegionNotReady,
RegionBusy { .. } => StatusCode::RegionBusy,

StartServer { source, .. } | ShutdownServer { source, .. } => source.status_code(),

Expand Down
Loading

0 comments on commit 6c1c7d8

Please sign in to comment.