diff --git a/config/config.md b/config/config.md
index 8e1fb5837aec..65d8fa0c16db 100644
--- a/config/config.md
+++ b/config/config.md
@@ -83,7 +83,7 @@
| `wal.backoff_max` | String | `10s` | The maximum backoff delay.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_base` | Integer | `2` | The exponential backoff rate, i.e. next backoff = base * current backoff.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
+| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
| `metadata_store` | -- | -- | Metadata storage options. |
| `metadata_store.file_size` | String | `256MB` | Kv file size in bytes. |
| `metadata_store.purge_threshold` | String | `4GB` | Kv purge threshold. |
@@ -116,7 +116,9 @@
| `region_engine.mito.worker_request_batch_size` | Integer | `64` | Max batch size for a worker to handle requests. |
| `region_engine.mito.manifest_checkpoint_distance` | Integer | `10` | Number of meta action updated to trigger a new checkpoint for the manifest. |
| `region_engine.mito.compress_manifest` | Bool | `false` | Whether to compress manifest and checkpoint file by gzip (default false). |
-| `region_engine.mito.max_background_jobs` | Integer | `4` | Max number of running background jobs |
+| `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). |
+| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
+| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size`. |
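As a concrete reading of the `Auto` defaults above: on an 8-core host the engine would allow 4 concurrent flushes, 2 concurrent compactions, and 8 concurrent purges. A minimal sketch of that arithmetic (the function name and the clamp to 1 are illustrative assumptions, not the engine's actual helper):

```rust
/// Illustration only: compute the documented `Auto` defaults from a core count.
fn background_job_defaults(cpu_cores: usize) -> (usize, usize, usize) {
    let flushes = (cpu_cores / 2).max(1); // 1/2 of cpu cores
    let compactions = (cpu_cores / 4).max(1); // 1/4 of cpu cores
    let purges = cpu_cores.max(1); // number of cpu cores
    (flushes, compactions, purges)
}

fn main() {
    assert_eq!(background_job_defaults(8), (4, 2, 8));
}
```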
@@ -410,7 +412,7 @@
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |
| `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
+| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
@@ -437,7 +439,9 @@
| `region_engine.mito.worker_request_batch_size` | Integer | `64` | Max batch size for a worker to handle requests. |
| `region_engine.mito.manifest_checkpoint_distance` | Integer | `10` | Number of meta action updated to trigger a new checkpoint for the manifest. |
| `region_engine.mito.compress_manifest` | Bool | `false` | Whether to compress manifest and checkpoint file by gzip (default false). |
-| `region_engine.mito.max_background_jobs` | Integer | `4` | Max number of running background jobs |
+| `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). |
+| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
+| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size` |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 85b7793ef65a..557cd4cef02d 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -215,12 +215,12 @@ dump_index_interval = "60s"
## Ignore missing entries during read WAL.
## **It's only used when the provider is `kafka`**.
-##
-## This option ensures that when Kafka messages are deleted, the system
-## can still successfully replay memtable data without throwing an
-## out-of-range error.
-## However, enabling this option might lead to unexpected data loss,
-## as the system will skip over missing entries instead of treating
+##
+## This option ensures that when Kafka messages are deleted, the system
+## can still successfully replay memtable data without throwing an
+## out-of-range error.
+## However, enabling this option might lead to unexpected data loss,
+## as the system will skip over missing entries instead of treating
## them as critical errors.
overwrite_entry_start_id = false
@@ -416,8 +416,17 @@ manifest_checkpoint_distance = 10
## Whether to compress manifest and checkpoint file by gzip (default false).
compress_manifest = false
-## Max number of running background jobs
-max_background_jobs = 4
+## Max number of running background flush jobs (default: 1/2 of cpu cores).
+## @toml2docs:none-default="Auto"
+#+ max_background_flushes = 4
+
+## Max number of running background compaction jobs (default: 1/4 of cpu cores).
+## @toml2docs:none-default="Auto"
+#+ max_background_compactions = 2
+
+## Max number of running background purge jobs (default: number of cpu cores).
+## @toml2docs:none-default="Auto"
+#+ max_background_purges = 8
## Interval to auto flush a region if it has not flushed yet.
auto_flush_interval = "1h"
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 81f4ee47f03b..defd34d8f598 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -239,12 +239,12 @@ backoff_deadline = "5mins"
## Ignore missing entries during read WAL.
## **It's only used when the provider is `kafka`**.
-##
-## This option ensures that when Kafka messages are deleted, the system
-## can still successfully replay memtable data without throwing an
-## out-of-range error.
-## However, enabling this option might lead to unexpected data loss,
-## as the system will skip over missing entries instead of treating
+##
+## This option ensures that when Kafka messages are deleted, the system
+## can still successfully replay memtable data without throwing an
+## out-of-range error.
+## However, enabling this option might lead to unexpected data loss,
+## as the system will skip over missing entries instead of treating
## them as critical errors.
overwrite_entry_start_id = false
@@ -454,8 +454,17 @@ manifest_checkpoint_distance = 10
## Whether to compress manifest and checkpoint file by gzip (default false).
compress_manifest = false
-## Max number of running background jobs
-max_background_jobs = 4
+## Max number of running background flush jobs (default: 1/2 of cpu cores).
+## @toml2docs:none-default="Auto"
+#+ max_background_flushes = 4
+
+## Max number of running background compaction jobs (default: 1/4 of cpu cores).
+## @toml2docs:none-default="Auto"
+#+ max_background_compactions = 2
+
+## Max number of running background purge jobs (default: number of cpu cores).
+## @toml2docs:none-default="Auto"
+#+ max_background_purges = 8
## Interval to auto flush a region if it has not flushed yet.
auto_flush_interval = "1h"
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index d919633ba964..004b2230536d 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -134,7 +134,7 @@ pub async fn open_compaction_region(
));
let file_purger = {
- let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_jobs));
+ let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
Arc::new(LocalFilePurger::new(
purge_scheduler.clone(),
access_layer.clone(),
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index 433bc456621a..001d5ffaa824 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -28,9 +28,6 @@ use crate::error::Result;
use crate::memtable::MemtableConfig;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
-/// Default max running background job.
-const DEFAULT_MAX_BG_JOB: usize = 4;
-
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan task.
const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
@@ -69,8 +66,12 @@ pub struct MitoConfig {
pub compress_manifest: bool,
// Background job configs:
- /// Max number of running background jobs (default 4).
- pub max_background_jobs: usize,
+ /// Max number of running background flush jobs (default: 1/2 of cpu cores).
+ pub max_background_flushes: usize,
+ /// Max number of running background compaction jobs (default: 1/4 of cpu cores).
+ pub max_background_compactions: usize,
+ /// Max number of running background purge jobs (default: number of cpu cores).
+ pub max_background_purges: usize,
// Flush configs:
/// Interval to auto flush a region if it has not flushed yet (default 30 min).
@@ -137,7 +138,9 @@ impl Default for MitoConfig {
worker_request_batch_size: 64,
manifest_checkpoint_distance: 10,
compress_manifest: false,
- max_background_jobs: DEFAULT_MAX_BG_JOB,
+ max_background_flushes: divide_num_cpus(2),
+ max_background_compactions: divide_num_cpus(4),
+ max_background_purges: common_config::utils::get_cpus(),
auto_flush_interval: Duration::from_secs(30 * 60),
global_write_buffer_size: ReadableSize::gb(1),
global_write_buffer_reject_size: ReadableSize::gb(2),
@@ -185,9 +188,26 @@ impl MitoConfig {
self.worker_channel_size = 1;
}
- if self.max_background_jobs == 0 {
- warn!("Sanitize max background jobs 0 to {}", DEFAULT_MAX_BG_JOB);
- self.max_background_jobs = DEFAULT_MAX_BG_JOB;
+ if self.max_background_flushes == 0 {
+ warn!(
+ "Sanitize max background flushes 0 to {}",
+ divide_num_cpus(2)
+ );
+ self.max_background_flushes = divide_num_cpus(2);
+ }
+ if self.max_background_compactions == 0 {
+ warn!(
+ "Sanitize max background compactions 0 to {}",
+ divide_num_cpus(4)
+ );
+ self.max_background_compactions = divide_num_cpus(4);
+ }
+ if self.max_background_purges == 0 {
+ warn!(
+ "Sanitize max background purges 0 to {}",
+ common_config::utils::get_cpus()
+ );
+ self.max_background_purges = common_config::utils::get_cpus();
}
if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
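The CPU-derived defaults and the zero-value sanitization above both go through `divide_num_cpus`, whose body is not part of this hunk. A plausible sketch, assuming it rounds up and never returns zero (the standard-library core-count call stands in for `common_config::utils::get_cpus()` so the snippet compiles on its own):

```rust
/// Plausible sketch of the helper used above: divide the CPU core count by
/// `divisor`, rounding up so every job pool keeps at least one worker.
fn divide_num_cpus(divisor: usize) -> usize {
    debug_assert!(divisor > 0);
    // Stand-in for `common_config::utils::get_cpus()`.
    let cores = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    (cores + divisor - 1) / divisor
}
```

With that shape, a user-supplied `0` is rewritten by `sanitize()` to the same value the `Default` impl would have chosen.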
diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs
index 7951331b20d9..9a80d4f84fb7 100644
--- a/src/mito2/src/engine/compaction_test.rs
+++ b/src/mito2/src/engine/compaction_test.rs
@@ -272,7 +272,7 @@ async fn test_readonly_during_compaction() {
.create_engine_with(
MitoConfig {
// Ensure there is only one background worker for purge task.
- max_background_jobs: 1,
+ max_background_purges: 1,
..Default::default()
},
None,
@@ -310,7 +310,7 @@ async fn test_readonly_during_compaction() {
listener.wake();
let notify = Arc::new(Notify::new());
- // We already sets max background jobs to 1, so we can submit a task to the
+ // We already set max background purges to 1, so we can submit a task to the
// purge scheduler to ensure all purge tasks are finished.
let job_notify = notify.clone();
engine
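The test pins only the purge pool and leaves the other new knobs at their CPU-derived defaults via struct-update syntax. A minimal sketch of overriding all three (values are arbitrary examples in the spirit of the test, not recommendations):

```rust
// Inside mito2's test code, where `MitoConfig` is in scope.
let config = MitoConfig {
    max_background_flushes: 4,
    max_background_compactions: 2,
    max_background_purges: 1, // e.g. serialize purge tasks, as the test above does
    ..Default::default()
};
```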
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index e790ed08c1a9..eb2cea19d2ad 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -114,8 +114,10 @@ pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
pub(crate) struct WorkerGroup {
/// Workers of the group.
workers: Vec<RegionWorker>,
- /// Global background job scheduelr.
- scheduler: SchedulerRef,
+ /// Flush background job pool.
+ flush_job_pool: SchedulerRef,
+ /// Compaction background job pool.
+ compact_job_pool: SchedulerRef,
/// Scheduler for file purgers.
purge_scheduler: SchedulerRef,
/// Cache.
@@ -146,10 +148,10 @@ impl WorkerGroup {
let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
.await?
.with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
- let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
+ let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
+ let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
// We use another scheduler to avoid purge jobs blocking other jobs.
- // A purge job is cheaper than other background jobs so they share the same job limit.
- let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
+ let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
let write_cache = write_cache_from_config(
&config,
object_store_manager.clone(),
@@ -178,7 +180,8 @@ impl WorkerGroup {
log_store: log_store.clone(),
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
- scheduler: scheduler.clone(),
+ flush_job_pool: flush_job_pool.clone(),
+ compact_job_pool: compact_job_pool.clone(),
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::default(),
cache_manager: cache_manager.clone(),
@@ -195,7 +198,8 @@ impl WorkerGroup {
Ok(WorkerGroup {
workers,
- scheduler,
+ flush_job_pool,
+ compact_job_pool,
purge_scheduler,
cache_manager,
})
@@ -205,8 +209,11 @@ impl WorkerGroup {
pub(crate) async fn stop(&self) -> Result<()> {
info!("Stop region worker group");
+ // TODO(yingwen): Do we need to stop gracefully?
// Stops the scheduler gracefully.
- self.scheduler.stop(true).await?;
+ self.compact_job_pool.stop(true).await?;
+ // Stops the flush scheduler gracefully.
+ self.flush_job_pool.stop(true).await?;
// Stops the purge scheduler gracefully.
self.purge_scheduler.stop(true).await?;
@@ -275,8 +282,9 @@ impl WorkerGroup {
.with_notifier(flush_sender.clone()),
)
});
- let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
- let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
+ let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
+ let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
+ let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
let puffin_manager_factory = PuffinManagerFactory::new(
&config.index.aux_path,
config.index.staging_size.as_bytes(),
@@ -310,7 +318,8 @@ impl WorkerGroup {
log_store: log_store.clone(),
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
- scheduler: scheduler.clone(),
+ flush_job_pool: flush_job_pool.clone(),
+ compact_job_pool: compact_job_pool.clone(),
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::new(listener.clone()),
cache_manager: cache_manager.clone(),
@@ -327,7 +336,8 @@ impl WorkerGroup {
Ok(WorkerGroup {
workers,
- scheduler,
+ flush_job_pool,
+ compact_job_pool,
purge_scheduler,
cache_manager,
})
@@ -382,7 +392,8 @@ struct WorkerStarter {
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: WriteBufferManagerRef,
- scheduler: SchedulerRef,
+ compact_job_pool: SchedulerRef,
+ flush_job_pool: SchedulerRef,
purge_scheduler: SchedulerRef,
listener: WorkerListener,
cache_manager: CacheManagerRef,
@@ -423,9 +434,9 @@ impl WorkerStarter {
),
purge_scheduler: self.purge_scheduler.clone(),
write_buffer_manager: self.write_buffer_manager,
- flush_scheduler: FlushScheduler::new(self.scheduler.clone()),
+ flush_scheduler: FlushScheduler::new(self.flush_job_pool),
compaction_scheduler: CompactionScheduler::new(
- self.scheduler,
+ self.compact_job_pool,
sender.clone(),
self.cache_manager.clone(),
self.config,
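The motivation for the three pools is isolation: with the old single scheduler sized by `max_background_jobs`, a backlog of compactions could occupy every slot and delay flushes. A toy model of that isolation using tokio semaphores, purely for illustration (this is not the engine's actual scheduler API):

```rust
use std::future::Future;
use std::sync::Arc;

use tokio::sync::Semaphore;

/// Toy per-kind job pools: each kind waits only on its own capacity,
/// so a compaction backlog can no longer delay flushes or purges.
struct JobPools {
    flush: Arc<Semaphore>,
    compact: Arc<Semaphore>,
    purge: Arc<Semaphore>,
}

impl JobPools {
    fn new(max_flushes: usize, max_compactions: usize, max_purges: usize) -> Self {
        Self {
            flush: Arc::new(Semaphore::new(max_flushes)),
            compact: Arc::new(Semaphore::new(max_compactions)),
            purge: Arc::new(Semaphore::new(max_purges)),
        }
    }

    /// Runs a flush once a flush permit is free; the other pools are irrelevant.
    async fn run_flush(&self, job: impl Future<Output = ()>) {
        let _permit = self.flush.acquire().await.expect("pool closed");
        job.await;
    }
}
```

This also mirrors why `stop()` now drains the compaction, flush, and purge pools separately.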
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index c426af4f5f6a..823de40d1124 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -866,7 +866,6 @@ worker_channel_size = 128
worker_request_batch_size = 64
manifest_checkpoint_distance = 10
compress_manifest = false
-max_background_jobs = 4
auto_flush_interval = "30m"
enable_experimental_write_cache = false
experimental_write_cache_path = ""
@@ -939,6 +938,9 @@ fn drop_lines_with_inconsistent_results(input: String) -> String {
"content_cache_size =",
"name =",
"recovery_parallelism =",
+ "max_background_flushes =",
+ "max_background_compactions =",
+ "max_background_purges =",
];
input