diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 8376b76f6888..7793e148ac4c 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -105,6 +105,9 @@ global_write_buffer_reject_size = "2GB"
 sst_meta_cache_size = "128MB"
 # Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 to disable the cache.
 vector_cache_size = "512MB"
+# Buffer size for SST writing.
+sst_write_buffer_size = "8MB"
+
 # Log options
 # [logging]
diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs
index da2466497bbc..0be8b08c63f4 100644
--- a/src/cmd/src/datanode.rs
+++ b/src/cmd/src/datanode.rs
@@ -191,7 +191,6 @@ mod tests {
     use std::io::Write;
     use std::time::Duration;

-    use common_base::readable_size::ReadableSize;
     use common_test_util::temp_dir::create_named_temp_file;
     use datanode::config::{CompactionConfig, FileConfig, ObjectStoreConfig, RegionManifestConfig};
     use servers::heartbeat_options::HeartbeatOptions;
@@ -300,7 +299,6 @@ mod tests {
                 max_inflight_tasks: 3,
                 max_files_in_level0: 7,
                 max_purge_tasks: 32,
-                sst_write_buffer_size: ReadableSize::mb(8),
             },
             options.storage.compaction,
         );
diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs
index 5730ce9c85af..739d451d95e8 100644
--- a/src/datanode/src/config.rs
+++ b/src/datanode/src/config.rs
@@ -250,8 +250,6 @@ pub struct CompactionConfig {
     pub max_files_in_level0: usize,
     /// Max task number for SST purge task after compaction.
     pub max_purge_tasks: usize,
-    /// Buffer threshold while writing SST files
-    pub sst_write_buffer_size: ReadableSize,
 }

 impl Default for CompactionConfig {
@@ -260,7 +258,6 @@ impl Default for CompactionConfig {
             max_inflight_tasks: 4,
             max_files_in_level0: 8,
             max_purge_tasks: 32,
-            sst_write_buffer_size: ReadableSize::mb(8),
         }
     }
 }
@@ -312,7 +309,6 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
             manifest_gc_duration: value.storage.manifest.gc_duration,
             max_files_in_l0: value.storage.compaction.max_files_in_level0,
             max_purge_tasks: value.storage.compaction.max_purge_tasks,
-            sst_write_buffer_size: value.storage.compaction.sst_write_buffer_size,
             max_flush_tasks: value.storage.flush.max_flush_tasks,
             region_write_buffer_size: value.storage.flush.region_write_buffer_size,
             picker_schedule_interval: value.storage.flush.picker_schedule_interval,
diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index a3c830005013..3f009b667e37 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -22,6 +22,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Instant;

+use common_base::readable_size::ReadableSize;
 use common_telemetry::{debug, error};
 pub use picker::CompactionPickerRef;
 use snafu::ResultExt;
@@ -30,6 +31,7 @@ use tokio::sync::mpsc::{self, Sender};

 use crate::access_layer::AccessLayerRef;
 use crate::compaction::twcs::TwcsPicker;
+use crate::config::MitoConfig;
 use crate::error::{
     CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
 };
@@ -51,6 +53,8 @@ pub struct CompactionRequest {
     pub(crate) file_purger: FilePurgerRef,
     /// Start time of compaction task.
     pub(crate) start_time: Instant,
+    /// Buffering threshold while writing SST files.
+    pub(crate) sst_write_buffer_size: ReadableSize,
 }

 impl CompactionRequest {
@@ -103,6 +107,7 @@ impl CompactionScheduler {
         access_layer: &AccessLayerRef,
         file_purger: &FilePurgerRef,
         waiter: OptionOutputTx,
+        engine_config: Arc<MitoConfig>,
     ) -> Result<()> {
         if let Some(status) = self.region_status.get_mut(&region_id) {
             // Region is compacting. Add the waiter to pending list.
@@ -117,19 +122,27 @@ impl CompactionScheduler {
             access_layer.clone(),
             file_purger.clone(),
         );
-        let request = status.new_compaction_request(self.request_sender.clone(), waiter);
+        let request =
+            status.new_compaction_request(self.request_sender.clone(), waiter, engine_config);
         self.region_status.insert(region_id, status);
         self.schedule_compaction_request(request)
     }

     /// Notifies the scheduler that the compaction job is finished successfully.
-    pub(crate) fn on_compaction_finished(&mut self, region_id: RegionId) {
+    pub(crate) fn on_compaction_finished(
+        &mut self,
+        region_id: RegionId,
+        engine_config: Arc<MitoConfig>,
+    ) {
         let Some(status) = self.region_status.get_mut(&region_id) else {
             return;
         };
         // We should always try to compact the region until picker returns None.
-        let request =
-            status.new_compaction_request(self.request_sender.clone(), OptionOutputTx::none());
+        let request = status.new_compaction_request(
+            self.request_sender.clone(),
+            OptionOutputTx::none(),
+            engine_config,
+        );
         // Try to schedule next compaction task for this region.
         if let Err(e) = self.schedule_compaction_request(request) {
             error!(e; "Failed to schedule next compaction for region {}", region_id);
@@ -138,7 +151,7 @@ impl CompactionScheduler {

     /// Notifies the scheduler that the compaction job is failed.
     pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
-        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);
+        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
         // Remove this region.
         let Some(status) = self.region_status.remove(&region_id) else {
             return;
@@ -236,7 +249,7 @@ impl PendingCompaction {
         }
     }

-    /// Send flush error to waiter.
+    /// Send compaction error to waiter.
     fn on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
         for waiter in self.waiters.drain(..) {
             waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id }));
@@ -300,6 +313,7 @@ impl CompactionStatus {
         &mut self,
         request_sender: Sender<WorkerRequest>,
         waiter: OptionOutputTx,
+        engine_config: Arc<MitoConfig>,
     ) -> CompactionRequest {
         let current_version = self.version_control.current().version;
         let start_time = Instant::now();
@@ -310,6 +324,7 @@ impl CompactionStatus {
             waiters: Vec::new(),
             file_purger: self.file_purger.clone(),
             start_time,
+            sst_write_buffer_size: engine_config.sst_write_buffer_size,
         };

         if let Some(pending) = self.pending_compaction.take() {
@@ -352,6 +367,7 @@ mod tests {
                 &env.access_layer,
                 &purger,
                 waiter,
+                Arc::new(MitoConfig::default()),
             )
             .unwrap();
         let output = output_rx.await.unwrap().unwrap();
@@ -369,6 +385,7 @@ mod tests {
                 &env.access_layer,
                 &purger,
                 waiter,
+                Arc::new(MitoConfig::default()),
             )
             .unwrap();
         let output = output_rx.await.unwrap().unwrap();
@@ -427,6 +444,7 @@ mod tests {
                 &env.access_layer,
                 &purger,
                 OptionOutputTx::none(),
+                Arc::new(MitoConfig::default()),
             )
             .unwrap();
         // Should schedule 1 compaction.
@@ -454,6 +472,7 @@ mod tests {
                 &env.access_layer,
                 &purger,
                 OptionOutputTx::none(),
+                Arc::new(MitoConfig::default()),
             )
             .unwrap();
         assert_eq!(1, scheduler.region_status.len());
@@ -466,7 +485,7 @@ mod tests {
             .is_some());

         // On compaction finished and schedule next compaction.
-        scheduler.on_compaction_finished(region_id);
+        scheduler.on_compaction_finished(region_id, Arc::new(MitoConfig::default()));
         assert_eq!(1, scheduler.region_status.len());
         assert_eq!(2, job_scheduler.num_jobs());
         // 5 files for next compaction.
@@ -484,6 +503,7 @@ mod tests {
                 &env.access_layer,
                 &purger,
                 OptionOutputTx::none(),
+                Arc::new(MitoConfig::default()),
             )
             .unwrap();
         assert_eq!(2, job_scheduler.num_jobs());
diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs
index 92df30e92f8c..e3bae3acfb4a 100644
--- a/src/mito2/src/compaction/twcs.rs
+++ b/src/mito2/src/compaction/twcs.rs
@@ -120,6 +120,7 @@ impl Picker for TwcsPicker {
             waiters,
             file_purger,
             start_time,
+            sst_write_buffer_size,
         } = req;

         let region_metadata = current_version.metadata.clone();
@@ -167,7 +168,7 @@ impl Picker for TwcsPicker {
             sst_layer: access_layer,
             outputs,
             expired_ssts,
-            sst_write_buffer_size: ReadableSize::mb(4),
+            sst_write_buffer_size,
             compaction_time_window: Some(time_window_size),
             request_sender,
             waiters,
@@ -355,7 +356,7 @@ impl CompactionTask for TwcsCompactionTask {
             Ok((added, deleted)) => {
                 info!(
                     "Compacted SST files, input: {:?}, output: {:?}, window: {:?}",
-                    added, deleted, self.compaction_time_window
+                    deleted, added, self.compaction_time_window
                 );

                 BackgroundNotify::CompactionFinished(CompactionFinished {
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index 09b33a886bde..5350c2c8e1a0 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -26,6 +26,8 @@ const DEFAULT_NUM_WORKERS: usize = 1;
 /// Default max running background job.
 const DEFAULT_MAX_BG_JOB: usize = 4;
+
+const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
 /// Configuration for [MitoEngine](crate::engine::MitoEngine).
 #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
 #[serde(default)]
 pub struct MitoConfig {
@@ -63,6 +65,8 @@ pub struct MitoConfig {
     pub sst_meta_cache_size: ReadableSize,
     /// Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 to disable the cache.
     pub vector_cache_size: ReadableSize,
+    /// Buffer size for SST writing.
+    pub sst_write_buffer_size: ReadableSize,
 }

 impl Default for MitoConfig {
@@ -79,6 +83,7 @@ impl Default for MitoConfig {
             global_write_buffer_reject_size: ReadableSize::gb(2),
             sst_meta_cache_size: ReadableSize::mb(128),
             vector_cache_size: ReadableSize::mb(512),
+            sst_write_buffer_size: ReadableSize::mb(8),
         }
     }
 }
@@ -117,5 +122,13 @@ impl MitoConfig {
                 self.global_write_buffer_reject_size
             );
         }
+
+        if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
+            self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
+            warn!(
+                "Sanitize sst write buffer size to {}",
+                self.sst_write_buffer_size
+            );
+        }
     }
 }
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index 8bbbc6c94c54..504123bfd9a1 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -26,6 +26,7 @@ use strum::IntoStaticStr;
 use tokio::sync::mpsc;

 use crate::access_layer::AccessLayerRef;
+use crate::config::MitoConfig;
 use crate::error::{
     Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
 };
@@ -198,6 +199,7 @@ pub(crate) struct RegionFlushTask {
     pub(crate) memtable_builder: MemtableBuilderRef,
     pub(crate) file_purger: FilePurgerRef,
     pub(crate) listener: WorkerListener,
+    pub(crate) engine_config: Arc<MitoConfig>,
     pub(crate) row_group_size: Option<usize>,
 }

@@ -289,8 +291,10 @@ impl RegionFlushTask {
             .with_label_values(&["flush_memtables"])
             .start_timer();

-        // TODO(yingwen): Make it configurable.
-        let mut write_opts = WriteOptions::default();
+        let mut write_opts = WriteOptions {
+            write_buffer_size: self.engine_config.sst_write_buffer_size,
+            ..Default::default()
+        };
         if let Some(row_group_size) = self.row_group_size {
             write_opts.row_group_size = row_group_size;
         }
@@ -723,6 +727,7 @@ mod tests {
             memtable_builder: builder.memtable_builder(),
             file_purger: builder.file_purger(),
             listener: WorkerListener::default(),
+            engine_config: Arc::new(MitoConfig::default()),
             row_group_size: None,
         };
         task.push_sender(OptionOutputTx::from(output_tx));
diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs
index 0231217efb0d..693235e90f9f 100644
--- a/src/mito2/src/worker/handle_alter.rs
+++ b/src/mito2/src/worker/handle_alter.rs
@@ -80,7 +80,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         info!("Flush region: {} before alteration", region_id);

         // Try to submit a flush task.
-        let task = self.new_flush_task(&region, FlushReason::Alter, None);
+        let task = self.new_flush_task(&region, FlushReason::Alter, None, self.config.clone());
         if let Err(e) = self
             .flush_scheduler
             .schedule_flush(region.region_id, &region.version_control, task)
diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs
index 58fd714c30d6..c00eaa865417 100644
--- a/src/mito2/src/worker/handle_compaction.rs
+++ b/src/mito2/src/worker/handle_compaction.rs
@@ -38,6 +38,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             &region.access_layer,
             &region.file_purger,
             sender,
+            self.config.clone(),
         ) {
             error!(e; "Failed to schedule compaction task for region: {}", region_id);
         } else {
@@ -86,8 +87,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         }
         // compaction finished.
         request.on_success();
+
         // Schedule next compaction if necessary.
-        self.compaction_scheduler.on_compaction_finished(region_id);
+        self.compaction_scheduler
+            .on_compaction_finished(region_id, self.config.clone());
     }

     /// When compaction fails, we simply log the error.
diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs
index 24ffce1ce21c..8585adf31fae 100644
--- a/src/mito2/src/worker/handle_flush.rs
+++ b/src/mito2/src/worker/handle_flush.rs
@@ -14,12 +14,15 @@

 //! Handling flush related requests.

+use std::sync::Arc;
+
 use common_telemetry::{error, info, warn};
 use common_time::util::current_time_millis;
 use store_api::logstore::LogStore;
 use store_api::region_request::RegionFlushRequest;
 use store_api::storage::RegionId;

+use crate::config::MitoConfig;
 use crate::error::{RegionTruncatedSnafu, Result};
 use crate::flush::{FlushReason, RegionFlushTask};
 use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
@@ -39,7 +42,12 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             return;
         };
-        let mut task = self.new_flush_task(&region, FlushReason::Manual, request.row_group_size);
+        let mut task = self.new_flush_task(
+            &region,
+            FlushReason::Manual,
+            request.row_group_size,
+            self.config.clone(),
+        );
         task.push_sender(sender);
         if let Err(e) = self
             .flush_scheduler
             .schedule_flush(region.region_id, &region.version_control, task)
@@ -94,7 +102,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             if region.last_flush_millis() < min_last_flush_time {
                 // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
-                let task = self.new_flush_task(region, FlushReason::EngineFull, None);
+                let task =
+                    self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
                 self.flush_scheduler.schedule_flush(
                     region.region_id,
                     &region.version_control,
@@ -107,7 +116,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         // TODO(yingwen): Maybe flush more tables to reduce write buffer size.
         if let Some(region) = max_mem_region {
             if !self.flush_scheduler.is_flush_requested(region.region_id) {
-                let task = self.new_flush_task(region, FlushReason::EngineFull, None);
+                let task =
+                    self.new_flush_task(region, FlushReason::EngineFull, None, self.config.clone());
                 self.flush_scheduler.schedule_flush(
                     region.region_id,
                     &region.version_control,
@@ -125,6 +135,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         region: &MitoRegionRef,
         reason: FlushReason,
         row_group_size: Option<usize>,
+        engine_config: Arc<MitoConfig>,
     ) -> RegionFlushTask {
         // TODO(yingwen): metrics for flush requested.
         RegionFlushTask {
@@ -136,6 +147,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             memtable_builder: self.memtable_builder.clone(),
             file_purger: region.file_purger.clone(),
             listener: self.listener.clone(),
+            engine_config,
             row_group_size,
         }
     }
@@ -220,6 +232,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                 &region.access_layer,
                 &region.file_purger,
                 OptionOutputTx::none(),
+                self.config.clone(),
             ) {
                 warn!(
                     "Failed to schedule compaction after flush, region: {}, err: {}",
diff --git a/src/storage/src/config.rs b/src/storage/src/config.rs
index 6fa4b84cfcfc..73d75dd24594 100644
--- a/src/storage/src/config.rs
+++ b/src/storage/src/config.rs
@@ -34,7 +34,6 @@ pub struct EngineConfig {
     pub manifest_gc_duration: Option<Duration>,
     pub max_files_in_l0: usize,
     pub max_purge_tasks: usize,
-    pub sst_write_buffer_size: ReadableSize,
     /// Max inflight flush tasks.
     pub max_flush_tasks: usize,
     /// Default write buffer size for a region.
@@ -59,7 +58,6 @@ impl Default for EngineConfig {
             manifest_gc_duration: Some(Duration::from_secs(30)),
             max_files_in_l0: 8,
             max_purge_tasks: 32,
-            sst_write_buffer_size: ReadableSize::mb(8),
             max_flush_tasks: DEFAULT_MAX_FLUSH_TASKS,
             region_write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE,
             picker_schedule_interval: Duration::from_millis(
diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs
index 44f60624721c..9aa82c4154e0 100644
--- a/src/storage/src/flush.rs
+++ b/src/storage/src/flush.rs
@@ -18,6 +18,7 @@ mod scheduler;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;

+use common_base::readable_size::ReadableSize;
 use common_telemetry::logging;
 pub use picker::{FlushPicker, PickerConfig};
 pub use scheduler::{
@@ -269,7 +270,7 @@ impl FlushJob {
             let iter = m.iter(iter_ctx.clone())?;
             let sst_layer = self.sst_layer.clone();
             let write_options = WriteOptions {
-                sst_write_buffer_size: self.engine_config.sst_write_buffer_size,
+                sst_write_buffer_size: ReadableSize::mb(8), // deprecated usage
             };
             futures.push(async move {
                 Ok(sst_layer
diff --git a/src/storage/src/flush/scheduler.rs b/src/storage/src/flush/scheduler.rs
index a6fa575ce93b..8d03ed6af68e 100644
--- a/src/storage/src/flush/scheduler.rs
+++ b/src/storage/src/flush/scheduler.rs
@@ -16,6 +16,7 @@ use std::sync::Arc;
 use std::time::Duration;

 use async_trait::async_trait;
+use common_base::readable_size::ReadableSize;
 use common_runtime::{RepeatedTask, TaskFunction};
 use common_telemetry::logging;
 use snafu::{ensure, ResultExt};
@@ -147,7 +148,7 @@ impl<S: LogStore> From<&FlushRegionRequest<S>> for CompactionRequestImpl<S> {
             compaction_time_window: req.compaction_time_window,
             sender: None,
             picker: req.compaction_picker.clone(),
-            sst_write_buffer_size: req.engine_config.sst_write_buffer_size,
+            sst_write_buffer_size: ReadableSize::mb(8), // deprecated usage
             // compaction triggered by flush always reschedules
             reschedule_on_finish: true,
         }
diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs
index edd2e4cc462f..a14ada258898 100644
--- a/src/storage/src/region/writer.rs
+++ b/src/storage/src/region/writer.rs
@@ -374,7 +374,7 @@ where
         let mut inner = self.inner.lock().await;
         ensure!(!inner.is_closed(), error::ClosedRegionSnafu);

-        let sst_write_buffer_size = inner.engine_config.sst_write_buffer_size;
+        let sst_write_buffer_size = ReadableSize::mb(8); // deprecated usage

         inner
             .manual_compact(
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index cb697a7661a2..e9af72ef9e70 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -706,7 +706,6 @@ type = "{}"
 max_inflight_tasks = 4
 max_files_in_level0 = 8
 max_purge_tasks = 32
-sst_write_buffer_size = "8MiB"

 [datanode.storage.manifest]
 checkpoint_margin = 10
@@ -733,6 +732,7 @@ global_write_buffer_size = "1GiB"
 global_write_buffer_reject_size = "2GiB"
 sst_meta_cache_size = "128MiB"
 vector_cache_size = "512MiB"
+sst_write_buffer_size = "8MiB"

 [[datanode.region_engine]]
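
The patch moves `sst_write_buffer_size` from the old storage engine's compaction config into mito2's `MitoConfig`, and the new sanitize check in src/mito2/src/config.rs clamps it to the 5MB multipart-upload minimum so SST uploads to object storage never use a part size below what the store allows. A minimal sketch of the intended behavior is below; it assumes `MitoConfig` and its sanitize method are publicly reachable as `mito2::config::MitoConfig::sanitize` (the method name is not shown in the hunk above and is an assumption), and the assertions are illustrative rather than part of the patch.

```rust
use common_base::readable_size::ReadableSize;
use mito2::config::MitoConfig;

fn main() {
    // The new field defaults to 8MB, matching the default removed from the
    // datanode CompactionConfig above.
    let mut config = MitoConfig::default();
    assert_eq!(ReadableSize::mb(8), config.sst_write_buffer_size);

    // A value below MULTIPART_UPLOAD_MINIMUM_SIZE (5MB) is bumped back up by
    // the sanitize step added in src/mito2/src/config.rs; `sanitize` is the
    // assumed name of the method that hunk extends.
    config.sst_write_buffer_size = ReadableSize::mb(1);
    config.sanitize();
    assert_eq!(ReadableSize::mb(5), config.sst_write_buffer_size);
}
```

At flush and compaction time the value flows from `MitoConfig` into `WriteOptions::write_buffer_size` (flush) and `CompactionRequest::sst_write_buffer_size` (compaction), replacing the previously hard-coded 4MB in the TWCS picker.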