Commit

fix: unify all sst_write_buffer_size usage (#2712)
* fix: unify all sst_write_buffer_size usage

* fix: some CR comments

* fix: logs
v0y4g3r authored Nov 9, 2023
1 parent 0cd6dac commit b53537e
Showing 15 changed files with 80 additions and 28 deletions.
3 changes: 3 additions & 0 deletions config/datanode.example.toml
@@ -105,6 +105,9 @@ global_write_buffer_reject_size = "2GB"
sst_meta_cache_size = "128MB"
# Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 to disable the cache.
vector_cache_size = "512MB"
# Buffer size for SST writing.
sst_write_buffer_size = "8MB"


# Log options
# [logging]
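For orientation, the block below sketches how the relevant part of config/datanode.example.toml reads after this change. It is assembled from the hunk above rather than quoted from the full file, and the 5MB lower bound mentioned in the comment comes from the sanitizer added in src/mito2/src/config.rs later in this diff.

# Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 to disable the cache.
vector_cache_size = "512MB"
# Buffer size for SST writing (default 8MB). Values below 5MB are raised to 5MB
# when the config is loaded, so object-store multipart uploads remain valid.
sst_write_buffer_size = "8MB"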
2 changes: 0 additions & 2 deletions src/cmd/src/datanode.rs
@@ -191,7 +191,6 @@ mod tests {
use std::io::Write;
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{CompactionConfig, FileConfig, ObjectStoreConfig, RegionManifestConfig};
use servers::heartbeat_options::HeartbeatOptions;
@@ -300,7 +299,6 @@ mod tests {
max_inflight_tasks: 3,
max_files_in_level0: 7,
max_purge_tasks: 32,
sst_write_buffer_size: ReadableSize::mb(8),
},
options.storage.compaction,
);
4 changes: 0 additions & 4 deletions src/datanode/src/config.rs
@@ -250,8 +250,6 @@ pub struct CompactionConfig {
pub max_files_in_level0: usize,
/// Max task number for SST purge task after compaction.
pub max_purge_tasks: usize,
/// Buffer threshold while writing SST files
pub sst_write_buffer_size: ReadableSize,
}

impl Default for CompactionConfig {
@@ -260,7 +258,6 @@ impl Default for CompactionConfig {
max_inflight_tasks: 4,
max_files_in_level0: 8,
max_purge_tasks: 32,
sst_write_buffer_size: ReadableSize::mb(8),
}
}
}
@@ -312,7 +309,6 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
manifest_gc_duration: value.storage.manifest.gc_duration,
max_files_in_l0: value.storage.compaction.max_files_in_level0,
max_purge_tasks: value.storage.compaction.max_purge_tasks,
sst_write_buffer_size: value.storage.compaction.sst_write_buffer_size,
max_flush_tasks: value.storage.flush.max_flush_tasks,
region_write_buffer_size: value.storage.flush.region_write_buffer_size,
picker_schedule_interval: value.storage.flush.picker_schedule_interval,
34 changes: 27 additions & 7 deletions src/mito2/src/compaction.rs
@@ -22,6 +22,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, error};
pub use picker::CompactionPickerRef;
use snafu::ResultExt;
@@ -30,6 +31,7 @@ use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::compaction::twcs::TwcsPicker;
use crate::config::MitoConfig;
use crate::error::{
CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
@@ -51,6 +53,8 @@ pub struct CompactionRequest {
pub(crate) file_purger: FilePurgerRef,
/// Start time of compaction task.
pub(crate) start_time: Instant,
/// Buffering threshold while writing SST files.
pub(crate) sst_write_buffer_size: ReadableSize,
}

impl CompactionRequest {
@@ -103,6 +107,7 @@ impl CompactionScheduler {
access_layer: &AccessLayerRef,
file_purger: &FilePurgerRef,
waiter: OptionOutputTx,
engine_config: Arc<MitoConfig>,
) -> Result<()> {
if let Some(status) = self.region_status.get_mut(&region_id) {
// Region is compacting. Add the waiter to pending list.
@@ -117,19 +122,27 @@
access_layer.clone(),
file_purger.clone(),
);
let request = status.new_compaction_request(self.request_sender.clone(), waiter);
let request =
status.new_compaction_request(self.request_sender.clone(), waiter, engine_config);
self.region_status.insert(region_id, status);
self.schedule_compaction_request(request)
}

/// Notifies the scheduler that the compaction job is finished successfully.
pub(crate) fn on_compaction_finished(&mut self, region_id: RegionId) {
pub(crate) fn on_compaction_finished(
&mut self,
region_id: RegionId,
engine_config: Arc<MitoConfig>,
) {
let Some(status) = self.region_status.get_mut(&region_id) else {
return;
};
// We should always try to compact the region until picker returns None.
let request =
status.new_compaction_request(self.request_sender.clone(), OptionOutputTx::none());
let request = status.new_compaction_request(
self.request_sender.clone(),
OptionOutputTx::none(),
engine_config,
);
// Try to schedule next compaction task for this region.
if let Err(e) = self.schedule_compaction_request(request) {
error!(e; "Failed to schedule next compaction for region {}", region_id);
@@ -138,7 +151,7 @@ impl CompactionScheduler {

/// Notifies the scheduler that the compaction job is failed.
pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);
error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
// Remove this region.
let Some(status) = self.region_status.remove(&region_id) else {
return;
@@ -236,7 +249,7 @@ impl PendingCompaction {
}
}

/// Send flush error to waiter.
/// Send compaction error to waiter.
fn on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
for waiter in self.waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id }));
@@ -300,6 +313,7 @@ impl CompactionStatus {
&mut self,
request_sender: Sender<WorkerRequest>,
waiter: OptionOutputTx,
engine_config: Arc<MitoConfig>,
) -> CompactionRequest {
let current_version = self.version_control.current().version;
let start_time = Instant::now();
@@ -310,6 +324,7 @@
waiters: Vec::new(),
file_purger: self.file_purger.clone(),
start_time,
sst_write_buffer_size: engine_config.sst_write_buffer_size,
};

if let Some(pending) = self.pending_compaction.take() {
@@ -352,6 +367,7 @@ mod tests {
&env.access_layer,
&purger,
waiter,
Arc::new(MitoConfig::default()),
)
.unwrap();
let output = output_rx.await.unwrap().unwrap();
@@ -369,6 +385,7 @@
&env.access_layer,
&purger,
waiter,
Arc::new(MitoConfig::default()),
)
.unwrap();
let output = output_rx.await.unwrap().unwrap();
@@ -427,6 +444,7 @@
&env.access_layer,
&purger,
OptionOutputTx::none(),
Arc::new(MitoConfig::default()),
)
.unwrap();
// Should schedule 1 compaction.
@@ -454,6 +472,7 @@
&env.access_layer,
&purger,
OptionOutputTx::none(),
Arc::new(MitoConfig::default()),
)
.unwrap();
assert_eq!(1, scheduler.region_status.len());
@@ -466,7 +485,7 @@
.is_some());

// On compaction finished and schedule next compaction.
scheduler.on_compaction_finished(region_id);
scheduler.on_compaction_finished(region_id, Arc::new(MitoConfig::default()));
assert_eq!(1, scheduler.region_status.len());
assert_eq!(2, job_scheduler.num_jobs());
// 5 files for next compaction.
@@ -484,6 +503,7 @@
&env.access_layer,
&purger,
OptionOutputTx::none(),
Arc::new(MitoConfig::default()),
)
.unwrap();
assert_eq!(2, job_scheduler.num_jobs());
5 changes: 3 additions & 2 deletions src/mito2/src/compaction/twcs.rs
@@ -120,6 +120,7 @@ impl Picker for TwcsPicker {
waiters,
file_purger,
start_time,
sst_write_buffer_size,
} = req;

let region_metadata = current_version.metadata.clone();
@@ -167,7 +168,7 @@
sst_layer: access_layer,
outputs,
expired_ssts,
sst_write_buffer_size: ReadableSize::mb(4),
sst_write_buffer_size,
compaction_time_window: Some(time_window_size),
request_sender,
waiters,
@@ -355,7 +356,7 @@ impl CompactionTask for TwcsCompactionTask {
Ok((added, deleted)) => {
info!(
"Compacted SST files, input: {:?}, output: {:?}, window: {:?}",
added, deleted, self.compaction_time_window
deleted, added, self.compaction_time_window
);

BackgroundNotify::CompactionFinished(CompactionFinished {
13 changes: 13 additions & 0 deletions src/mito2/src/config.rs
@@ -26,6 +26,8 @@ const DEFAULT_NUM_WORKERS: usize = 1;
/// Default max running background job.
const DEFAULT_MAX_BG_JOB: usize = 4;

const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);

/// Configuration for [MitoEngine](crate::engine::MitoEngine).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
@@ -63,6 +65,8 @@ pub struct MitoConfig {
pub sst_meta_cache_size: ReadableSize,
/// Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 to disable the cache.
pub vector_cache_size: ReadableSize,
/// Buffer size for SST writing.
pub sst_write_buffer_size: ReadableSize,
}

impl Default for MitoConfig {
@@ -79,6 +83,7 @@ impl Default for MitoConfig {
global_write_buffer_reject_size: ReadableSize::gb(2),
sst_meta_cache_size: ReadableSize::mb(128),
vector_cache_size: ReadableSize::mb(512),
sst_write_buffer_size: ReadableSize::mb(8),
}
}
}
@@ -117,5 +122,13 @@ impl MitoConfig {
self.global_write_buffer_reject_size
);
}

if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
warn!(
"Sanitize sst write buffer size to {}",
self.sst_write_buffer_size
);
}
}
}
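A minimal sketch of what the new clamp does, assuming the enclosing method in the @@ -117 hunk above is MitoConfig's sanitize routine and that MitoConfig and ReadableSize can be used like this from outside the crate (both type names come from the diff; the call site and crate path are assumptions):

use common_base::readable_size::ReadableSize;
use mito2::config::MitoConfig;

fn main() {
    // Start from the defaults added in this diff (sst_write_buffer_size = 8MB)
    // and deliberately configure a buffer below the 5MB multipart-upload minimum.
    let mut config = MitoConfig {
        sst_write_buffer_size: ReadableSize::mb(1),
        ..MitoConfig::default()
    };

    // Assumed entry point: the clamp lives in an existing `impl MitoConfig`
    // method (called `sanitize` here), which also logs a warning when it adjusts the value.
    config.sanitize();

    // The buffer is raised to MULTIPART_UPLOAD_MINIMUM_SIZE (5MB).
    assert_eq!(ReadableSize::mb(5), config.sst_write_buffer_size);
}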
9 changes: 7 additions & 2 deletions src/mito2/src/flush.rs
@@ -26,6 +26,7 @@ use strum::IntoStaticStr;
use tokio::sync::mpsc;

use crate::access_layer::AccessLayerRef;
use crate::config::MitoConfig;
use crate::error::{
Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
@@ -198,6 +199,7 @@ pub(crate) struct RegionFlushTask {
pub(crate) memtable_builder: MemtableBuilderRef,
pub(crate) file_purger: FilePurgerRef,
pub(crate) listener: WorkerListener,
pub(crate) engine_config: Arc<MitoConfig>,
pub(crate) row_group_size: Option<usize>,
}

@@ -289,8 +291,10 @@ impl RegionFlushTask {
.with_label_values(&["flush_memtables"])
.start_timer();

// TODO(yingwen): Make it configurable.
let mut write_opts = WriteOptions::default();
let mut write_opts = WriteOptions {
write_buffer_size: self.engine_config.sst_write_buffer_size,
..Default::default()
};
if let Some(row_group_size) = self.row_group_size {
write_opts.row_group_size = row_group_size;
}
@@ -723,6 +727,7 @@ mod tests {
memtable_builder: builder.memtable_builder(),
file_purger: builder.file_purger(),
listener: WorkerListener::default(),
engine_config: Arc::new(MitoConfig::default()),
row_group_size: None,
};
task.push_sender(OptionOutputTx::from(output_tx));
2 changes: 1 addition & 1 deletion src/mito2/src/worker/handle_alter.rs
@@ -80,7 +80,7 @@ impl<S> RegionWorkerLoop<S> {
info!("Flush region: {} before alteration", region_id);

// Try to submit a flush task.
let task = self.new_flush_task(&region, FlushReason::Alter, None);
let task = self.new_flush_task(&region, FlushReason::Alter, None, self.config.clone());
if let Err(e) =
self.flush_scheduler
.schedule_flush(region.region_id, &region.version_control, task)
5 changes: 4 additions & 1 deletion src/mito2/src/worker/handle_compaction.rs
@@ -38,6 +38,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
&region.access_layer,
&region.file_purger,
sender,
self.config.clone(),
) {
error!(e; "Failed to schedule compaction task for region: {}", region_id);
} else {
Expand Down Expand Up @@ -86,8 +87,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
// compaction finished.
request.on_success();

// Schedule next compaction if necessary.
self.compaction_scheduler.on_compaction_finished(region_id);
self.compaction_scheduler
.on_compaction_finished(region_id, self.config.clone());
}

/// When compaction fails, we simply log the error.