From 1008af5324919fef3af1c2eaaa60c30758e46321 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Sat, 26 Oct 2024 07:36:16 +0800 Subject: [PATCH] feat!: Divide flush and compaction job pool (#4871) * feat: divide flush/compact job pool * feat!: divide bg jobs config * docs: update config examples * test: fix tests --- config/config.md | 12 +++++--- config/datanode.example.toml | 25 ++++++++++----- config/standalone.example.toml | 25 ++++++++++----- src/mito2/src/compaction/compactor.rs | 2 +- src/mito2/src/config.rs | 38 +++++++++++++++++------ src/mito2/src/engine/compaction_test.rs | 4 +-- src/mito2/src/worker.rs | 41 ++++++++++++++++--------- tests-integration/tests/http.rs | 4 ++- 8 files changed, 103 insertions(+), 48 deletions(-) diff --git a/config/config.md b/config/config.md index 8e1fb5837aec..65d8fa0c16db 100644 --- a/config/config.md +++ b/config/config.md @@ -83,7 +83,7 @@ | `wal.backoff_max` | String | `10s` | The maximum backoff delay.
**It's only used when the provider is `kafka`**. | | `wal.backoff_base` | Integer | `2` | The exponential backoff rate, i.e. next backoff = base * current backoff.
**It's only used when the provider is `kafka`**. | | `wal.backoff_deadline` | String | `5mins` | The deadline of retries.
**It's only used when the provider is `kafka`**. | -| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.
**It's only used when the provider is `kafka`**.

This option ensures that when Kafka messages are deleted, the system
can still successfully replay memtable data without throwing an
out-of-range error.
However, enabling this option might lead to unexpected data loss,
as the system will skip over missing entries instead of treating
them as critical errors. | +| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.
**It's only used when the provider is `kafka`**.

This option ensures that when Kafka messages are deleted, the system
can still successfully replay memtable data without throwing an
out-of-range error.
However, enabling this option might lead to unexpected data loss,
as the system will skip over missing entries instead of treating
them as critical errors. | | `metadata_store` | -- | -- | Metadata storage options. | | `metadata_store.file_size` | String | `256MB` | Kv file size in bytes. | | `metadata_store.purge_threshold` | String | `4GB` | Kv purge threshold. | @@ -116,7 +116,9 @@ | `region_engine.mito.worker_request_batch_size` | Integer | `64` | Max batch size for a worker to handle requests. | | `region_engine.mito.manifest_checkpoint_distance` | Integer | `10` | Number of meta action updated to trigger a new checkpoint for the manifest. | | `region_engine.mito.compress_manifest` | Bool | `false` | Whether to compress manifest and checkpoint file by gzip (default false). | -| `region_engine.mito.max_background_jobs` | Integer | `4` | Max number of running background jobs | +| `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). | +| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). | +| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). | | `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. | | `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. | | `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size`. | @@ -410,7 +412,7 @@ | `wal.backoff_deadline` | String | `5mins` | The deadline of retries.
**It's only used when the provider is `kafka`**. | | `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.
**It's only used when the provider is `kafka`**. | | `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.
**It's only used when the provider is `kafka`**. | -| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.
**It's only used when the provider is `kafka`**.

This option ensures that when Kafka messages are deleted, the system
can still successfully replay memtable data without throwing an
out-of-range error.
However, enabling this option might lead to unexpected data loss,
as the system will skip over missing entries instead of treating
them as critical errors. | +| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.
**It's only used when the provider is `kafka`**.

This option ensures that when Kafka messages are deleted, the system
can still successfully replay memtable data without throwing an
out-of-range error.
However, enabling this option might lead to unexpected data loss,
as the system will skip over missing entries instead of treating
them as critical errors. | | `storage` | -- | -- | The data storage options. | | `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. | | `storage.type` | String | `File` | The storage type used to store the data.
- `File`: the data is stored in the local file system.
- `S3`: the data is stored in the S3 object storage.
- `Gcs`: the data is stored in the Google Cloud Storage.
- `Azblob`: the data is stored in the Azure Blob Storage.
- `Oss`: the data is stored in the Aliyun OSS. | @@ -437,7 +439,9 @@ | `region_engine.mito.worker_request_batch_size` | Integer | `64` | Max batch size for a worker to handle requests. | | `region_engine.mito.manifest_checkpoint_distance` | Integer | `10` | Number of meta action updated to trigger a new checkpoint for the manifest. | | `region_engine.mito.compress_manifest` | Bool | `false` | Whether to compress manifest and checkpoint file by gzip (default false). | -| `region_engine.mito.max_background_jobs` | Integer | `4` | Max number of running background jobs | +| `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). | +| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). | +| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). | | `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. | | `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. | | `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size` | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 85b7793ef65a..557cd4cef02d 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -215,12 +215,12 @@ dump_index_interval = "60s" ## Ignore missing entries during read WAL. ## **It's only used when the provider is `kafka`**. -## -## This option ensures that when Kafka messages are deleted, the system -## can still successfully replay memtable data without throwing an -## out-of-range error. -## However, enabling this option might lead to unexpected data loss, -## as the system will skip over missing entries instead of treating +## +## This option ensures that when Kafka messages are deleted, the system +## can still successfully replay memtable data without throwing an +## out-of-range error. +## However, enabling this option might lead to unexpected data loss, +## as the system will skip over missing entries instead of treating ## them as critical errors. overwrite_entry_start_id = false @@ -416,8 +416,17 @@ manifest_checkpoint_distance = 10 ## Whether to compress manifest and checkpoint file by gzip (default false). compress_manifest = false -## Max number of running background jobs -max_background_jobs = 4 +## Max number of running background flush jobs (default: 1/2 of cpu cores). +## @toml2docs:none-default="Auto" +#+ max_background_flushes = 4 + +## Max number of running background compaction jobs (default: 1/4 of cpu cores). +## @toml2docs:none-default="Auto" +#+ max_background_compactions = 2 + +## Max number of running background purge jobs (default: number of cpu cores). +## @toml2docs:none-default="Auto" +#+ max_background_purges = 8 ## Interval to auto flush a region if it has not flushed yet. 
auto_flush_interval = "1h" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 81f4ee47f03b..defd34d8f598 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -239,12 +239,12 @@ backoff_deadline = "5mins" ## Ignore missing entries during read WAL. ## **It's only used when the provider is `kafka`**. -## -## This option ensures that when Kafka messages are deleted, the system -## can still successfully replay memtable data without throwing an -## out-of-range error. -## However, enabling this option might lead to unexpected data loss, -## as the system will skip over missing entries instead of treating +## +## This option ensures that when Kafka messages are deleted, the system +## can still successfully replay memtable data without throwing an +## out-of-range error. +## However, enabling this option might lead to unexpected data loss, +## as the system will skip over missing entries instead of treating ## them as critical errors. overwrite_entry_start_id = false @@ -454,8 +454,17 @@ manifest_checkpoint_distance = 10 ## Whether to compress manifest and checkpoint file by gzip (default false). compress_manifest = false -## Max number of running background jobs -max_background_jobs = 4 +## Max number of running background flush jobs (default: 1/2 of cpu cores). +## @toml2docs:none-default="Auto" +#+ max_background_flushes = 4 + +## Max number of running background compaction jobs (default: 1/4 of cpu cores). +## @toml2docs:none-default="Auto" +#+ max_background_compactions = 2 + +## Max number of running background purge jobs (default: number of cpu cores). +## @toml2docs:none-default="Auto" +#+ max_background_purges = 8 ## Interval to auto flush a region if it has not flushed yet. auto_flush_interval = "1h" diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index d919633ba964..004b2230536d 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -134,7 +134,7 @@ pub async fn open_compaction_region( )); let file_purger = { - let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_jobs)); + let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges)); Arc::new(LocalFilePurger::new( purge_scheduler.clone(), access_layer.clone(), diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 433bc456621a..001d5ffaa824 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -28,9 +28,6 @@ use crate::error::Result; use crate::memtable::MemtableConfig; use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; -/// Default max running background job. -const DEFAULT_MAX_BG_JOB: usize = 4; - const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5); /// Default channel size for parallel scan task. const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32; @@ -69,8 +66,12 @@ pub struct MitoConfig { pub compress_manifest: bool, // Background job configs: - /// Max number of running background jobs (default 4). - pub max_background_jobs: usize, + /// Max number of running background flush jobs (default: 1/2 of cpu cores). + pub max_background_flushes: usize, + /// Max number of running background compaction jobs (default: 1/4 of cpu cores). + pub max_background_compactions: usize, + /// Max number of running background purge jobs (default: number of cpu cores). + pub max_background_purges: usize, // Flush configs: /// Interval to auto flush a region if it has not flushed yet (default 30 min). 
@@ -137,7 +138,9 @@ impl Default for MitoConfig {
             worker_request_batch_size: 64,
             manifest_checkpoint_distance: 10,
             compress_manifest: false,
-            max_background_jobs: DEFAULT_MAX_BG_JOB,
+            max_background_flushes: divide_num_cpus(2),
+            max_background_compactions: divide_num_cpus(4),
+            max_background_purges: common_config::utils::get_cpus(),
             auto_flush_interval: Duration::from_secs(30 * 60),
             global_write_buffer_size: ReadableSize::gb(1),
             global_write_buffer_reject_size: ReadableSize::gb(2),
@@ -185,9 +188,26 @@ impl MitoConfig {
             self.worker_channel_size = 1;
         }
 
-        if self.max_background_jobs == 0 {
-            warn!("Sanitize max background jobs 0 to {}", DEFAULT_MAX_BG_JOB);
-            self.max_background_jobs = DEFAULT_MAX_BG_JOB;
+        if self.max_background_flushes == 0 {
+            warn!(
+                "Sanitize max background flushes 0 to {}",
+                divide_num_cpus(2)
+            );
+            self.max_background_flushes = divide_num_cpus(2);
+        }
+        if self.max_background_compactions == 0 {
+            warn!(
+                "Sanitize max background compactions 0 to {}",
+                divide_num_cpus(4)
+            );
+            self.max_background_compactions = divide_num_cpus(4);
+        }
+        if self.max_background_purges == 0 {
+            warn!(
+                "Sanitize max background purges 0 to {}",
+                common_config::utils::get_cpus()
+            );
+            self.max_background_purges = common_config::utils::get_cpus();
         }
 
         if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs
index 7951331b20d9..9a80d4f84fb7 100644
--- a/src/mito2/src/engine/compaction_test.rs
+++ b/src/mito2/src/engine/compaction_test.rs
@@ -272,7 +272,7 @@ async fn test_readonly_during_compaction() {
         .create_engine_with(
             MitoConfig {
                 // Ensure there is only one background worker for purge task.
-                max_background_jobs: 1,
+                max_background_purges: 1,
                 ..Default::default()
             },
             None,
@@ -310,7 +310,7 @@ async fn test_readonly_during_compaction() {
     listener.wake();
 
     let notify = Arc::new(Notify::new());
-    // We already sets max background jobs to 1, so we can submit a task to the
+    // We already set max background purges to 1, so we can submit a task to the
     // purge scheduler to ensure all purge tasks are finished.
     let job_notify = notify.clone();
     engine
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index e790ed08c1a9..eb2cea19d2ad 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -114,8 +114,10 @@ pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
 pub(crate) struct WorkerGroup {
     /// Workers of the group.
     workers: Vec,
-    /// Global background job scheduelr.
-    scheduler: SchedulerRef,
+    /// Flush background job pool.
+    flush_job_pool: SchedulerRef,
+    /// Compaction background job pool.
+    compact_job_pool: SchedulerRef,
     /// Scheduler for file purgers.
     purge_scheduler: SchedulerRef,
     /// Cache.
@@ -146,10 +148,10 @@ impl WorkerGroup {
         let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
             .await?
             .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
-        let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
+        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
+        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
         // We use another scheduler to avoid purge jobs blocking other jobs.
-        // A purge job is cheaper than other background jobs so they share the same job limit.
-        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
+        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
         let write_cache = write_cache_from_config(
             &config,
             object_store_manager.clone(),
@@ -178,7 +180,8 @@ impl WorkerGroup {
                     log_store: log_store.clone(),
                     object_store_manager: object_store_manager.clone(),
                     write_buffer_manager: write_buffer_manager.clone(),
-                    scheduler: scheduler.clone(),
+                    flush_job_pool: flush_job_pool.clone(),
+                    compact_job_pool: compact_job_pool.clone(),
                     purge_scheduler: purge_scheduler.clone(),
                     listener: WorkerListener::default(),
                     cache_manager: cache_manager.clone(),
@@ -195,7 +198,8 @@ impl WorkerGroup {
 
         Ok(WorkerGroup {
             workers,
-            scheduler,
+            flush_job_pool,
+            compact_job_pool,
             purge_scheduler,
             cache_manager,
         })
@@ -205,8 +209,11 @@ impl WorkerGroup {
     pub(crate) async fn stop(&self) -> Result<()> {
         info!("Stop region worker group");
 
+        // TODO(yingwen): Do we need to stop gracefully?
         // Stops the scheduler gracefully.
-        self.scheduler.stop(true).await?;
+        self.compact_job_pool.stop(true).await?;
+        // Stops the flush scheduler gracefully.
+        self.flush_job_pool.stop(true).await?;
         // Stops the purge scheduler gracefully.
         self.purge_scheduler.stop(true).await?;
 
@@ -275,8 +282,9 @@ impl WorkerGroup {
                     .with_notifier(flush_sender.clone()),
             )
         });
-        let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
-        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
+        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
+        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
+        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
         let puffin_manager_factory = PuffinManagerFactory::new(
             &config.index.aux_path,
             config.index.staging_size.as_bytes(),
@@ -310,7 +318,8 @@ impl WorkerGroup {
                     log_store: log_store.clone(),
                     object_store_manager: object_store_manager.clone(),
                     write_buffer_manager: write_buffer_manager.clone(),
-                    scheduler: scheduler.clone(),
+                    flush_job_pool: flush_job_pool.clone(),
+                    compact_job_pool: compact_job_pool.clone(),
                     purge_scheduler: purge_scheduler.clone(),
                     listener: WorkerListener::new(listener.clone()),
                     cache_manager: cache_manager.clone(),
@@ -327,7 +336,8 @@ impl WorkerGroup {
 
         Ok(WorkerGroup {
             workers,
-            scheduler,
+            flush_job_pool,
+            compact_job_pool,
             purge_scheduler,
             cache_manager,
         })
@@ -382,7 +392,8 @@ struct WorkerStarter {
     log_store: Arc,
     object_store_manager: ObjectStoreManagerRef,
     write_buffer_manager: WriteBufferManagerRef,
-    scheduler: SchedulerRef,
+    compact_job_pool: SchedulerRef,
+    flush_job_pool: SchedulerRef,
     purge_scheduler: SchedulerRef,
     listener: WorkerListener,
     cache_manager: CacheManagerRef,
@@ -423,9 +434,9 @@ impl WorkerStarter {
             ),
             purge_scheduler: self.purge_scheduler.clone(),
             write_buffer_manager: self.write_buffer_manager,
-            flush_scheduler: FlushScheduler::new(self.scheduler.clone()),
+            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
             compaction_scheduler: CompactionScheduler::new(
-                self.scheduler,
+                self.compact_job_pool,
                 sender.clone(),
                 self.cache_manager.clone(),
                 self.config,
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index c426af4f5f6a..823de40d1124 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -866,7 +866,6 @@ worker_channel_size = 128
 worker_request_batch_size = 64
 manifest_checkpoint_distance = 10
 compress_manifest = false
-max_background_jobs = 4
auto_flush_interval = "30m" enable_experimental_write_cache = false experimental_write_cache_path = "" @@ -939,6 +938,9 @@ fn drop_lines_with_inconsistent_results(input: String) -> String { "content_cache_size =", "name =", "recovery_parallelism =", + "max_background_flushes =", + "max_background_compactions =", + "max_background_purges =", ]; input
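
Migration note: `max_background_jobs` is removed, so datanode or standalone configurations that set it must switch to the three per-type pool sizes introduced above. A minimal sketch of the replacement keys in `[region_engine.mito]` is shown below; the values mirror the commented 8-core examples in this patch and are illustrative rather than recommendations.

```toml
[region_engine.mito]
# These three pools replace the removed `max_background_jobs` option.
# A value left unset (or set to 0) falls back to the CPU-derived default.
max_background_flushes = 4      # default: 1/2 of CPU cores
max_background_compactions = 2  # default: 1/4 of CPU cores
max_background_purges = 8      # default: number of CPU cores
```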
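The new defaults are derived from the host's core count, and `MitoConfig::sanitize` falls back to the same values when any pool size is configured as 0. The self-contained sketch below approximates that sizing logic; the real `divide_num_cpus` and `common_config::utils::get_cpus` helpers are assumed to behave like these stand-ins (the exact rounding may differ).

```rust
use std::thread;

/// Assumed equivalent of `common_config::utils::get_cpus`: number of logical cores.
fn get_cpus() -> usize {
    thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}

/// Assumed equivalent of `divide_num_cpus(n)`: cores divided by `n`, never below 1.
fn divide_num_cpus(n: usize) -> usize {
    (get_cpus() / n).max(1)
}

fn main() {
    // Mirrors `impl Default for MitoConfig` and the fallback applied by
    // `sanitize` when a pool size is configured as 0.
    let max_background_flushes = divide_num_cpus(2);
    let max_background_compactions = divide_num_cpus(4);
    let max_background_purges = get_cpus();
    println!(
        "flushes={max_background_flushes}, compactions={max_background_compactions}, purges={max_background_purges}"
    );
}
```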
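Background on the motivation, as a simplified model rather than the mito2 implementation: with a single `max_background_jobs` pool, a burst of long-running compactions could occupy every background slot and delay flushes or purges. The std-only sketch below uses a hypothetical `JobPool` type in place of `LocalScheduler` (which is async and supports graceful stop) to show the effect of sizing the flush, compaction, and purge pools independently.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

/// A tiny fixed-size job pool standing in for mito's `LocalScheduler`
/// (a deliberate simplification for illustration only).
struct JobPool {
    sender: mpsc::Sender<Box<dyn FnOnce() + Send>>,
}

impl JobPool {
    fn new(workers: usize) -> Self {
        let (sender, receiver) = mpsc::channel::<Box<dyn FnOnce() + Send>>();
        let receiver = Arc::new(Mutex::new(receiver));
        // Never start fewer than one worker, mirroring the sanitize fallback.
        for _ in 0..workers.max(1) {
            let receiver = Arc::clone(&receiver);
            thread::spawn(move || loop {
                // Take the next job; stop once all senders are gone.
                let job = receiver.lock().unwrap().recv();
                match job {
                    Ok(job) => job(),
                    Err(_) => break,
                }
            });
        }
        Self { sender }
    }

    fn schedule(&self, job: impl FnOnce() + Send + 'static) {
        let _ = self.sender.send(Box::new(job));
    }
}

fn main() {
    let cpus = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    // With one pool per job type, long-running compactions can no longer
    // occupy every background slot and delay flushes or purges, which was
    // possible when a single pool served all background job types.
    let flush_pool = JobPool::new(cpus / 2);
    let compact_pool = JobPool::new(cpus / 4);
    let purge_pool = JobPool::new(cpus);

    flush_pool.schedule(|| println!("flush a memtable to an SST file"));
    compact_pool.schedule(|| println!("compact a set of SST files"));
    purge_pool.schedule(|| println!("purge an unused SST file"));

    // Give the demo jobs a moment to run before the process exits.
    thread::sleep(Duration::from_millis(100));
}
```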