Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 20, 2023
1 parent c3a9c3d commit d8cd26f
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 99 deletions.
4 changes: 0 additions & 4 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@ mode = "standalone"
# Whether to enable greptimedb telemetry, true by default.
enable_telemetry = true

# Initialize all regions in the background during the startup.
# By default, it provides services after all regions have been initialized.
initialize_region_in_background = false

# HTTP server options.
[http]
# Server address, "127.0.0.1:4000" by default.
Expand Down
3 changes: 0 additions & 3 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ impl SubCommand {
pub struct StandaloneOptions {
pub mode: Mode,
pub enable_telemetry: bool,
pub initialize_region_in_background: bool,
pub http: HttpOptions,
pub grpc: GrpcOptions,
pub mysql: MysqlOptions,
Expand All @@ -120,7 +119,6 @@ impl Default for StandaloneOptions {
Self {
mode: Mode::Standalone,
enable_telemetry: true,
initialize_region_in_background: false,
http: HttpOptions::default(),
grpc: GrpcOptions::default(),
mysql: MysqlOptions::default(),
Expand Down Expand Up @@ -168,7 +166,6 @@ impl StandaloneOptions {
storage: self.storage,
region_engine: self.region_engine,
rpc_addr: self.grpc.addr,
initialize_region_in_background: self.initialize_region_in_background,
..Default::default()
}
}
Expand Down
47 changes: 23 additions & 24 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ use common_config::wal::{KafkaConfig, RaftEngineConfig};
use common_config::{WalConfig, WAL_OPTIONS_KEY};
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::key::datanode_table::DatanodeTableManager;
use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
use common_meta::kv_backend::KvBackendRef;
pub use common_procedure::options::ProcedureConfig;
use common_runtime::Runtime;
use common_telemetry::{error, info, warn};
use file_engine::engine::FileRegionEngine;
use futures::future;
use futures_util::future::try_join_all;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::client::MetaClient;
Expand Down Expand Up @@ -234,18 +234,21 @@ impl DatanodeBuilder {

let region_server = self.new_region_server(region_event_listener).await?;

let open_all_regions = open_all_regions(
region_server.clone(),
kv_backend,
!controlled_by_metasrv,
node_id,
);
let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
let table_values = datanode_table_manager
.tables(node_id)
.try_collect::<Vec<_>>()
.await
.context(GetMetadataSnafu)?;

let open_all_regions =
open_all_regions(region_server.clone(), table_values, !controlled_by_metasrv);

if self.opts.initialize_region_in_background {
// Opens regions in background.
common_runtime::spawn_bg(async move {
if let Err(err) = open_all_regions.await {
error!(err; "Failed to opening regions during the startup.");
error!(err; "Failed to open regions during the startup.");
}
});
} else {
Expand Down Expand Up @@ -356,13 +359,15 @@ impl DatanodeBuilder {
open_with_writable: bool,
) -> Result<()> {
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
open_all_regions(
region_server.clone(),
kv_backend,
open_with_writable,
node_id,
)
.await

let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
let table_values = datanode_table_manager
.tables(node_id)
.try_collect::<Vec<_>>()
.await
.context(GetMetadataSnafu)?;

open_all_regions(region_server.clone(), table_values, open_with_writable).await
}

async fn new_region_server(
Expand Down Expand Up @@ -511,17 +516,11 @@ impl DatanodeBuilder {
/// Open all regions belong to this datanode.
async fn open_all_regions(
region_server: RegionServer,
kv_backend: KvBackendRef,
table_values: Vec<DatanodeTableValue>,
open_with_writable: bool,
node_id: u64,
) -> Result<()> {
let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
let mut regions = vec![];
let mut table_values = datanode_table_manager.tables(node_id);

while let Some(table_value) = table_values.next().await {
let table_value = table_value.context(GetMetadataSnafu)?;

for table_value in table_values {
for region_number in table_value.regions {
// Augments region options with wal options if a wal options is provided.
let mut region_options = table_value.region_info.region_options.clone();
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ pub enum Error {
location: Location,
},

#[snafu(display("RegionId {} not ready", region_id))]
#[snafu(display("Region {} not ready", region_id))]
RegionNotReady {
region_id: RegionId,
location: Location,
Expand Down
Loading

0 comments on commit d8cd26f

Please sign in to comment.