diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 8454f70c4223..8200e1778b71 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -17,6 +17,8 @@ #[cfg(test)] mod alter_test; #[cfg(test)] +mod basic_test; +#[cfg(test)] mod close_test; #[cfg(test)] mod compaction_test; @@ -32,8 +34,6 @@ pub(crate) mod listener; mod open_test; #[cfg(test)] mod projection_test; -#[cfg(test)] -mod tests; use std::sync::Arc; diff --git a/src/mito2/src/engine/tests.rs b/src/mito2/src/engine/basic_test.rs similarity index 100% rename from src/mito2/src/engine/tests.rs rename to src/mito2/src/engine/basic_test.rs diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index cda4a1d3fca6..15dc80de0e14 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -101,7 +101,8 @@ impl WriteBufferManagerImpl { /// Returns the size limit for mutable memtables. fn get_mutable_limit(global_write_buffer_size: usize) -> usize { - global_write_buffer_size * 7 / 8 + // Reserves half of the write buffer for mutable memtable. + global_write_buffer_size / 2 } } @@ -591,17 +592,17 @@ mod tests { #[test] fn test_get_mutable_limit() { - assert_eq!(7, WriteBufferManagerImpl::get_mutable_limit(8)); - assert_eq!(8, WriteBufferManagerImpl::get_mutable_limit(10)); - assert_eq!(56, WriteBufferManagerImpl::get_mutable_limit(64)); + assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8)); + assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10)); + assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64)); assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0)); } #[test] fn test_over_mutable_limit() { - // Mutable limit is 800. + // Mutable limit is 500. let manager = WriteBufferManagerImpl::new(1000); - manager.reserve_mem(500); + manager.reserve_mem(400); assert!(!manager.should_flush_engine()); assert!(!manager.should_stall()); @@ -610,20 +611,20 @@ mod tests { assert!(manager.should_flush_engine()); // Freezes mutable. - manager.schedule_free_mem(500); + manager.schedule_free_mem(400); assert!(!manager.should_flush_engine()); - assert_eq!(900, manager.memory_used.load(Ordering::Relaxed)); + assert_eq!(800, manager.memory_used.load(Ordering::Relaxed)); assert_eq!(400, manager.memory_active.load(Ordering::Relaxed)); // Releases immutable. - manager.free_mem(500); + manager.free_mem(400); assert_eq!(400, manager.memory_used.load(Ordering::Relaxed)); assert_eq!(400, manager.memory_active.load(Ordering::Relaxed)); } #[test] fn test_over_global() { - // Mutable limit is 800. + // Mutable limit is 500. let manager = WriteBufferManagerImpl::new(1000); manager.reserve_mem(1100); assert!(manager.should_stall()); diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 3c6b1deabd27..36eaec042eb1 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -26,73 +26,6 @@ use crate::region::MitoRegionRef; use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx}; use crate::worker::RegionWorkerLoop; -impl RegionWorkerLoop { - /// On region flush job finished. - pub(crate) async fn handle_flush_finished( - &mut self, - region_id: RegionId, - mut request: FlushFinished, - ) { - let Some(region) = self.regions.writable_region_or(region_id, &mut request) else { - return; - }; - - // Write region edit to manifest. - let edit = RegionEdit { - files_to_add: std::mem::take(&mut request.file_metas), - files_to_remove: Vec::new(), - compaction_time_window: None, - flushed_entry_id: Some(request.flushed_entry_id), - flushed_sequence: Some(request.flushed_sequence), - }; - let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); - if let Err(e) = region.manifest_manager.update(action_list).await { - error!(e; "Failed to write manifest, region: {}", region_id); - request.on_failure(e); - return; - } - - // Apply edit to region's version. - region.version_control.apply_edit( - edit, - &request.memtables_to_remove, - region.file_purger.clone(), - ); - region.update_flush_millis(); - - // Delete wal. - info!( - "Region {} flush finished, tries to bump wal to {}", - region_id, request.flushed_entry_id - ); - if let Err(e) = self.wal.obsolete(region_id, request.flushed_entry_id).await { - error!(e; "Failed to write wal, region: {}", region_id); - request.on_failure(e); - return; - } - - // Notifies waiters. - request.on_success(); - - // Handle pending requests for the region. - if let Some((ddl_requests, write_requests)) = - self.flush_scheduler.on_flush_success(region_id) - { - // Perform DDLs first because they require empty memtables. - self.handle_ddl_requests(ddl_requests).await; - // Handle pending write requests, we don't stall these requests. - self.handle_write_requests(write_requests, false).await; - } - - // Handle stalled requests. - let stalled = std::mem::take(&mut self.stalled_requests); - // We already stalled these requests, don't stall them again. - self.handle_write_requests(stalled.requests, false).await; - - self.listener.on_flush_success(region_id); - } -} - impl RegionWorkerLoop { /// Handles manual flush request. pub(crate) async fn handle_flush_request( @@ -191,3 +124,70 @@ impl RegionWorkerLoop { } } } + +impl RegionWorkerLoop { + /// On region flush job finished. + pub(crate) async fn handle_flush_finished( + &mut self, + region_id: RegionId, + mut request: FlushFinished, + ) { + let Some(region) = self.regions.writable_region_or(region_id, &mut request) else { + return; + }; + + // Write region edit to manifest. + let edit = RegionEdit { + files_to_add: std::mem::take(&mut request.file_metas), + files_to_remove: Vec::new(), + compaction_time_window: None, + flushed_entry_id: Some(request.flushed_entry_id), + flushed_sequence: Some(request.flushed_sequence), + }; + let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); + if let Err(e) = region.manifest_manager.update(action_list).await { + error!(e; "Failed to write manifest, region: {}", region_id); + request.on_failure(e); + return; + } + + // Apply edit to region's version. + region.version_control.apply_edit( + edit, + &request.memtables_to_remove, + region.file_purger.clone(), + ); + region.update_flush_millis(); + + // Delete wal. + info!( + "Region {} flush finished, tries to bump wal to {}", + region_id, request.flushed_entry_id + ); + if let Err(e) = self.wal.obsolete(region_id, request.flushed_entry_id).await { + error!(e; "Failed to write wal, region: {}", region_id); + request.on_failure(e); + return; + } + + // Notifies waiters. + request.on_success(); + + // Handle pending requests for the region. + if let Some((ddl_requests, write_requests)) = + self.flush_scheduler.on_flush_success(region_id) + { + // Perform DDLs first because they require empty memtables. + self.handle_ddl_requests(ddl_requests).await; + // Handle pending write requests, we don't stall these requests. + self.handle_write_requests(write_requests, false).await; + } + + // Handle stalled requests. + let stalled = std::mem::take(&mut self.stalled_requests); + // We already stalled these requests, don't stall them again. + self.handle_write_requests(stalled.requests, false).await; + + self.listener.on_flush_success(region_id); + } +}