Skip to content

Commit

Permalink
refactor: open regions in background
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 13, 2023
1 parent d3da128 commit 1ea29ed
Showing 1 changed file with 73 additions and 51 deletions.
124 changes: 73 additions & 51 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ impl DatanodeBuilder {

pub async fn build(mut self) -> Result<Datanode> {
let mode = &self.opts.mode;
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

let meta_client = self.meta_client.take();

Expand All @@ -233,8 +234,18 @@ impl DatanodeBuilder {
.new_region_server(log_store, region_event_listener)
.await?;

self.initialize_region_server(&region_server, kv_backend, !controlled_by_metasrv)
.await?;
let region_server_moved = region_server.clone();

// Opens regions in background.
common_runtime::spawn_bg(async move {
open_all_regions(
&region_server_moved,
kv_backend,
!controlled_by_metasrv,
node_id,
)
.await
});

let heartbeat_task = if let Some(meta_client) = meta_client {
Some(HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?)
Expand Down Expand Up @@ -331,63 +342,16 @@ impl DatanodeBuilder {
Ok((server, addr))
}

#[cfg(test)]
/// Open all regions belong to this datanode.
async fn initialize_region_server(
&self,
region_server: &RegionServer,
kv_backend: KvBackendRef,
open_with_writable: bool,
) -> Result<()> {
let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
let mut regions = vec![];
let mut table_values = datanode_table_manager.tables(node_id);

while let Some(table_value) = table_values.next().await {
let table_value = table_value.context(GetMetadataSnafu)?;
for region_number in table_value.regions {
regions.push((
RegionId::new(table_value.table_id, region_number),
table_value.region_info.engine.clone(),
table_value.region_info.region_storage_path.clone(),
table_value.region_info.region_options.clone(),
));
}
}
info!("going to open {} regions", regions.len());
let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM));
let mut tasks = vec![];

for (region_id, engine, store_path, options) in regions {
let region_dir = region_dir(&store_path, region_id);
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await;
region_server
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: engine.clone(),
region_dir,
options,
}),
)
.await?;
if open_with_writable {
if let Err(e) = region_server.set_writable(region_id, true) {
error!(
e; "failed to set writable for region {region_id}"
);
}
}
Ok(())
});
}
let _ = try_join_all(tasks).await?;

info!("region server is initialized");

Ok(())
open_all_regions(region_server, kv_backend, open_with_writable, node_id).await
}

async fn new_region_server(
Expand Down Expand Up @@ -506,6 +470,64 @@ impl DatanodeBuilder {
}
}

/// Open all regions belong to this datanode.
async fn open_all_regions(
region_server: &RegionServer,
kv_backend: KvBackendRef,
open_with_writable: bool,
node_id: u64,
) -> Result<()> {
let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
let mut regions = vec![];
let mut table_values = datanode_table_manager.tables(node_id);

while let Some(table_value) = table_values.next().await {
let table_value = table_value.context(GetMetadataSnafu)?;
for region_number in table_value.regions {
regions.push((
RegionId::new(table_value.table_id, region_number),
table_value.region_info.engine.clone(),
table_value.region_info.region_storage_path.clone(),
table_value.region_info.region_options.clone(),
));
}
}
info!("going to open {} regions", regions.len());
let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM));
let mut tasks = vec![];

for (region_id, engine, store_path, options) in regions {
let region_dir = region_dir(&store_path, region_id);
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await;
region_server
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: engine.clone(),
region_dir,
options,
}),
)
.await?;
if open_with_writable {
if let Err(e) = region_server.set_writable(region_id, true) {
error!(
e; "failed to set writable for region {region_id}"
);
}
}
Ok(())
});
}
let _ = try_join_all(tasks).await?;

info!("all regions are opened");

Ok(())
}

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
Expand Down

0 comments on commit 1ea29ed

Please sign in to comment.