Skip to content

Commit

Permalink
chore: decrease mutable write buffer limit (#2390)
Browse files Browse the repository at this point in the history
* chore: set mutable limit to half of the global write buffer size

* refactor: put handle_flush_finished after handle_flush_request

* refactor: rename tests.rs to basic_test.rs

* style: fmt code
  • Loading branch information
evenyag authored Sep 14, 2023
1 parent 47bf300 commit 26992d5
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 79 deletions.
4 changes: 2 additions & 2 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#[cfg(test)]
mod alter_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod close_test;
#[cfg(test)]
mod compaction_test;
Expand All @@ -32,8 +34,6 @@ pub(crate) mod listener;
mod open_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod tests;

use std::sync::Arc;

Expand Down
File renamed without changes.
21 changes: 11 additions & 10 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ impl WriteBufferManagerImpl {

/// Returns the size limit for mutable memtables.
fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
global_write_buffer_size * 7 / 8
// Reserves half of the write buffer for mutable memtable.
global_write_buffer_size / 2
}
}

Expand Down Expand Up @@ -591,17 +592,17 @@ mod tests {

#[test]
fn test_get_mutable_limit() {
assert_eq!(7, WriteBufferManagerImpl::get_mutable_limit(8));
assert_eq!(8, WriteBufferManagerImpl::get_mutable_limit(10));
assert_eq!(56, WriteBufferManagerImpl::get_mutable_limit(64));
assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
}

#[test]
fn test_over_mutable_limit() {
// Mutable limit is 800.
// Mutable limit is 500.
let manager = WriteBufferManagerImpl::new(1000);
manager.reserve_mem(500);
manager.reserve_mem(400);
assert!(!manager.should_flush_engine());
assert!(!manager.should_stall());

Expand All @@ -610,20 +611,20 @@ mod tests {
assert!(manager.should_flush_engine());

// Freezes mutable.
manager.schedule_free_mem(500);
manager.schedule_free_mem(400);
assert!(!manager.should_flush_engine());
assert_eq!(900, manager.memory_used.load(Ordering::Relaxed));
assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

// Releases immutable.
manager.free_mem(500);
manager.free_mem(400);
assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
}

#[test]
fn test_over_global() {
// Mutable limit is 800.
// Mutable limit is 500.
let manager = WriteBufferManagerImpl::new(1000);
manager.reserve_mem(1100);
assert!(manager.should_stall());
Expand Down
134 changes: 67 additions & 67 deletions src/mito2/src/worker/handle_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,73 +26,6 @@ use crate::region::MitoRegionRef;
use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
/// On region flush job finished.
pub(crate) async fn handle_flush_finished(
&mut self,
region_id: RegionId,
mut request: FlushFinished,
) {
let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
return;
};

// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: std::mem::take(&mut request.file_metas),
files_to_remove: Vec::new(),
compaction_time_window: None,
flushed_entry_id: Some(request.flushed_entry_id),
flushed_sequence: Some(request.flushed_sequence),
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
if let Err(e) = region.manifest_manager.update(action_list).await {
error!(e; "Failed to write manifest, region: {}", region_id);
request.on_failure(e);
return;
}

// Apply edit to region's version.
region.version_control.apply_edit(
edit,
&request.memtables_to_remove,
region.file_purger.clone(),
);
region.update_flush_millis();

// Delete wal.
info!(
"Region {} flush finished, tries to bump wal to {}",
region_id, request.flushed_entry_id
);
if let Err(e) = self.wal.obsolete(region_id, request.flushed_entry_id).await {
error!(e; "Failed to write wal, region: {}", region_id);
request.on_failure(e);
return;
}

// Notifies waiters.
request.on_success();

// Handle pending requests for the region.
if let Some((ddl_requests, write_requests)) =
self.flush_scheduler.on_flush_success(region_id)
{
// Perform DDLs first because they require empty memtables.
self.handle_ddl_requests(ddl_requests).await;
// Handle pending write requests, we don't stall these requests.
self.handle_write_requests(write_requests, false).await;
}

// Handle stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
// We already stalled these requests, don't stall them again.
self.handle_write_requests(stalled.requests, false).await;

self.listener.on_flush_success(region_id);
}
}

impl<S> RegionWorkerLoop<S> {
/// Handles manual flush request.
pub(crate) async fn handle_flush_request(
Expand Down Expand Up @@ -191,3 +124,70 @@ impl<S> RegionWorkerLoop<S> {
}
}
}

impl<S: LogStore> RegionWorkerLoop<S> {
/// On region flush job finished.
pub(crate) async fn handle_flush_finished(
&mut self,
region_id: RegionId,
mut request: FlushFinished,
) {
let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
return;
};

// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: std::mem::take(&mut request.file_metas),
files_to_remove: Vec::new(),
compaction_time_window: None,
flushed_entry_id: Some(request.flushed_entry_id),
flushed_sequence: Some(request.flushed_sequence),
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
if let Err(e) = region.manifest_manager.update(action_list).await {
error!(e; "Failed to write manifest, region: {}", region_id);
request.on_failure(e);
return;
}

// Apply edit to region's version.
region.version_control.apply_edit(
edit,
&request.memtables_to_remove,
region.file_purger.clone(),
);
region.update_flush_millis();

// Delete wal.
info!(
"Region {} flush finished, tries to bump wal to {}",
region_id, request.flushed_entry_id
);
if let Err(e) = self.wal.obsolete(region_id, request.flushed_entry_id).await {
error!(e; "Failed to write wal, region: {}", region_id);
request.on_failure(e);
return;
}

// Notifies waiters.
request.on_success();

// Handle pending requests for the region.
if let Some((ddl_requests, write_requests)) =
self.flush_scheduler.on_flush_success(region_id)
{
// Perform DDLs first because they require empty memtables.
self.handle_ddl_requests(ddl_requests).await;
// Handle pending write requests, we don't stall these requests.
self.handle_write_requests(write_requests, false).await;
}

// Handle stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
// We already stalled these requests, don't stall them again.
self.handle_write_requests(stalled.requests, false).await;

self.listener.on_flush_success(region_id);
}
}

0 comments on commit 26992d5

Please sign in to comment.