Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: decrease mutable write buffer limit #2390

Merged
merged 4 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#[cfg(test)]
mod alter_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod close_test;
#[cfg(test)]
mod compaction_test;
Expand All @@ -32,8 +34,6 @@ pub(crate) mod listener;
mod open_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod tests;

use std::sync::Arc;

Expand Down
File renamed without changes.
21 changes: 11 additions & 10 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ impl WriteBufferManagerImpl {

/// Returns the size limit for mutable memtables.
fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
global_write_buffer_size * 7 / 8
// Reserves half of the write buffer for mutable memtable.
global_write_buffer_size / 2
}
}

Expand Down Expand Up @@ -591,17 +592,17 @@ mod tests {

#[test]
fn test_get_mutable_limit() {
assert_eq!(7, WriteBufferManagerImpl::get_mutable_limit(8));
assert_eq!(8, WriteBufferManagerImpl::get_mutable_limit(10));
assert_eq!(56, WriteBufferManagerImpl::get_mutable_limit(64));
assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
}

#[test]
fn test_over_mutable_limit() {
// Mutable limit is 800.
// Mutable limit is 500.
let manager = WriteBufferManagerImpl::new(1000);
manager.reserve_mem(500);
manager.reserve_mem(400);
assert!(!manager.should_flush_engine());
assert!(!manager.should_stall());

Expand All @@ -610,20 +611,20 @@ mod tests {
assert!(manager.should_flush_engine());

// Freezes mutable.
manager.schedule_free_mem(500);
manager.schedule_free_mem(400);
assert!(!manager.should_flush_engine());
assert_eq!(900, manager.memory_used.load(Ordering::Relaxed));
assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

// Releases immutable.
manager.free_mem(500);
manager.free_mem(400);
assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
}

#[test]
fn test_over_global() {
// Mutable limit is 800.
// Mutable limit is 500.
let manager = WriteBufferManagerImpl::new(1000);
manager.reserve_mem(1100);
assert!(manager.should_stall());
Expand Down
134 changes: 67 additions & 67 deletions src/mito2/src/worker/handle_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,73 +26,6 @@ use crate::region::MitoRegionRef;
use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
/// On region flush job finished.
pub(crate) async fn handle_flush_finished(
&mut self,
region_id: RegionId,
mut request: FlushFinished,
) {
let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
return;
};

// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: std::mem::take(&mut request.file_metas),
files_to_remove: Vec::new(),
compaction_time_window: None,
flushed_entry_id: Some(request.flushed_entry_id),
flushed_sequence: Some(request.flushed_sequence),
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
if let Err(e) = region.manifest_manager.update(action_list).await {
error!(e; "Failed to write manifest, region: {}", region_id);
request.on_failure(e);
return;
}

// Apply edit to region's version.
region.version_control.apply_edit(
edit,
&request.memtables_to_remove,
region.file_purger.clone(),
);
region.update_flush_millis();

// Delete wal.
info!(
"Region {} flush finished, tries to bump wal to {}",
region_id, request.flushed_entry_id
);
if let Err(e) = self.wal.obsolete(region_id, request.flushed_entry_id).await {
error!(e; "Failed to write wal, region: {}", region_id);
request.on_failure(e);
return;
}

// Notifies waiters.
request.on_success();

// Handle pending requests for the region.
if let Some((ddl_requests, write_requests)) =
self.flush_scheduler.on_flush_success(region_id)
{
// Perform DDLs first because they require empty memtables.
self.handle_ddl_requests(ddl_requests).await;
// Handle pending write requests, we don't stall these requests.
self.handle_write_requests(write_requests, false).await;
}

// Handle stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
// We already stalled these requests, don't stall them again.
self.handle_write_requests(stalled.requests, false).await;

self.listener.on_flush_success(region_id);
}
}

impl<S> RegionWorkerLoop<S> {
/// Handles manual flush request.
pub(crate) async fn handle_flush_request(
Expand Down Expand Up @@ -191,3 +124,70 @@ impl<S> RegionWorkerLoop<S> {
}
}
}

impl<S: LogStore> RegionWorkerLoop<S> {
/// On region flush job finished.
pub(crate) async fn handle_flush_finished(
&mut self,
region_id: RegionId,
mut request: FlushFinished,
) {
let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
return;
};

// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: std::mem::take(&mut request.file_metas),
files_to_remove: Vec::new(),
compaction_time_window: None,
flushed_entry_id: Some(request.flushed_entry_id),
flushed_sequence: Some(request.flushed_sequence),
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
if let Err(e) = region.manifest_manager.update(action_list).await {
error!(e; "Failed to write manifest, region: {}", region_id);
request.on_failure(e);
return;
}

// Apply edit to region's version.
region.version_control.apply_edit(
edit,
&request.memtables_to_remove,
region.file_purger.clone(),
);
region.update_flush_millis();

// Delete wal.
info!(
"Region {} flush finished, tries to bump wal to {}",
region_id, request.flushed_entry_id
);
if let Err(e) = self.wal.obsolete(region_id, request.flushed_entry_id).await {
error!(e; "Failed to write wal, region: {}", region_id);
request.on_failure(e);
return;
}

// Notifies waiters.
request.on_success();

// Handle pending requests for the region.
if let Some((ddl_requests, write_requests)) =
self.flush_scheduler.on_flush_success(region_id)
{
// Perform DDLs first because they require empty memtables.
self.handle_ddl_requests(ddl_requests).await;
// Handle pending write requests, we don't stall these requests.
self.handle_write_requests(write_requests, false).await;
}

// Handle stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
// We already stalled these requests, don't stall them again.
self.handle_write_requests(stalled.requests, false).await;

self.listener.on_flush_success(region_id);
}
}