Skip to content

Commit

Permalink
feat: invoke handle_batch_open_requests (#4107)
Browse files Browse the repository at this point in the history
* feat: open all regions via invoking `handle_batch_open_requests`

* tests: add sqlness tests

* refactor: avoid cloning

* chore: apply suggestions from CR

* chore: update config.md

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored Jun 7, 2024
1 parent e982d2e commit e142ca4
Show file tree
Hide file tree
Showing 8 changed files with 317 additions and 53 deletions.
1 change: 1 addition & 0 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@
| `node_id` | Integer | `None` | The datanode identifier and should be unique in the cluster. |
| `require_lease_before_startup` | Bool | `false` | Start services after regions have obtained leases.<br/>It will block the datanode start if it can't receive leases in the heartbeat from metasrv. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during startup.<br/>By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `rpc_addr` | String | `127.0.0.1:3001` | The gRPC address of the datanode. |
| `rpc_hostname` | String | `None` | The hostname of the datanode. |
| `rpc_runtime_size` | Integer | `8` | The number of gRPC server worker threads. |
Expand Down
3 changes: 3 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ require_lease_before_startup = false
## By default, it provides services after all regions have been initialized.
init_regions_in_background = false

## Parallelism of initializing regions.
init_regions_parallelism = 16

## The gRPC address of the datanode.
rpc_addr = "127.0.0.1:3001"

Expand Down
2 changes: 2 additions & 0 deletions src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ pub struct DatanodeOptions {
pub node_id: Option<u64>,
pub require_lease_before_startup: bool,
pub init_regions_in_background: bool,
pub init_regions_parallelism: usize,
pub rpc_addr: String,
pub rpc_hostname: Option<String>,
pub rpc_runtime_size: usize,
Expand Down Expand Up @@ -291,6 +292,7 @@ impl Default for DatanodeOptions {
node_id: None,
require_lease_before_startup: false,
init_regions_in_background: false,
init_regions_parallelism: 16,
rpc_addr: "127.0.0.1:3001".to_string(),
rpc_hostname: None,
rpc_runtime_size: 8,
Expand Down
90 changes: 52 additions & 38 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use file_engine::engine::FileRegionEngine;
use futures_util::future::try_join_all;
use futures_util::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
Expand All @@ -45,17 +44,17 @@ use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::server::ServerHandlers;
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::path_utils::{region_dir, WAL_DIR};
use store_api::region_engine::RegionEngineRef;
use store_api::region_request::{RegionOpenRequest, RegionRequest};
use store_api::region_request::RegionOpenRequest;
use store_api::storage::RegionId;
use tokio::fs;
use tokio::sync::Notify;

use crate::config::{DatanodeOptions, RegionEngineConfig};
use crate::error::{
BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu,
self, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu,
MissingNodeIdSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu,
ShutdownServerSnafu, StartServerSnafu,
};
Expand All @@ -68,8 +67,6 @@ use crate::heartbeat::HeartbeatTask;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store;

const OPEN_REGION_PARALLELISM: usize = 16;

/// Datanode service.
pub struct Datanode {
services: ServerHandlers,
Expand Down Expand Up @@ -219,8 +216,12 @@ impl DatanodeBuilder {
.await
.context(GetMetadataSnafu)?;

let open_all_regions =
open_all_regions(region_server.clone(), table_values, !controlled_by_metasrv);
let open_all_regions = open_all_regions(
region_server.clone(),
table_values,
!controlled_by_metasrv,
self.opts.init_regions_parallelism,
);

if self.opts.init_regions_in_background {
// Opens regions in background.
Expand Down Expand Up @@ -286,7 +287,13 @@ impl DatanodeBuilder {
.await
.context(GetMetadataSnafu)?;

open_all_regions(region_server.clone(), table_values, open_with_writable).await
open_all_regions(
region_server.clone(),
table_values,
open_with_writable,
self.opts.init_regions_parallelism,
)
.await
}

async fn new_region_server(
Expand Down Expand Up @@ -447,6 +454,7 @@ async fn open_all_regions(
region_server: RegionServer,
table_values: Vec<DatanodeTableValue>,
open_with_writable: bool,
init_regions_parallelism: usize,
) -> Result<()> {
let mut regions = vec![];
for table_value in table_values {
Expand All @@ -467,40 +475,46 @@ async fn open_all_regions(
));
}
}
info!("going to open {} region(s)", regions.len());
let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM));
let mut tasks = vec![];
let num_regions = regions.len();
info!("going to open {} region(s)", num_regions);

let region_server_ref = &region_server;
let mut region_requests = Vec::with_capacity(regions.len());
for (region_id, engine, store_path, options) in regions {
let region_dir = region_dir(&store_path, region_id);
let semaphore_moved = semaphore.clone();

tasks.push(async move {
let _permit = semaphore_moved.acquire().await;
region_server_ref
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: engine.clone(),
region_dir,
options,
skip_wal_replay: false,
}),
)
.await?;
if open_with_writable {
if let Err(e) = region_server_ref.set_writable(region_id, true) {
error!(
e; "failed to set writable for region {region_id}"
);
}
}
Ok(())
});
region_requests.push((
region_id,
RegionOpenRequest {
engine,
region_dir,
options,
skip_wal_replay: false,
},
));
}
let _ = try_join_all(tasks).await?;

let open_regions = region_server
.handle_batch_open_requests(init_regions_parallelism, region_requests)
.await?;
ensure!(
open_regions.len() == num_regions,
error::UnexpectedSnafu {
violated: format!(
"Expected to open {} of regions, only {} of regions has opened",
num_regions,
open_regions.len()
)
}
);

for region_id in open_regions {
if open_with_writable {
if let Err(e) = region_server.set_writable(region_id, true) {
error!(
e; "failed to set writable for region {region_id}"
);
}
}
}
info!("all regions are opened");

Ok(())
Expand Down
13 changes: 10 additions & 3 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,13 @@ pub enum Error {
source: BoxedError,
},

#[snafu(display("Failed to open batch regions"))]
HandleBatchOpenRequest {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},

#[snafu(display("RegionId {} not found", region_id))]
RegionNotFound {
region_id: RegionId,
Expand Down Expand Up @@ -455,9 +462,9 @@ impl ErrorExt for Error {
TableIdProviderNotFound { .. } | UnsupportedGrpcRequest { .. } => {
StatusCode::Unsupported
}
HandleRegionRequest { source, .. } | GetRegionMetadata { source, .. } => {
source.status_code()
}
HandleRegionRequest { source, .. }
| GetRegionMetadata { source, .. }
| HandleBatchOpenRequest { source, .. } => source.status_code(),
StopRegionEngine { source, .. } => source.status_code(),

FindLogicalRegions { source, .. } => source.status_code(),
Expand Down
132 changes: 127 additions & 5 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use common_recordbatch::SendableRecordBatchStream;
use common_runtime::Runtime;
use common_telemetry::tracing::{self, info_span};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, warn};
use common_telemetry::{error, info, warn};
use dashmap::DashMap;
use futures_util::future::try_join_all;
use metric_engine::engine::MetricEngine;
Expand All @@ -49,15 +49,17 @@ use store_api::metric_engine_consts::{
FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
};
use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest};
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::RegionId;
use tonic::{Request, Response, Result as TonicResult};

use crate::error::{
self, BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu,
FindLogicalRegionsSnafu, HandleRegionRequestSnafu, NewPlanDecoderSnafu,
RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu,
UnsupportedOutputSnafu,
FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result,
StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;

Expand Down Expand Up @@ -116,6 +118,17 @@ impl RegionServer {
})
}

