diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 7b9f4abaa218..eb795f23b2d7 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -15,6 +15,7 @@ //! Flush related utilities and structs. use std::collections::HashMap; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use common_query::Output; @@ -62,23 +63,90 @@ pub trait WriteBufferManager: Send + Sync + std::fmt::Debug { pub type WriteBufferManagerRef = Arc; -// TODO(yingwen): Implements the manager. +/// Default [WriteBufferManager] implementation. +/// +/// Inspired by RocksDB's WriteBufferManager. +/// #[derive(Debug)] -pub struct WriteBufferManagerImpl {} +pub struct WriteBufferManagerImpl { + /// Write buffer size for the engine. + global_write_buffer_size: usize, + /// Mutable memtable memory size limit. + mutable_limit: usize, + /// Memory in used (e.g. used by mutable and immutable memtables). + memory_used: AtomicUsize, + /// Memory that hasn't been scheduled to free (e.g. used by mutable memtables). + memory_active: AtomicUsize, +} + +impl WriteBufferManagerImpl { + /// Returns a new manager with specific `global_write_buffer_size`. + pub fn new(global_write_buffer_size: usize) -> Self { + Self { + global_write_buffer_size, + mutable_limit: Self::get_mutable_limit(global_write_buffer_size), + memory_used: AtomicUsize::new(0), + memory_active: AtomicUsize::new(0), + } + } + + /// Returns memory usage of mutable memtables. + pub(crate) fn mutable_usage(&self) -> usize { + self.memory_active.load(Ordering::Relaxed) + } + + /// Returns the size limit for mutable memtables. + fn get_mutable_limit(global_write_buffer_size: usize) -> usize { + global_write_buffer_size * 7 / 8 + } +} impl WriteBufferManager for WriteBufferManagerImpl { fn should_flush_engine(&self) -> bool { + let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed); + if mutable_memtable_memory_usage > self.mutable_limit { + info!( + "Engine should flush (over mutable limit), mutable_usage: {}, mutable_limit: {}.", + mutable_memtable_memory_usage, self.mutable_limit, + ); + return true; + } + + let memory_usage = self.memory_used.load(Ordering::Relaxed); + // If the memory exceeds the buffer size, we trigger more aggressive + // flush. But if already more than half memory is being flushed, + // triggering more flush may not help. We will hold it instead. + if memory_usage >= self.global_write_buffer_size + && mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 + { + info!( + "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \ + mutable_usage: {}.", + memory_usage, + self.global_write_buffer_size, + mutable_memtable_memory_usage, + ); + return true; + } + false } - fn reserve_mem(&self, _mem: usize) {} + fn reserve_mem(&self, mem: usize) { + self.memory_used.fetch_add(mem, Ordering::Relaxed); + self.memory_active.fetch_add(mem, Ordering::Relaxed); + } - fn schedule_free_mem(&self, _mem: usize) {} + fn schedule_free_mem(&self, mem: usize) { + self.memory_active.fetch_sub(mem, Ordering::Relaxed); + } - fn free_mem(&self, _mem: usize) {} + fn free_mem(&self, mem: usize) { + self.memory_used.fetch_sub(mem, Ordering::Relaxed); + } fn memory_usage(&self) -> usize { - 0 + self.memory_used.load(Ordering::Relaxed) } } @@ -466,3 +534,59 @@ impl FlushStatus { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_get_mutable_limit() { + assert_eq!(7, WriteBufferManagerImpl::get_mutable_limit(8)); + assert_eq!(8, WriteBufferManagerImpl::get_mutable_limit(10)); + assert_eq!(56, WriteBufferManagerImpl::get_mutable_limit(64)); + assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0)); + } + + #[test] + fn test_over_mutable_limit() { + // Mutable limit is 800. + let manager = WriteBufferManagerImpl::new(1000); + manager.reserve_mem(500); + assert!(!manager.should_flush_engine()); + + // More than mutable limit. + manager.reserve_mem(400); + assert!(manager.should_flush_engine()); + + // Freezes mutable. + manager.schedule_free_mem(500); + assert!(!manager.should_flush_engine()); + assert_eq!(900, manager.memory_used.load(Ordering::Relaxed)); + assert_eq!(400, manager.memory_active.load(Ordering::Relaxed)); + + // Releases immutable. + manager.free_mem(500); + assert_eq!(400, manager.memory_used.load(Ordering::Relaxed)); + assert_eq!(400, manager.memory_active.load(Ordering::Relaxed)); + } + + #[test] + fn test_over_global() { + // Mutable limit is 800. + let manager = WriteBufferManagerImpl::new(1000); + manager.reserve_mem(1100); + // Global usage is still 1100. + manager.schedule_free_mem(200); + assert!(manager.should_flush_engine()); + + // More than global limit, but mutable (1100-200-450=450) is not enough (< 500). + manager.schedule_free_mem(450); + assert!(!manager.should_flush_engine()); + + // Now mutable is enough. + manager.reserve_mem(50); + assert!(manager.should_flush_engine()); + manager.reserve_mem(100); + assert!(manager.should_flush_engine()); + } +} diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index ab5b2054d6f3..82675e9ed3eb 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -159,7 +159,7 @@ impl AllocTracker { /// Tracks `bytes` memory is allocated. pub(crate) fn on_allocation(&self, bytes: usize) { - let _ = self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed); + self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed); increment_gauge!(WRITE_BUFFER_BYTES, bytes as f64); if let Some(write_buffer_manager) = &self.write_buffer_manager { write_buffer_manager.reserve_mem(bytes); @@ -219,3 +219,61 @@ impl MemtableBuilder for DefaultMemtableBuilder { )) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::flush::{WriteBufferManager, WriteBufferManagerImpl}; + + #[test] + fn test_alloc_tracker_without_manager() { + let tracker = AllocTracker::new(None); + assert_eq!(0, tracker.bytes_allocated()); + tracker.on_allocation(100); + assert_eq!(100, tracker.bytes_allocated()); + tracker.on_allocation(200); + assert_eq!(300, tracker.bytes_allocated()); + + tracker.done_allocating(); + assert_eq!(300, tracker.bytes_allocated()); + } + + #[test] + fn test_alloc_tracker_with_manager() { + let manager = Arc::new(WriteBufferManagerImpl::new(1000)); + { + let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef)); + + tracker.on_allocation(100); + assert_eq!(100, tracker.bytes_allocated()); + assert_eq!(100, manager.memory_usage()); + assert_eq!(100, manager.mutable_usage()); + + for _ in 0..2 { + // Done allocating won't free the same memory multiple times. + tracker.done_allocating(); + assert_eq!(100, manager.memory_usage()); + assert_eq!(0, manager.mutable_usage()); + } + } + + assert_eq!(0, manager.memory_usage()); + assert_eq!(0, manager.mutable_usage()); + } + + #[test] + fn test_alloc_tracker_without_done_allocating() { + let manager = Arc::new(WriteBufferManagerImpl::new(1000)); + { + let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef)); + + tracker.on_allocation(100); + assert_eq!(100, tracker.bytes_allocated()); + assert_eq!(100, manager.memory_usage()); + assert_eq!(100, manager.mutable_usage()); + } + + assert_eq!(0, manager.memory_usage()); + assert_eq!(0, manager.mutable_usage()); + } +} diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 842d89627dd8..f59d0b2389f2 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -33,6 +33,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result}; +use crate::flush::WriteBufferManagerRef; use crate::memtable::{ AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRef, MemtableStats, @@ -43,15 +44,31 @@ use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; /// Initial vector builder capacity. const INITIAL_BUILDER_CAPACITY: usize = 32; +/// Builder to build [TimeSeriesMemtable]. #[derive(Debug, Default)] pub struct TimeSeriesMemtableBuilder { id: AtomicU32, + write_buffer_manager: Option, +} + +impl TimeSeriesMemtableBuilder { + /// Creates a new builder with specific `write_buffer_manager`. + pub fn new(write_buffer_manager: Option) -> Self { + Self { + id: AtomicU32::new(0), + write_buffer_manager, + } + } } impl MemtableBuilder for TimeSeriesMemtableBuilder { fn build(&self, metadata: &RegionMetadataRef) -> MemtableRef { let id = self.id.fetch_add(1, Ordering::Relaxed); - Arc::new(TimeSeriesMemtable::new(metadata.clone(), id)) + Arc::new(TimeSeriesMemtable::new( + metadata.clone(), + id, + self.write_buffer_manager.clone(), + )) } } @@ -67,7 +84,11 @@ pub struct TimeSeriesMemtable { } impl TimeSeriesMemtable { - pub fn new(region_metadata: RegionMetadataRef, id: MemtableId) -> Self { + pub fn new( + region_metadata: RegionMetadataRef, + id: MemtableId, + write_buffer_manager: Option, + ) -> Self { let row_codec = McmpRowCodec::new( region_metadata .primary_key_columns() @@ -80,7 +101,7 @@ impl TimeSeriesMemtable { region_metadata, series_set, row_codec, - alloc_tracker: AllocTracker::default(), + alloc_tracker: AllocTracker::new(write_buffer_manager), max_timestamp: AtomicI64::new(i64::MIN), min_timestamp: AtomicI64::new(i64::MAX), } @@ -873,7 +894,7 @@ mod tests { common_telemetry::init_default_ut_logging(); let schema = schema_for_test(); let kvs = build_key_values(&schema, "hello".to_string(), 42, 100); - let memtable = TimeSeriesMemtable::new(schema, 42); + let memtable = TimeSeriesMemtable::new(schema, 42, None); memtable.write(&kvs).unwrap(); let expected_ts = kvs @@ -904,7 +925,7 @@ mod tests { common_telemetry::init_default_ut_logging(); let schema = schema_for_test(); let kvs = build_key_values(&schema, "hello".to_string(), 42, 100); - let memtable = TimeSeriesMemtable::new(schema, 42); + let memtable = TimeSeriesMemtable::new(schema, 42, None); memtable.write(&kvs).unwrap(); let iter = memtable.iter(Some(&[3]), &[]); diff --git a/src/mito2/src/memtable/version.rs b/src/mito2/src/memtable/version.rs index 37e80108ddae..19a9da4a9dcf 100644 --- a/src/mito2/src/memtable/version.rs +++ b/src/mito2/src/memtable/version.rs @@ -86,8 +86,7 @@ impl MemtableVersion { } /// Returns the memory usage of the mutable memtable. - pub(crate) fn mutable_bytes_usage(&self) -> usize { - // TODO(yingwen): Get memtable usage. - 0 + pub(crate) fn mutable_usage(&self) -> usize { + self.mutable.stats().estimated_bytes } } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index aed26de4fdc0..8b9691235cc7 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -104,7 +104,9 @@ impl WorkerGroup { ) -> WorkerGroup { assert!(config.num_workers.is_power_of_two()); let config = Arc::new(config); - let write_buffer_manager = Arc::new(WriteBufferManagerImpl {}); + let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new( + config.global_write_buffer_size.as_bytes() as usize, + )); let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); let workers = (0..config.num_workers) @@ -198,7 +200,9 @@ impl WorkerStarter { wal: Wal::new(self.log_store), object_store: self.object_store, running: running.clone(), - memtable_builder: Arc::new(TimeSeriesMemtableBuilder::default()), + memtable_builder: Arc::new(TimeSeriesMemtableBuilder::new(Some( + self.write_buffer_manager.clone(), + ))), scheduler: self.scheduler.clone(), write_buffer_manager: self.write_buffer_manager, flush_scheduler: FlushScheduler::new(self.scheduler), diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 7d059da8e1d8..453eea9ddfb7 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -140,7 +140,7 @@ impl RegionWorkerLoop { } let version = region.version(); - let region_mutable_size = version.memtables.mutable_bytes_usage(); + let region_mutable_size = version.memtables.mutable_usage(); // Tracks region with max mutable memtable size. if region_mutable_size > max_mutable_size { max_mem_region = Some(region);