diff --git a/config/config.md b/config/config.md
index 5c4878c9d37d..97efadc3177d 100644
--- a/config/config.md
+++ b/config/config.md
@@ -306,6 +306,7 @@
| `node_id` | Integer | `None` | The datanode identifier and should be unique in the cluster. |
| `require_lease_before_startup` | Bool | `false` | Start services after regions have obtained leases. It will block the datanode start if it can't receive leases in the heartbeat from metasrv. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup. By default, it provides services after all regions have been initialized. |
+| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `rpc_addr` | String | `127.0.0.1:3001` | The gRPC address of the datanode. |
| `rpc_hostname` | String | `None` | The hostname of the datanode. |
| `rpc_runtime_size` | Integer | `8` | The number of gRPC server worker threads. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 3a20d3ac5f16..16ffbbb0a4b9 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -13,6 +13,9 @@ require_lease_before_startup = false
## By default, it provides services after all regions have been initialized.
init_regions_in_background = false
+## Parallelism of initializing regions.
+init_regions_parallelism = 16
+
## The gRPC address of the datanode.
rpc_addr = "127.0.0.1:3001"
diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs
index 7e76c7d68169..8696cac7ea1e 100644
--- a/src/datanode/src/config.rs
+++ b/src/datanode/src/config.rs
@@ -264,6 +264,7 @@ pub struct DatanodeOptions {
pub node_id: Option<u64>,
pub require_lease_before_startup: bool,
pub init_regions_in_background: bool,
+ pub init_regions_parallelism: usize,
pub rpc_addr: String,
pub rpc_hostname: Option<String>,
pub rpc_runtime_size: usize,
@@ -291,6 +292,7 @@ impl Default for DatanodeOptions {
node_id: None,
require_lease_before_startup: false,
init_regions_in_background: false,
+ init_regions_parallelism: 16,
rpc_addr: "127.0.0.1:3001".to_string(),
rpc_hostname: None,
rpc_runtime_size: 8,
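
The new option is a plain struct field with a coded default of 16, so omitting `init_regions_parallelism` from the TOML file keeps the previous behaviour. A minimal, self-contained sketch of that default-plus-override pattern, assuming the options struct is deserialized with serde's `#[serde(default)]` (toy struct and field set below, not the real `DatanodeOptions`):

use serde::Deserialize;

// Toy stand-in for DatanodeOptions, reduced to the two fields relevant here.
#[derive(Debug, Deserialize)]
#[serde(default)]
struct ToyDatanodeOptions {
    init_regions_in_background: bool,
    init_regions_parallelism: usize,
}

impl Default for ToyDatanodeOptions {
    fn default() -> Self {
        Self {
            init_regions_in_background: false,
            init_regions_parallelism: 16,
        }
    }
}

fn main() {
    // Key present in the TOML: the configured value wins.
    let opts: ToyDatanodeOptions = toml::from_str("init_regions_parallelism = 32").unwrap();
    assert_eq!(opts.init_regions_parallelism, 32);

    // Key absent: the Default impl supplies 16.
    let opts: ToyDatanodeOptions = toml::from_str("").unwrap();
    assert_eq!(opts.init_regions_parallelism, 16);
}
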
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index cc88a3cdcd07..21abc202afc7 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -31,7 +31,6 @@ use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use file_engine::engine::FileRegionEngine;
-use futures_util::future::try_join_all;
use futures_util::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
@@ -45,17 +44,17 @@ use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::server::ServerHandlers;
use servers::Mode;
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
use store_api::path_utils::{region_dir, WAL_DIR};
use store_api::region_engine::RegionEngineRef;
-use store_api::region_request::{RegionOpenRequest, RegionRequest};
+use store_api::region_request::RegionOpenRequest;
use store_api::storage::RegionId;
use tokio::fs;
use tokio::sync::Notify;
use crate::config::{DatanodeOptions, RegionEngineConfig};
use crate::error::{
- BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu,
+ self, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu,
MissingNodeIdSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu,
ShutdownServerSnafu, StartServerSnafu,
};
@@ -68,8 +67,6 @@ use crate::heartbeat::HeartbeatTask;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store;
-const OPEN_REGION_PARALLELISM: usize = 16;
-
/// Datanode service.
pub struct Datanode {
services: ServerHandlers,
@@ -219,8 +216,12 @@ impl DatanodeBuilder {
.await
.context(GetMetadataSnafu)?;
- let open_all_regions =
- open_all_regions(region_server.clone(), table_values, !controlled_by_metasrv);
+ let open_all_regions = open_all_regions(
+ region_server.clone(),
+ table_values,
+ !controlled_by_metasrv,
+ self.opts.init_regions_parallelism,
+ );
if self.opts.init_regions_in_background {
// Opens regions in background.
@@ -286,7 +287,13 @@ impl DatanodeBuilder {
.await
.context(GetMetadataSnafu)?;
- open_all_regions(region_server.clone(), table_values, open_with_writable).await
+ open_all_regions(
+ region_server.clone(),
+ table_values,
+ open_with_writable,
+ self.opts.init_regions_parallelism,
+ )
+ .await
}
async fn new_region_server(
@@ -447,6 +454,7 @@ async fn open_all_regions(
region_server: RegionServer,
table_values: Vec<DatanodeTableValue>,
open_with_writable: bool,
+ init_regions_parallelism: usize,
) -> Result<()> {
let mut regions = vec![];
for table_value in table_values {
@@ -467,40 +475,46 @@ async fn open_all_regions(
));
}
}
- info!("going to open {} region(s)", regions.len());
- let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM));
- let mut tasks = vec![];
+ let num_regions = regions.len();
+ info!("going to open {} region(s)", num_regions);
- let region_server_ref = &region_server;
+ let mut region_requests = Vec::with_capacity(regions.len());
for (region_id, engine, store_path, options) in regions {
let region_dir = region_dir(&store_path, region_id);
- let semaphore_moved = semaphore.clone();
-
- tasks.push(async move {
- let _permit = semaphore_moved.acquire().await;
- region_server_ref
- .handle_request(
- region_id,
- RegionRequest::Open(RegionOpenRequest {
- engine: engine.clone(),
- region_dir,
- options,
- skip_wal_replay: false,
- }),
- )
- .await?;
- if open_with_writable {
- if let Err(e) = region_server_ref.set_writable(region_id, true) {
- error!(
- e; "failed to set writable for region {region_id}"
- );
- }
- }
- Ok(())
- });
+ region_requests.push((
+ region_id,
+ RegionOpenRequest {
+ engine,
+ region_dir,
+ options,
+ skip_wal_replay: false,
+ },
+ ));
}
- let _ = try_join_all(tasks).await?;
+ let open_regions = region_server
+ .handle_batch_open_requests(init_regions_parallelism, region_requests)
+ .await?;
+ ensure!(
+ open_regions.len() == num_regions,
+ error::UnexpectedSnafu {
+ violated: format!(
+ "Expected to open {} of regions, only {} of regions has opened",
+ num_regions,
+ open_regions.len()
+ )
+ }
+ );
+
+ for region_id in open_regions {
+ if open_with_writable {
+ if let Err(e) = region_server.set_writable(region_id, true) {
+ error!(
+ e; "failed to set writable for region {region_id}"
+ );
+ }
+ }
+ }
info!("all regions are opened");
Ok(())
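
With the per-task semaphore gone, the datanode now collects every `RegionOpenRequest` up front and delegates the bounded-parallelism open to `handle_batch_open_requests`, passing `init_regions_parallelism` through. As a rough illustration of what "open many regions with at most N in flight" means, here is a standalone sketch using `buffer_unordered` over stand-in region ids (the `open_one` helper is hypothetical; the real opening happens inside the region engine):

use std::time::Duration;
use futures::stream::{self, StreamExt};

// Hypothetical stand-in for opening one region; the real code sends a
// RegionOpenRequest to the owning region engine instead.
async fn open_one(region_id: u64) -> Result<u64, String> {
    tokio::time::sleep(Duration::from_millis(1)).await;
    Ok(region_id)
}

#[tokio::main]
async fn main() {
    let init_regions_parallelism = 16;
    let region_ids: Vec<u64> = (0..100).collect();

    // At most `init_regions_parallelism` opens are in flight at any moment,
    // the same cap the removed OPEN_REGION_PARALLELISM semaphore enforced.
    let opened: Vec<Result<u64, String>> = stream::iter(region_ids)
        .map(open_one)
        .buffer_unordered(init_regions_parallelism)
        .collect()
        .await;

    assert_eq!(opened.len(), 100);
}

The difference after this patch is that the cap is configurable and enforced inside the batch-open path rather than by ad-hoc tasks in `open_all_regions`.
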
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index f1a37f624997..c5956f731bac 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -292,6 +292,13 @@ pub enum Error {
source: BoxedError,
},
+ #[snafu(display("Failed to open batch regions"))]
+ HandleBatchOpenRequest {
+ #[snafu(implicit)]
+ location: Location,
+ source: BoxedError,
+ },
+
#[snafu(display("RegionId {} not found", region_id))]
RegionNotFound {
region_id: RegionId,
@@ -455,9 +462,9 @@ impl ErrorExt for Error {
TableIdProviderNotFound { .. } | UnsupportedGrpcRequest { .. } => {
StatusCode::Unsupported
}
- HandleRegionRequest { source, .. } | GetRegionMetadata { source, .. } => {
- source.status_code()
- }
+ HandleRegionRequest { source, .. }
+ | GetRegionMetadata { source, .. }
+ | HandleBatchOpenRequest { source, .. } => source.status_code(),
StopRegionEngine { source, .. } => source.status_code(),
FindLogicalRegions { source, .. } => source.status_code(),
diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index 13b10c497cef..738d98fdd43b 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -30,7 +30,7 @@ use common_recordbatch::SendableRecordBatchStream;
use common_runtime::Runtime;
use common_telemetry::tracing::{self, info_span};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
-use common_telemetry::{info, warn};
+use common_telemetry::{error, info, warn};
use dashmap::DashMap;
use futures_util::future::try_join_all;
use metric_engine::engine::MetricEngine;
@@ -49,15 +49,17 @@ use store_api::metric_engine_consts::{
FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
};
use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse};
-use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest};
+use store_api::region_request::{
+ AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest,
+};
use store_api::storage::RegionId;
use tonic::{Request, Response, Result as TonicResult};
use crate::error::{
self, BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu,
- FindLogicalRegionsSnafu, HandleRegionRequestSnafu, NewPlanDecoderSnafu,
- RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu,
- UnsupportedOutputSnafu,
+ FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
+ NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result,
+ StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;
@@ -116,6 +118,17 @@ impl RegionServer {
})
}
+ #[tracing::instrument(skip_all)]
+ pub async fn handle_batch_open_requests(
+ &self,
+ parallelism: usize,
+ requests: Vec<(RegionId, RegionOpenRequest)>,
+ ) -> Result<Vec<RegionId>> {
+ self.inner
+ .handle_batch_open_requests(parallelism, requests)
+ .await
+ }
+
#[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
pub async fn handle_request(
&self,
@@ -454,6 +467,113 @@ impl RegionServerInner {
Ok(CurrentEngine::Engine(engine))
}
+ async fn handle_batch_open_requests_inner(
+ &self,
+ engine: RegionEngineRef,
+ parallelism: usize,
+ requests: Vec<(RegionId, RegionOpenRequest)>,
+ ) -> Result<Vec<RegionId>> {
+ let region_changes = requests
+ .iter()
+ .map(|(region_id, open)| {
+ let attribute = parse_region_attribute(&open.engine, &open.options)?;
+ Ok((*region_id, RegionChange::Register(attribute)))
+ })
+ .collect::<Result<HashMap<_, _>>>()?;
+
+ for (&region_id, region_change) in &region_changes {
+ self.set_region_status_not_ready(region_id, &engine, region_change)
+ }
+
+ let mut open_regions = Vec::with_capacity(requests.len());
+ let mut errors = vec![];
+ match engine
+ .handle_batch_open_requests(parallelism, requests)
+ .await
+ .with_context(|_| HandleBatchOpenRequestSnafu)
+ {
+ Ok(results) => {
+ for (region_id, result) in results {
+ let region_change = &region_changes[&region_id];
+ match result {
+ Ok(_) => {
+ if let Err(e) = self
+ .set_region_status_ready(region_id, engine.clone(), *region_change)
+ .await
+ {
+ error!(e; "Failed to set region to ready: {}", region_id);
+ errors.push(BoxedError::new(e));
+ } else {
+ open_regions.push(region_id)
+ }
+ }
+ Err(e) => {
+ self.unset_region_status(region_id, *region_change);
+ error!(e; "Failed to open region: {}", region_id);
+ errors.push(e);
+ }
+ }
+ }
+ }
+ Err(e) => {
+ for (&region_id, region_change) in &region_changes {
+ self.unset_region_status(region_id, *region_change);
+ }
+ error!(e; "Failed to open batch regions");
+ errors.push(BoxedError::new(e));
+ }
+ }
+
+ if !errors.is_empty() {
+ return error::UnexpectedSnafu {
+ // Returns the first error.
+ violated: format!("Failed to open batch regions: {:?}", errors[0]),
+ }
+ .fail();
+ }
+
+ Ok(open_regions)
+ }
+
+ pub async fn handle_batch_open_requests(
+ &self,
+ parallelism: usize,
+ requests: Vec<(RegionId, RegionOpenRequest)>,
+ ) -> Result<Vec<RegionId>> {
+ let mut engine_grouped_requests: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> =
+ HashMap::with_capacity(requests.len());
+ for (region_id, request) in requests {
+ if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
+ requests.push((region_id, request));
+ } else {
+ engine_grouped_requests
+ .insert(request.engine.to_string(), vec![(region_id, request)]);
+ }
+ }
+
+ let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
+ for (engine, requests) in engine_grouped_requests {
+ let engine = self
+ .engines
+ .read()
+ .unwrap()
+ .get(&engine)
+ .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
+ .clone();
+ results.push(
+ self.handle_batch_open_requests_inner(engine, parallelism, requests)
+ .await,
+ )
+ }
+
+ Ok(results
+ .into_iter()
+ .collect::<Result<Vec<_>>>()?
+ .into_iter()
+ .flatten()
+ .collect::>())
+ }
+
pub async fn handle_request(
&self,
region_id: RegionId,
@@ -715,6 +835,7 @@ impl RegionServerInner {
}
}
+#[derive(Debug, Clone, Copy)]
enum RegionChange {
None,
Register(RegionAttribute),
@@ -740,6 +861,7 @@ fn parse_region_attribute(
}
}
+#[derive(Debug, Clone, Copy)]
enum RegionAttribute {
Mito,
Metric { physical: bool },
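
`handle_batch_open_requests` first buckets the incoming `(RegionId, RegionOpenRequest)` pairs by engine name and then feeds each bucket to `handle_batch_open_requests_inner`. A small sketch of just that grouping step, with integer/string stand-ins for the real types (the `entry` API shown here is simply a compact equivalent of the `get_mut`/`insert` branch in the patch):

use std::collections::HashMap;

// (region_id, engine_name) stand-ins for (RegionId, RegionOpenRequest).
fn group_by_engine(requests: Vec<(u64, String)>) -> HashMap<String, Vec<u64>> {
    let mut grouped: HashMap<String, Vec<u64>> = HashMap::new();
    for (region_id, engine) in requests {
        // Append to the engine's bucket, creating it on first sight.
        grouped.entry(engine).or_default().push(region_id);
    }
    grouped
}

fn main() {
    let requests = vec![
        (1, "mito".to_string()),
        (2, "metric".to_string()),
        (3, "mito".to_string()),
    ];
    let grouped = group_by_engine(requests);
    assert_eq!(grouped["mito"], vec![1, 3]);
    assert_eq!(grouped["metric"], vec![2]);
}

Grouping matters because each engine receives its own batch call with the shared parallelism limit, and any failed region is surfaced through the UnexpectedSnafu path above rather than being silently skipped.
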
diff --git a/tests/cases/standalone/common/basic.result b/tests/cases/standalone/common/basic.result
index 3fdd6b1d530d..2651bc733cac 100644
--- a/tests/cases/standalone/common/basic.result
+++ b/tests/cases/standalone/common/basic.result
@@ -54,10 +54,6 @@ SELECT idc, avg(memory_util) FROM system_metrics GROUP BY idc ORDER BY idc;
| idc_b | 66.7 |
+-------+---------------------------------+
-DROP TABLE system_metrics;
-
-Affected Rows: 0
-
create table foo (
host string,
ts timestamp DEFAULT '2023-04-29 00:00:00+00:00',
@@ -90,6 +86,95 @@ select * from foo;
| host3 | 2023-04-29T00:00:00 | 0.0 |
+-------+---------------------+-----+
+CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "");
+
+Affected Rows: 0
+
+CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy");
+
+Affected Rows: 0
+
+INSERT INTO t1 VALUES ('host1',0, 0), ('host2', 1, 1,);
+
+Affected Rows: 2
+
+SELECT * from t1;
+
++-------+-------------------------+-----+
+| host | ts | val |
++-------+-------------------------+-----+
+| host2 | 1970-01-01T00:00:00.001 | 1.0 |
+| host1 | 1970-01-01T00:00:00 | 0.0 |
++-------+-------------------------+-----+
+
+CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");
+
+Affected Rows: 0
+
+SELECT * from t2;
+
+++
+++
+
+INSERT INTO t2 VALUES ('job1', 0, 0), ('job2', 1, 1);
+
+Affected Rows: 2
+
+-- SQLNESS ARG restart=true
+SELECT * FROM system_metrics;
+
++-------+-------+----------+-------------+-----------+-------------------------+
+| host | idc | cpu_util | memory_util | disk_util | ts |
++-------+-------+----------+-------------+-----------+-------------------------+
+| host1 | idc_a | 11.8 | 10.3 | 10.3 | 2022-11-03T03:39:57.450 |
+| host1 | idc_b | 50.0 | 66.7 | 40.6 | 2022-11-03T03:39:57.450 |
+| host2 | idc_a | 80.0 | 70.3 | 90.0 | 2022-11-03T03:39:57.450 |
++-------+-------+----------+-------------+-----------+-------------------------+
+
+select * from foo;
+
++-------+---------------------+-----+
+| host | ts | cpu |
++-------+---------------------+-----+
+| host1 | 2000-01-01T00:00:00 | 1.1 |
+| host2 | 2023-04-29T00:00:00 | 2.2 |
+| host3 | 2023-04-29T00:00:00 | 0.0 |
++-------+---------------------+-----+
+
+SELECT * from t1;
+
++-------+-------------------------+-----+
+| host | ts | val |
++-------+-------------------------+-----+
+| host2 | 1970-01-01T00:00:00.001 | 1.0 |
+| host1 | 1970-01-01T00:00:00 | 0.0 |
++-------+-------------------------+-----+
+
+SELECT * from t2;
+
++------+-------------------------+-----+
+| job | ts | val |
++------+-------------------------+-----+
+| job2 | 1970-01-01T00:00:00.001 | 1.0 |
+| job1 | 1970-01-01T00:00:00 | 0.0 |
++------+-------------------------+-----+
+
+DROP TABLE t1;
+
+Affected Rows: 0
+
+DROP TABLE t2;
+
+Affected Rows: 0
+
+DROP TABLE phy;
+
+Affected Rows: 0
+
+DROP TABLE system_metrics;
+
+Affected Rows: 0
+
DROP TABLE foo;
Affected Rows: 0
diff --git a/tests/cases/standalone/common/basic.sql b/tests/cases/standalone/common/basic.sql
index 707f09df75a7..4c4065874256 100644
--- a/tests/cases/standalone/common/basic.sql
+++ b/tests/cases/standalone/common/basic.sql
@@ -23,8 +23,6 @@ SELECT avg(cpu_util) FROM system_metrics;
SELECT idc, avg(memory_util) FROM system_metrics GROUP BY idc ORDER BY idc;
-DROP TABLE system_metrics;
-
create table foo (
host string,
ts timestamp DEFAULT '2023-04-29 00:00:00+00:00',
@@ -41,4 +39,36 @@ insert into foo (host) values ('host3');
select * from foo;
-DROP TABLE foo;
\ No newline at end of file
+CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "");
+
+CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy");
+
+INSERT INTO t1 VALUES ('host1',0, 0), ('host2', 1, 1,);
+
+SELECT * from t1;
+
+CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");
+
+SELECT * from t2;
+
+INSERT INTO t2 VALUES ('job1', 0, 0), ('job2', 1, 1);
+
+-- SQLNESS ARG restart=true
+
+SELECT * FROM system_metrics;
+
+select * from foo;
+
+SELECT * from t1;
+
+SELECT * from t2;
+
+DROP TABLE t1;
+
+DROP TABLE t2;
+
+DROP TABLE phy;
+
+DROP TABLE system_metrics;
+
+DROP TABLE foo;