/// Opens a batch of regions in parallel.
///
/// Delegates to [`RegionServerInner::handle_batch_open_requests`], which groups
/// the requests by engine; `parallelism` bounds how many regions each engine
/// opens concurrently. Returns the ids of the regions that were opened
/// successfully.
#[tracing::instrument(skip_all)]
pub async fn handle_batch_open_requests(
    &self,
    parallelism: usize,
    requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<Vec<RegionId>> {
    self.inner
        .handle_batch_open_requests(parallelism, requests)
        .await
}

#[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
pub async fn handle_request(
&self,
Expand Down Expand Up @@ -454,6 +467,113 @@ impl RegionServerInner {
Ok(CurrentEngine::Engine(engine))
}

/// Opens a batch of regions on a single `engine`.
///
/// Registers every region as "not ready" up front, forwards the whole batch to
/// the engine, then promotes each successfully opened region to "ready" (and
/// rolls back the status of any region that failed). Returns the ids of the
/// regions that ended up open; if anything failed, the first recorded error is
/// returned instead.
async fn handle_batch_open_requests_inner(
    &self,
    engine: RegionEngineRef,
    parallelism: usize,
    requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<Vec<RegionId>> {
    // Pre-compute the status change for every region; fails early if any
    // request carries an unrecognized engine/options combination.
    // NOTE(review): duplicate region ids would collapse into one map entry —
    // presumably callers never send duplicates; confirm upstream.
    let region_changes = requests
        .iter()
        .map(|(region_id, open)| {
            let attribute = parse_region_attribute(&open.engine, &open.options)?;
            Ok((*region_id, RegionChange::Register(attribute)))
        })
        .collect::<Result<HashMap<_, _>>>()?;

    // Mark all regions as not-ready before handing them to the engine, so
    // concurrent requests can observe that an open is in flight.
    for (&region_id, region_change) in &region_changes {
        self.set_region_status_not_ready(region_id, &engine, region_change)
    }

    let mut open_regions = Vec::with_capacity(requests.len());
    let mut errors = vec![];
    match engine
        .handle_batch_open_requests(parallelism, requests)
        .await
        .with_context(|_| HandleBatchOpenRequestSnafu)
    {
        // The engine reports a per-region outcome even when the batch call
        // itself succeeds.
        Ok(results) => {
            for (region_id, result) in results {
                let region_change = &region_changes[&region_id];
                match result {
                    Ok(_) => {
                        // Opened on the engine; now flip our bookkeeping to
                        // ready. A bookkeeping failure is recorded but does
                        // not stop processing the remaining regions.
                        if let Err(e) = self
                            .set_region_status_ready(region_id, engine.clone(), *region_change)
                            .await
                        {
                            error!(e; "Failed to set region to ready: {}", region_id);
                            errors.push(BoxedError::new(e));
                        } else {
                            open_regions.push(region_id)
                        }
                    }
                    Err(e) => {
                        // Open failed for this region only: roll back its
                        // not-ready registration and keep going.
                        self.unset_region_status(region_id, *region_change);
                        error!(e; "Failed to open region: {}", region_id);
                        errors.push(e);
                    }
                }
            }
        }
        // The whole batch call failed: roll back every registration.
        Err(e) => {
            for (&region_id, region_change) in &region_changes {
                self.unset_region_status(region_id, *region_change);
            }
            error!(e; "Failed to open batch regions");
            errors.push(BoxedError::new(e));
        }
    }

    if !errors.is_empty() {
        return error::UnexpectedSnafu {
            // Returns the first error.
            violated: format!("Failed to open batch regions: {:?}", errors[0]),
        }
        .fail();
    }

    Ok(open_regions)
}

/// Opens a batch of regions, dispatching each request to its region engine.
///
/// Requests are grouped by engine name and every group is forwarded to the
/// matching engine with the given `parallelism`. All groups are dispatched
/// before any per-group error is propagated, so a failure in one engine does
/// not prevent the other engines' regions from being opened; the first error
/// encountered (if any) is then returned.
///
/// # Errors
/// Returns [`error::Error::RegionEngineNotFound`] immediately when a request
/// names an unregistered engine, or the first error produced while opening a
/// group of regions.
pub async fn handle_batch_open_requests(
    &self,
    parallelism: usize,
    requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<Vec<RegionId>> {
    // Group the requests by engine name so each engine receives one batch.
    // The entry API does a single lookup per request (the old code probed
    // with `get_mut` and then inserted, and sized the map by the number of
    // requests rather than the number of distinct engines).
    let mut engine_grouped_requests: HashMap<String, Vec<_>> = HashMap::new();
    for (region_id, request) in requests {
        engine_grouped_requests
            .entry(request.engine.clone())
            .or_default()
            .push((region_id, request));
    }

    // Dispatch every group first, deferring error propagation, to preserve
    // the behavior that all engines get a chance to open their regions.
    let mut results = Vec::with_capacity(engine_grouped_requests.len());
    for (engine_name, requests) in engine_grouped_requests {
        let engine = self
            .engines
            .read()
            .unwrap()
            .get(&engine_name)
            .with_context(|| RegionEngineNotFoundSnafu { name: &engine_name })?
            .clone();
        results.push(
            self.handle_batch_open_requests_inner(engine, parallelism, requests)
                .await,
        );
    }

    // Return the first error, otherwise flatten the per-engine id lists.
    let mut open_regions = Vec::new();
    for result in results {
        open_regions.extend(result?);
    }
    Ok(open_regions)
}

pub async fn handle_request(
&self,
region_id: RegionId,
Expand Down Expand Up @@ -715,6 +835,7 @@ impl RegionServerInner {
}
}

#[derive(Debug, Clone, Copy)]
enum RegionChange {
None,
Register(RegionAttribute),
Expand All @@ -740,6 +861,7 @@ fn parse_region_attribute(
}
}

#[derive(Debug, Clone, Copy)]
enum RegionAttribute {
Mito,
Metric { physical: bool },
Expand Down
Loading

0 comments on commit e142ca4

Please sign in to comment.