Skip to content

Commit

Permalink
feat: Impl write buffer manager for mito2 (#2309)
Browse files Browse the repository at this point in the history
* feat: add write buffer manager to builder

* feat: impl WriteBufferManager

* feat: impl MemtableVersion::mutable_usage

* chore: Address CR comments

Co-authored-by: JeremyHi <[email protected]>

* refactor: rename mutable_limitation to mutable_limit

---------

Co-authored-by: JeremyHi <[email protected]>
  • Loading branch information
2 people authored and waynexia committed Sep 12, 2023
1 parent 3504d82 commit 50220f8
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 18 deletions.
136 changes: 130 additions & 6 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Flush related utilities and structs.
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use common_query::Output;
Expand Down Expand Up @@ -62,23 +63,90 @@ pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {

pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;

/// Default [WriteBufferManager] implementation.
///
/// Inspired by RocksDB's WriteBufferManager.
/// <https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h>
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Write buffer size for the engine (total budget across all regions).
    global_write_buffer_size: usize,
    /// Mutable memtable memory size limit (derived from
    /// `global_write_buffer_size`, see `get_mutable_limit`).
    mutable_limit: usize,
    /// Memory in use (e.g. used by mutable and immutable memtables).
    memory_used: AtomicUsize,
    /// Memory that hasn't been scheduled to free (e.g. used by mutable memtables).
    memory_active: AtomicUsize,
}

impl WriteBufferManagerImpl {
    /// Creates a manager with a total budget of `global_write_buffer_size`
    /// bytes; the mutable limit is derived from it.
    pub fn new(global_write_buffer_size: usize) -> Self {
        let mutable_limit = Self::get_mutable_limit(global_write_buffer_size);
        Self {
            global_write_buffer_size,
            mutable_limit,
            memory_used: AtomicUsize::new(0),
            memory_active: AtomicUsize::new(0),
        }
    }

    /// Returns memory usage of mutable memtables.
    pub(crate) fn mutable_usage(&self) -> usize {
        self.memory_active.load(Ordering::Relaxed)
    }

    /// Returns the size limit for mutable memtables: 7/8 of the global
    /// budget (multiply-then-divide so rounding matches the original).
    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
        (global_write_buffer_size * 7) / 8
    }
}

impl WriteBufferManager for WriteBufferManagerImpl {
    /// Decides whether the engine should trigger a flush, based on the
    /// mutable usage and the total usage tracked by this manager.
    fn should_flush_engine(&self) -> bool {
        // Flush if mutable memtables alone exceed their dedicated limit.
        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
        if mutable_memtable_memory_usage > self.mutable_limit {
            info!(
                "Engine should flush (over mutable limit), mutable_usage: {}, mutable_limit: {}.",
                mutable_memtable_memory_usage, self.mutable_limit,
            );
            return true;
        }

        let memory_usage = self.memory_used.load(Ordering::Relaxed);
        // If the memory exceeds the buffer size, we trigger more aggressive
        // flush. But if already more than half memory is being flushed,
        // triggering more flush may not help. We will hold it instead.
        if memory_usage >= self.global_write_buffer_size
            && mutable_memtable_memory_usage >= self.global_write_buffer_size / 2
        {
            info!(
                "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
                 mutable_usage: {}.",
                memory_usage,
                self.global_write_buffer_size,
                mutable_memtable_memory_usage,
            );
            return true;
        }

        false
    }

    /// Records `mem` newly allocated bytes: counted both as used and active.
    fn reserve_mem(&self, mem: usize) {
        self.memory_used.fetch_add(mem, Ordering::Relaxed);
        self.memory_active.fetch_add(mem, Ordering::Relaxed);
    }

    /// Marks `mem` bytes as scheduled to free (e.g. memtable frozen for
    /// flush): no longer active, but still counted as used until freed.
    fn schedule_free_mem(&self, mem: usize) {
        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
    }

    /// Releases `mem` bytes from the used counter once the memory is
    /// actually freed.
    fn free_mem(&self, mem: usize) {
        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
    }

    /// Returns total memory currently tracked as in use.
    fn memory_usage(&self) -> usize {
        self.memory_used.load(Ordering::Relaxed)
    }
}

