Skip to content

Commit

Permalink
feat: do not require worker's number to be power of 2 (#2732)
Browse files Browse the repository at this point in the history
Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Nov 13, 2023
1 parent 3f981ef commit 6599bb5
Show file tree
Hide file tree
Showing 7 changed files with 10 additions and 12 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ lazy_static = "1.4"
log-store = { workspace = true, optional = true }
memcomparable = "0.2"
moka = { workspace = true, features = ["sync"] }
num_cpus = "1.13"
object-store.workspace = true
parquet = { workspace = true, features = ["async"] }
paste.workspace = true
Expand Down
7 changes: 2 additions & 5 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use common_datasource::compression::CompressionType;
use common_telemetry::warn;
use serde::{Deserialize, Serialize};

/// Default region worker num.
const DEFAULT_NUM_WORKERS: usize = 1;
/// Default max running background job.
const DEFAULT_MAX_BG_JOB: usize = 4;

Expand Down Expand Up @@ -72,7 +70,7 @@ pub struct MitoConfig {
impl Default for MitoConfig {
fn default() -> Self {
MitoConfig {
num_workers: DEFAULT_NUM_WORKERS,
num_workers: num_cpus::get() / 2,
worker_channel_size: 128,
worker_request_batch_size: 64,
manifest_checkpoint_distance: 10,
Expand All @@ -94,9 +92,8 @@ impl MitoConfig {
// Sanitize worker num.
let num_workers_before = self.num_workers;
if self.num_workers == 0 {
self.num_workers = DEFAULT_NUM_WORKERS;
self.num_workers = num_cpus::get() / 2;
}
self.num_workers = self.num_workers.next_power_of_two();
if num_workers_before != self.num_workers {
warn!(
"Sanitize worker num {} to {}",
Expand Down
4 changes: 1 addition & 3 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ impl WorkerGroup {
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
) -> WorkerGroup {
assert!(config.num_workers.is_power_of_two());
let config = Arc::new(config);
let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new(
config.global_write_buffer_size.as_bytes() as usize,
Expand Down Expand Up @@ -210,7 +209,6 @@ impl WorkerGroup {
write_buffer_manager: Option<WriteBufferManagerRef>,
listener: Option<crate::engine::listener::EventListenerRef>,
) -> WorkerGroup {
assert!(config.num_workers.is_power_of_two());
let config = Arc::new(config);
let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
Arc::new(WriteBufferManagerImpl::new(
Expand Down Expand Up @@ -248,7 +246,7 @@ impl WorkerGroup {
}

fn value_to_index(value: usize, num_workers: usize) -> usize {
value & (num_workers - 1)
value % num_workers
}

/// Worker start config.
Expand Down
1 change: 0 additions & 1 deletion src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", bran
itertools.workspace = true
lazy_static.workspace = true
mime_guess = "2.0"
num_cpus = "1.13"
once_cell.workspace = true
openmetrics-parser = "0.4"
opensrv-mysql = "0.4"
Expand Down
1 change: 1 addition & 0 deletions tests-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ uuid.workspace = true
datafusion-expr.workspace = true
datafusion.workspace = true
itertools.workspace = true
num_cpus = "1.13"
opentelemetry-proto.workspace = true
partition.workspace = true
paste.workspace = true
Expand Down
5 changes: 3 additions & 2 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ auto_flush_interval = "1h"
[[datanode.region_engine]]
[datanode.region_engine.mito]
num_workers = 1
num_workers = {}
worker_channel_size = 128
worker_request_batch_size = 64
manifest_checkpoint_distance = 10
Expand All @@ -754,7 +754,8 @@ enable_jaeger_tracing = false
[logging]
enable_jaeger_tracing = false"#,
store_type
store_type,
num_cpus::get() / 2
);
let body_text = drop_lines_with_inconsistent_results(res_get.text().await);
assert_eq!(body_text, expected_toml_str);
Expand Down

0 comments on commit 6599bb5

Please sign in to comment.