From 1ea29ed3f774b975b809ffac568b58e4b860237b Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 13 Dec 2023 15:11:05 +0000 Subject: [PATCH] refactor: open regions in background --- src/datanode/src/datanode.rs | 124 +++++++++++++++++++++-------------- 1 file changed, 73 insertions(+), 51 deletions(-) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 8910f444e328..c55145e5269c 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -209,6 +209,7 @@ impl DatanodeBuilder { pub async fn build(mut self) -> Result { let mode = &self.opts.mode; + let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; let meta_client = self.meta_client.take(); @@ -233,8 +234,18 @@ impl DatanodeBuilder { .new_region_server(log_store, region_event_listener) .await?; - self.initialize_region_server(®ion_server, kv_backend, !controlled_by_metasrv) - .await?; + let region_server_moved = region_server.clone(); + + // Opens regions in background. + common_runtime::spawn_bg(async move { + open_all_regions( + ®ion_server_moved, + kv_backend, + !controlled_by_metasrv, + node_id, + ) + .await + }); let heartbeat_task = if let Some(meta_client) = meta_client { Some(HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?) @@ -331,6 +342,7 @@ impl DatanodeBuilder { Ok((server, addr)) } + #[cfg(test)] /// Open all regions belong to this datanode. async fn initialize_region_server( &self, @@ -338,56 +350,8 @@ impl DatanodeBuilder { kv_backend: KvBackendRef, open_with_writable: bool, ) -> Result<()> { - let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; - let mut regions = vec![]; - let mut table_values = datanode_table_manager.tables(node_id); - - while let Some(table_value) = table_values.next().await { - let table_value = table_value.context(GetMetadataSnafu)?; - for region_number in table_value.regions { - regions.push(( - RegionId::new(table_value.table_id, region_number), - table_value.region_info.engine.clone(), - table_value.region_info.region_storage_path.clone(), - table_value.region_info.region_options.clone(), - )); - } - } - info!("going to open {} regions", regions.len()); - let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM)); - let mut tasks = vec![]; - - for (region_id, engine, store_path, options) in regions { - let region_dir = region_dir(&store_path, region_id); - let semaphore_moved = semaphore.clone(); - tasks.push(async move { - let _permit = semaphore_moved.acquire().await; - region_server - .handle_request( - region_id, - RegionRequest::Open(RegionOpenRequest { - engine: engine.clone(), - region_dir, - options, - }), - ) - .await?; - if open_with_writable { - if let Err(e) = region_server.set_writable(region_id, true) { - error!( - e; "failed to set writable for region {region_id}" - ); - } - } - Ok(()) - }); - } - let _ = try_join_all(tasks).await?; - - info!("region server is initialized"); - - Ok(()) + open_all_regions(region_server, kv_backend, open_with_writable, node_id).await } async fn new_region_server( @@ -506,6 +470,64 @@ impl DatanodeBuilder { } } +/// Open all regions belong to this datanode. +async fn open_all_regions( + region_server: &RegionServer, + kv_backend: KvBackendRef, + open_with_writable: bool, + node_id: u64, +) -> Result<()> { + let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); + let mut regions = vec![]; + let mut table_values = datanode_table_manager.tables(node_id); + + while let Some(table_value) = table_values.next().await { + let table_value = table_value.context(GetMetadataSnafu)?; + for region_number in table_value.regions { + regions.push(( + RegionId::new(table_value.table_id, region_number), + table_value.region_info.engine.clone(), + table_value.region_info.region_storage_path.clone(), + table_value.region_info.region_options.clone(), + )); + } + } + info!("going to open {} regions", regions.len()); + let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM)); + let mut tasks = vec![]; + + for (region_id, engine, store_path, options) in regions { + let region_dir = region_dir(&store_path, region_id); + let semaphore_moved = semaphore.clone(); + tasks.push(async move { + let _permit = semaphore_moved.acquire().await; + region_server + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: engine.clone(), + region_dir, + options, + }), + ) + .await?; + if open_with_writable { + if let Err(e) = region_server.set_writable(region_id, true) { + error!( + e; "failed to set writable for region {region_id}" + ); + } + } + Ok(()) + }); + } + let _ = try_join_all(tasks).await?; + + info!("all regions are opened"); + + Ok(()) +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches;