From 26992d58cd8ab67bcdc0b5c65a7f49d3cad80d7e Mon Sep 17 00:00:00 2001
From: Yingwen <realevenyag@gmail.com>
Date: Thu, 14 Sep 2023 16:24:14 +0800
Subject: [PATCH] chore: decrease mutable write buffer limit (#2390)

* chore: set mutable limit to half of the global write buffer size

* refactor: put handle_flush_finished after handle_flush_request

* refactor: rename tests.rs to basic_test.rs

* style: fmt code
---
 src/mito2/src/engine.rs                       |   4 +-
 .../src/engine/{tests.rs => basic_test.rs}    |   0
 src/mito2/src/flush.rs                        |  21 +--
 src/mito2/src/worker/handle_flush.rs          | 134 +++++++++---------
 4 files changed, 80 insertions(+), 79 deletions(-)
 rename src/mito2/src/engine/{tests.rs => basic_test.rs} (100%)

diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index 8454f70c4223..8200e1778b71 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -17,6 +17,8 @@
 #[cfg(test)]
 mod alter_test;
 #[cfg(test)]
+mod basic_test;
+#[cfg(test)]
 mod close_test;
 #[cfg(test)]
 mod compaction_test;
@@ -32,8 +34,6 @@ pub(crate) mod listener;
 mod open_test;
 #[cfg(test)]
 mod projection_test;
-#[cfg(test)]
-mod tests;
 
 use std::sync::Arc;
 
diff --git a/src/mito2/src/engine/tests.rs b/src/mito2/src/engine/basic_test.rs
similarity index 100%
rename from src/mito2/src/engine/tests.rs
rename to src/mito2/src/engine/basic_test.rs
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index cda4a1d3fca6..15dc80de0e14 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -101,7 +101,8 @@ impl WriteBufferManagerImpl {
 
     /// Returns the size limit for mutable memtables.
     fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
-        global_write_buffer_size * 7 / 8
+        // Reserves half of the write buffer for mutable memtable.
+        global_write_buffer_size / 2
     }
 }
 
@@ -591,17 +592,17 @@ mod tests {
 
     #[test]
     fn test_get_mutable_limit() {
-        assert_eq!(7, WriteBufferManagerImpl::get_mutable_limit(8));
-        assert_eq!(8, WriteBufferManagerImpl::get_mutable_limit(10));
-        assert_eq!(56, WriteBufferManagerImpl::get_mutable_limit(64));
+        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
+        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
+        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
         assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
     }
 
     #[test]
     fn test_over_mutable_limit() {
-        // Mutable limit is 800.
+        // Mutable limit is 500.
         let manager = WriteBufferManagerImpl::new(1000);
-        manager.reserve_mem(500);
+        manager.reserve_mem(400);
         assert!(!manager.should_flush_engine());
         assert!(!manager.should_stall());
 
@@ -610,20 +611,20 @@ mod tests {
         assert!(manager.should_flush_engine());
 
         // Freezes mutable.
-        manager.schedule_free_mem(500);
+        manager.schedule_free_mem(400);
         assert!(!manager.should_flush_engine());
-        assert_eq!(900, manager.memory_used.load(Ordering::Relaxed));
+        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
         assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
 
         // Releases immutable.
-        manager.free_mem(500);
+        manager.free_mem(400);
         assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
         assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
     }
 
     #[test]
     fn test_over_global() {
-        // Mutable limit is 800.
+        // Mutable limit is 500.
         let manager = WriteBufferManagerImpl::new(1000);
         manager.reserve_mem(1100);
         assert!(manager.should_stall());
diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs
index 3c6b1deabd27..36eaec042eb1 100644
--- a/src/mito2/src/worker/handle_flush.rs
+++ b/src/mito2/src/worker/handle_flush.rs
@@ -26,73 +26,6 @@ use crate::region::MitoRegionRef;
 use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
 use crate::worker::RegionWorkerLoop;
 
-impl<S: LogStore> RegionWorkerLoop<S> {
-    /// On region flush job finished.
-    pub(crate) async fn handle_flush_finished(
-        &mut self,
-        region_id: RegionId,
-        mut request: FlushFinished,
-    ) {
-        let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
-            return;
-        };
-
-        // Write region edit to manifest.
-        let edit = RegionEdit {
-            files_to_add: std::mem::take(&mut request.file_metas),
-            files_to_remove: Vec::new(),
-            compaction_time_window: None,
-            flushed_entry_id: Some(request.flushed_entry_id),
-            flushed_sequence: Some(request.flushed_sequence),
-        };
-        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
-        if let Err(e) = region.manifest_manager.update(action_list).await {
-            error!(e; "Failed to write manifest, region: {}", region_id);
-            request.on_failure(e);
-            return;
-        }
-
-        // Apply edit to region's version.
-        region.version_control.apply_edit(
-            edit,
-            &request.memtables_to_remove,
-            region.file_purger.clone(),
-        );
-        region.update_flush_millis();
-
-        // Delete wal.
-        info!(
-            "Region {} flush finished, tries to bump wal to {}",
-            region_id, request.flushed_entry_id
-        );
-        if let Err(e) = self.wal.obsolete(region_id, request.flushed_entry_id).await {
-            error!(e; "Failed to write wal, region: {}", region_id);
-            request.on_failure(e);
-            return;
-        }
-
-        // Notifies waiters.
-        request.on_success();
-
-        // Handle pending requests for the region.
-        if let Some((ddl_requests, write_requests)) =
-            self.flush_scheduler.on_flush_success(region_id)
-        {
-            // Perform DDLs first because they require empty memtables.
-            self.handle_ddl_requests(ddl_requests).await;
-            // Handle pending write requests, we don't stall these requests.
-            self.handle_write_requests(write_requests, false).await;
-        }
-
-        // Handle stalled requests.
-        let stalled = std::mem::take(&mut self.stalled_requests);
-        // We already stalled these requests, don't stall them again.
-        self.handle_write_requests(stalled.requests, false).await;
-
-        self.listener.on_flush_success(region_id);
-    }
-}
-
 impl<S> RegionWorkerLoop<S> {
     /// Handles manual flush request.
     pub(crate) async fn handle_flush_request(
@@ -191,3 +124,70 @@ impl<S> RegionWorkerLoop<S> {
         }
     }
 }
+
+impl<S: LogStore> RegionWorkerLoop<S> {
+    /// On region flush job finished.
+    pub(crate) async fn handle_flush_finished(
+        &mut self,
+        region_id: RegionId,
+        mut request: FlushFinished,
+    ) {
+        let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
+            return;
+        };
+
+        // Write region edit to manifest.
+        let edit = RegionEdit {
+            files_to_add: std::mem::take(&mut request.file_metas),
+            files_to_remove: Vec::new(),
+            compaction_time_window: None,
+            flushed_entry_id: Some(request.flushed_entry_id),
+            flushed_sequence: Some(request.flushed_sequence),
+        };
+        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
+        if let Err(e) = region.manifest_manager.update(action_list).await {
+            error!(e; "Failed to write manifest, region: {}", region_id);
+            request.on_failure(e);
+            return;
+        }
+
+        // Apply edit to region's version.
+        region.version_control.apply_edit(
+            edit,
+            &request.memtables_to_remove,
+            region.file_purger.clone(),
+        );
+        region.update_flush_millis();
+
+        // Delete wal.
+        info!(
+            "Region {} flush finished, tries to bump wal to {}",
+            region_id, request.flushed_entry_id
+        );
+        if let Err(e) = self.wal.obsolete(region_id, request.flushed_entry_id).await {
+            error!(e; "Failed to write wal, region: {}", region_id);
+            request.on_failure(e);
+            return;
+        }
+
+        // Notifies waiters.
+        request.on_success();
+
+        // Handle pending requests for the region.
+        if let Some((ddl_requests, write_requests)) =
+            self.flush_scheduler.on_flush_success(region_id)
+        {
+            // Perform DDLs first because they require empty memtables.
+            self.handle_ddl_requests(ddl_requests).await;
+            // Handle pending write requests, we don't stall these requests.
+            self.handle_write_requests(write_requests, false).await;
+        }
+
+        // Handle stalled requests.
+        let stalled = std::mem::take(&mut self.stalled_requests);
+        // We already stalled these requests, don't stall them again.
+        self.handle_write_requests(stalled.requests, false).await;
+
+        self.listener.on_flush_success(region_id);
+    }
+}