Expand Down Expand Up @@ -466,3 +534,59 @@ impl FlushStatus {
}
}
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_get_mutable_limit() {
        // Mutable limit is 7/8 of the global size (integer division, so
        // small sizes round down and 0 maps to 0).
        assert_eq!(7, WriteBufferManagerImpl::get_mutable_limit(8));
        assert_eq!(8, WriteBufferManagerImpl::get_mutable_limit(10));
        assert_eq!(56, WriteBufferManagerImpl::get_mutable_limit(64));
        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
    }

    #[test]
    fn test_over_mutable_limit() {
        // Mutable limit is 800.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(500);
        // 500 <= 800: no flush needed yet.
        assert!(!manager.should_flush_engine());

        // More than mutable limit (900 > 800): flush requested.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Freezes mutable. schedule_free_mem only lowers the active
        // counter; memory_used stays until free_mem is called.
        manager.schedule_free_mem(500);
        assert!(!manager.should_flush_engine());
        assert_eq!(900, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // Releases immutable: now memory_used drops too.
        manager.free_mem(500);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }

    #[test]
    fn test_over_global() {
        // Mutable limit is 800.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(1100);
        // Global usage is still 1100; active is 900 (>= 1000/2).
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());

        // More than global limit, but mutable (1100-200-450=450) is not enough (< 500).
        manager.schedule_free_mem(450);
        assert!(!manager.should_flush_engine());

        // Now mutable is enough (500 >= 500).
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }
}
60 changes: 59 additions & 1 deletion src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl AllocTracker {

/// Tracks `bytes` memory is allocated.
pub(crate) fn on_allocation(&self, bytes: usize) {
let _ = self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
increment_gauge!(WRITE_BUFFER_BYTES, bytes as f64);
if let Some(write_buffer_manager) = &self.write_buffer_manager {
write_buffer_manager.reserve_mem(bytes);
Expand Down Expand Up @@ -219,3 +219,61 @@ impl MemtableBuilder for DefaultMemtableBuilder {
))
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_alloc_tracker_without_manager() {
        // Tracker works standalone when no write buffer manager is attached.
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        // done_allocating does not reset the tracker's own counter.
        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            // Allocations are mirrored into the manager's used/active counters.
            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                // Done allocating won't free the same memory multiple times.
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        // Dropping the tracker releases the remaining used memory.
        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        // Even without an explicit done_allocating, Drop frees both counters.
        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}
31 changes: 26 additions & 5 deletions src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;

use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
MemtableRef, MemtableStats,
Expand All @@ -43,15 +44,31 @@ use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
/// Initial vector builder capacity.
const INITIAL_BUILDER_CAPACITY: usize = 32;

/// Builder to build [TimeSeriesMemtable].
#[derive(Debug, Default)]
pub struct TimeSeriesMemtableBuilder {
    // Id for the next memtable; bumped on every build() call.
    id: AtomicU32,
    // Optional engine-wide write buffer manager handed to each memtable's
    // allocation tracker; None disables write buffer accounting.
    write_buffer_manager: Option<WriteBufferManagerRef>,
}

impl TimeSeriesMemtableBuilder {
    /// Creates a builder wired to the given `write_buffer_manager`
    /// (pass `None` to build memtables without write buffer accounting).
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> Self {
        Self {
            write_buffer_manager,
            id: AtomicU32::new(0),
        }
    }
}

impl MemtableBuilder for TimeSeriesMemtableBuilder {
    /// Builds a new [TimeSeriesMemtable] for the region described by
    /// `metadata`, assigning it the next unique memtable id.
    fn build(&self, metadata: &RegionMetadataRef) -> MemtableRef {
        // fetch_add hands out a fresh id per memtable built by this builder.
        let id = self.id.fetch_add(1, Ordering::Relaxed);
        Arc::new(TimeSeriesMemtable::new(
            metadata.clone(),
            id,
            self.write_buffer_manager.clone(),
        ))
    }
}

Expand All @@ -67,7 +84,11 @@ pub struct TimeSeriesMemtable {
}

impl TimeSeriesMemtable {
pub fn new(region_metadata: RegionMetadataRef, id: MemtableId) -> Self {
pub fn new(
region_metadata: RegionMetadataRef,
id: MemtableId,
write_buffer_manager: Option<WriteBufferManagerRef>,
) -> Self {
let row_codec = McmpRowCodec::new(
region_metadata
.primary_key_columns()
Expand All @@ -80,7 +101,7 @@ impl TimeSeriesMemtable {
region_metadata,
series_set,
row_codec,
alloc_tracker: AllocTracker::default(),
alloc_tracker: AllocTracker::new(write_buffer_manager),
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
}
Expand Down Expand Up @@ -873,7 +894,7 @@ mod tests {
common_telemetry::init_default_ut_logging();
let schema = schema_for_test();
let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
let memtable = TimeSeriesMemtable::new(schema, 42);
let memtable = TimeSeriesMemtable::new(schema, 42, None);
memtable.write(&kvs).unwrap();

let expected_ts = kvs
Expand Down Expand Up @@ -904,7 +925,7 @@ mod tests {
common_telemetry::init_default_ut_logging();
let schema = schema_for_test();
let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
let memtable = TimeSeriesMemtable::new(schema, 42);
let memtable = TimeSeriesMemtable::new(schema, 42, None);
memtable.write(&kvs).unwrap();

let iter = memtable.iter(Some(&[3]), &[]);
Expand Down
5 changes: 2 additions & 3 deletions src/mito2/src/memtable/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ impl MemtableVersion {
}

/// Returns the memory usage of the mutable memtable.
pub(crate) fn mutable_bytes_usage(&self) -> usize {
// TODO(yingwen): Get memtable usage.
0
pub(crate) fn mutable_usage(&self) -> usize {
self.mutable.stats().estimated_bytes
}
}
8 changes: 6 additions & 2 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ impl WorkerGroup {
) -> WorkerGroup {
assert!(config.num_workers.is_power_of_two());
let config = Arc::new(config);
let write_buffer_manager = Arc::new(WriteBufferManagerImpl {});
let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new(
config.global_write_buffer_size.as_bytes() as usize,
));
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));

let workers = (0..config.num_workers)
Expand Down Expand Up @@ -198,7 +200,9 @@ impl<S: LogStore> WorkerStarter<S> {
wal: Wal::new(self.log_store),
object_store: self.object_store,
running: running.clone(),
memtable_builder: Arc::new(TimeSeriesMemtableBuilder::default()),
memtable_builder: Arc::new(TimeSeriesMemtableBuilder::new(Some(
self.write_buffer_manager.clone(),
))),
scheduler: self.scheduler.clone(),
write_buffer_manager: self.write_buffer_manager,
flush_scheduler: FlushScheduler::new(self.scheduler),
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/worker/handle_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl<S> RegionWorkerLoop<S> {
}

let version = region.version();
let region_mutable_size = version.memtables.mutable_bytes_usage();
let region_mutable_size = version.memtables.mutable_usage();
// Tracks region with max mutable memtable size.
if region_mutable_size > max_mutable_size {
max_mem_region = Some(region);
Expand Down

0 comments on commit 50220f8

Please sign in to comment.