Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow initializing regions in background #2930

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ rpc_runtime_size = 8
# It will block the datanode start if it can't receive leases in the heartbeat from metasrv.
require_lease_before_startup = false

# Whether to initialize all regions in the background during startup.
# By default (false), the datanode starts serving only after all regions have been initialized.
initialize_region_in_background = false

[heartbeat]
# Interval for sending heartbeat messages to the Metasrv, 3 seconds by default.
interval = "3s"
Expand Down
12 changes: 11 additions & 1 deletion src/common/error/src/status_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ pub enum StatusCode {
RegionNotFound = 4005,
RegionAlreadyExists = 4006,
RegionReadonly = 4007,
RegionNotReady = 4008,
// If mutually exclusive operations arrive at the same time,
// only one of them is executed; the others get `RegionBusy`.
RegionBusy = 4009,
// ====== End of catalog related status code =======

// ====== Begin of storage related status code =====
Expand Down Expand Up @@ -103,7 +107,9 @@ impl StatusCode {
match self {
StatusCode::StorageUnavailable
| StatusCode::RuntimeResourcesExhausted
| StatusCode::Internal => true,
| StatusCode::Internal
| StatusCode::RegionNotReady
| StatusCode::RegionBusy => true,

StatusCode::Success
| StatusCode::Unknown
Expand Down Expand Up @@ -152,6 +158,8 @@ impl StatusCode {
| StatusCode::TableAlreadyExists
| StatusCode::TableNotFound
| StatusCode::RegionNotFound
| StatusCode::RegionNotReady
| StatusCode::RegionBusy
| StatusCode::RegionAlreadyExists
| StatusCode::RegionReadonly
| StatusCode::TableColumnNotFound
Expand Down Expand Up @@ -183,6 +191,8 @@ impl StatusCode {
v if v == StatusCode::TableAlreadyExists as u32 => Some(StatusCode::TableAlreadyExists),
v if v == StatusCode::TableNotFound as u32 => Some(StatusCode::TableNotFound),
v if v == StatusCode::RegionNotFound as u32 => Some(StatusCode::RegionNotFound),
v if v == StatusCode::RegionNotReady as u32 => Some(StatusCode::RegionNotReady),
v if v == StatusCode::RegionBusy as u32 => Some(StatusCode::RegionBusy),
v if v == StatusCode::RegionAlreadyExists as u32 => {
Some(StatusCode::RegionAlreadyExists)
}
Expand Down
2 changes: 2 additions & 0 deletions src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ pub struct DatanodeOptions {
pub mode: Mode,
pub node_id: Option<u64>,
pub require_lease_before_startup: bool,
pub initialize_region_in_background: bool,
pub rpc_addr: String,
pub rpc_hostname: Option<String>,
pub rpc_runtime_size: usize,
Expand All @@ -249,6 +250,7 @@ impl Default for DatanodeOptions {
mode: Mode::Standalone,
node_id: None,
require_lease_before_startup: false,
initialize_region_in_background: false,
rpc_addr: "127.0.0.1:3001".to_string(),
rpc_hostname: None,
rpc_runtime_size: 8,
Expand Down
159 changes: 97 additions & 62 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ use common_config::wal::{KafkaConfig, RaftEngineConfig};
use common_config::{WalConfig, WAL_OPTIONS_KEY};
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::key::datanode_table::DatanodeTableManager;
use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
use common_meta::kv_backend::KvBackendRef;
pub use common_procedure::options::ProcedureConfig;
use common_runtime::Runtime;
use common_telemetry::{error, info, warn};
use file_engine::engine::FileRegionEngine;
use futures::future;
use futures_util::future::try_join_all;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::client::MetaClient;
Expand Down Expand Up @@ -213,6 +213,7 @@ impl DatanodeBuilder {

pub async fn build(mut self) -> Result<Datanode> {
let mode = &self.opts.mode;
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

let meta_client = self.meta_client.take();

Expand All @@ -233,8 +234,26 @@ impl DatanodeBuilder {

let region_server = self.new_region_server(region_event_listener).await?;

self.initialize_region_server(&region_server, kv_backend, !controlled_by_metasrv)
.await?;
let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
let table_values = datanode_table_manager
.tables(node_id)
.try_collect::<Vec<_>>()
.await
.context(GetMetadataSnafu)?;

let open_all_regions =
open_all_regions(region_server.clone(), table_values, !controlled_by_metasrv);

if self.opts.initialize_region_in_background {
// Opens regions in background.
common_runtime::spawn_bg(async move {
if let Err(err) = open_all_regions.await {
error!(err; "Failed to open regions during the startup.");
}
});
} else {
open_all_regions.await?;
}

let heartbeat_task = if let Some(meta_client) = meta_client {
Some(HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?)
Expand Down Expand Up @@ -331,74 +350,24 @@ impl DatanodeBuilder {
Ok((server, addr))
}

#[cfg(test)]
/// Opens all regions belonging to this datanode.
async fn initialize_region_server(
&self,
region_server: &RegionServer,
kv_backend: KvBackendRef,
open_with_writable: bool,
) -> Result<()> {
let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
let mut regions = vec![];
let mut table_values = datanode_table_manager.tables(node_id);

while let Some(table_value) = table_values.next().await {
let table_value = table_value.context(GetMetadataSnafu)?;
for region_number in table_value.regions {
// Augments region options with wal options if a wal options is provided.
let mut region_options = table_value.region_info.region_options.clone();
table_value
.region_info
.region_wal_options
.get(&region_number.to_string())
.and_then(|wal_options| {
region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone())
});

regions.push((
RegionId::new(table_value.table_id, region_number),
table_value.region_info.engine.clone(),
table_value.region_info.region_storage_path.clone(),
region_options,
));
}
}
info!("going to open {} regions", regions.len());
let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM));
let mut tasks = vec![];

for (region_id, engine, store_path, options) in regions {
let region_dir = region_dir(&store_path, region_id);
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await;
region_server
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: engine.clone(),
region_dir,
options,
skip_wal_replay: false,
}),
)
.await?;
if open_with_writable {
if let Err(e) = region_server.set_writable(region_id, true) {
error!(
e; "failed to set writable for region {region_id}"
);
}
}
Ok(())
});
}
let _ = try_join_all(tasks).await?;

info!("region server is initialized");
let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
let table_values = datanode_table_manager
.tables(node_id)
.try_collect::<Vec<_>>()
.await
.context(GetMetadataSnafu)?;

Ok(())
open_all_regions(region_server.clone(), table_values, open_with_writable).await
}

async fn new_region_server(
Expand Down Expand Up @@ -544,6 +513,72 @@ impl DatanodeBuilder {
}
}

/// Opens all regions belonging to this datanode.
///
/// Every region listed in `table_values` is opened through `region_server`. The open
/// requests are driven concurrently, and each one first acquires a permit from a
/// semaphore created with `OPEN_REGION_PARALLELISM` permits.
///
/// When `open_with_writable` is `true`, each region is marked writable right after it has
/// been opened. A failure to mark a region writable is only logged and does not fail the
/// whole operation.
///
/// # Errors
///
/// Returns an error if opening any region fails.
async fn open_all_regions(
    region_server: RegionServer,
    table_values: Vec<DatanodeTableValue>,
    open_with_writable: bool,
) -> Result<()> {
    let mut regions = vec![];
    for table_value in table_values {
        for region_number in table_value.regions {
            // Augments the region options with the WAL options if any are provided
            // for this region.
            let mut region_options = table_value.region_info.region_options.clone();
            if let Some(wal_options) = table_value
                .region_info
                .region_wal_options
                .get(&region_number.to_string())
            {
                region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone());
            }

            regions.push((
                RegionId::new(table_value.table_id, region_number),
                table_value.region_info.engine.clone(),
                table_value.region_info.region_storage_path.clone(),
                region_options,
            ));
        }
    }
    info!("going to open {} regions", regions.len());
    let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM));
    let mut tasks = vec![];

    // The tasks are awaited within this function (not spawned), so they can share a
    // plain borrow of the region server.
    let region_server_ref = &region_server;
    for (region_id, engine, store_path, options) in regions {
        let region_dir = region_dir(&store_path, region_id);
        let semaphore_moved = semaphore.clone();

        tasks.push(async move {
            // Held until the end of this task to bound the number of in-flight opens.
            let _permit = semaphore_moved.acquire().await;
            region_server_ref
                .handle_request(
                    region_id,
                    RegionRequest::Open(RegionOpenRequest {
                        // `engine` is already owned by this task; no clone is needed.
                        engine,
                        region_dir,
                        options,
                        skip_wal_replay: false,
                    }),
                )
                .await?;
            if open_with_writable {
                // Best effort: the region is already open, so only log the failure.
                if let Err(e) = region_server_ref.set_writable(region_id, true) {
                    error!(
                        e; "failed to set writable for region {region_id}"
                    );
                }
            }
            Ok(())
        });
    }
    let _ = try_join_all(tasks).await?;

    info!("all regions are opened");

    Ok(())
}

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
Expand Down
14 changes: 14 additions & 0 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,18 @@ pub enum Error {
location: Location,
},

#[snafu(display("Region {} not ready", region_id))]
RegionNotReady {
region_id: RegionId,
location: Location,
},

#[snafu(display("Region {} is busy", region_id))]
RegionBusy {
region_id: RegionId,
location: Location,
},

#[snafu(display("Region engine {} is not registered", name))]
RegionEngineNotFound { name: String, location: Location },

Expand Down Expand Up @@ -295,6 +307,8 @@ impl ErrorExt for Error {
| GetRegionMetadata { .. } => StatusCode::Internal,

RegionNotFound { .. } => StatusCode::RegionNotFound,
RegionNotReady { .. } => StatusCode::RegionNotReady,
RegionBusy { .. } => StatusCode::RegionBusy,

StartServer { source, .. } | ShutdownServer { source, .. } => source.status_code(),

Expand Down
Loading
Loading