feat(storage): support setting minimum spill batch flush size (#16962)
wenym1 authored Jun 3, 2024
1 parent 3732975 commit 144c7cc
Showing 6 changed files with 129 additions and 24 deletions.
9 changes: 9 additions & 0 deletions src/common/src/config.rs
@@ -648,6 +648,11 @@ pub struct StorageConfig {
#[serde(default = "default::storage::shared_buffer_flush_ratio")]
pub shared_buffer_flush_ratio: f32,

/// The minimum total flush size of a shared buffer spill. When a shared buffer spill is triggered,
/// the total flush size across multiple epochs should be at least this size.
#[serde(default = "default::storage::shared_buffer_min_batch_flush_size_mb")]
pub shared_buffer_min_batch_flush_size_mb: usize,

/// The threshold for the number of immutable memtables to merge to a new imm.
#[serde(default = "default::storage::imm_merge_threshold")]
#[deprecated]
@@ -1370,6 +1375,10 @@ pub mod default {
0.8
}

pub fn shared_buffer_min_batch_flush_size_mb() -> usize {
800
}

pub fn imm_merge_threshold() -> usize {
0 // disable
}
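
As a quick worked example of how these MB-denominated settings become byte thresholds (a standalone sketch, not part of the commit: the 1024 MB capacity is hypothetical, while the ratio and minimum batch size are the defaults above):

fn main() {
    let shared_buffer_capacity_mb: usize = 1024; // hypothetical capacity
    let shared_buffer_flush_ratio: f32 = 0.8;
    let shared_buffer_min_batch_flush_size_mb: usize = 800;

    // Same conversions as BufferTracker::from_storage_opts in this commit.
    let capacity = shared_buffer_capacity_mb * (1 << 20);
    let flush_threshold = (capacity as f32 * shared_buffer_flush_ratio) as usize;
    let min_batch_flush_size = shared_buffer_min_batch_flush_size_mb * (1 << 20);

    // Spilling starts once buffered bytes exceed ~819 MB, and each spill
    // batch then keeps draining epochs until at least 800 MB is scheduled.
    assert!(min_batch_flush_size < flush_threshold && flush_threshold < capacity);
    println!("threshold={flush_threshold} min_batch={min_batch_flush_size}");
}
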
1 change: 1 addition & 0 deletions src/config/docs.md
@@ -134,6 +134,7 @@ This page is automatically generated by `./risedev generate-example-config`
| share_buffers_sync_parallelism | parallelism while syncing share buffers into L0 SST. Should NOT be 0. | 1 |
| shared_buffer_capacity_mb | Maximum shared buffer size; writes attempting to exceed the capacity will stall until there is enough space. | |
| shared_buffer_flush_ratio | The shared buffer will start flushing data to object storage when the ratio of memory usage to the shared buffer capacity exceeds this ratio. | 0.800000011920929 |
| shared_buffer_min_batch_flush_size_mb | The minimum total flush size of a shared buffer spill. When a shared buffer spill is triggered, the total flush size across multiple epochs should be at least this size. | 800 |
| sstable_id_remote_fetch_number | Number of SST ids fetched from meta per RPC | 10 |
| write_conflict_detection_enabled | Whether to enable write conflict detection | true |

1 change: 1 addition & 0 deletions src/config/example.toml
@@ -126,6 +126,7 @@ stream_high_join_amplification_threshold = 2048
share_buffers_sync_parallelism = 1
share_buffer_compaction_worker_threads_number = 4
shared_buffer_flush_ratio = 0.800000011920929
shared_buffer_min_batch_flush_size_mb = 800
imm_merge_threshold = 0
write_conflict_detection_enabled = true
max_prefetch_block_number = 16
25 changes: 22 additions & 3 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
@@ -64,6 +64,7 @@ use crate::opts::StorageOpts;
#[derive(Clone)]
pub struct BufferTracker {
flush_threshold: usize,
min_batch_flush_size: usize,
global_buffer: Arc<MemoryLimiter>,
global_upload_task_size: GenericGauge<AtomicU64>,
}
@@ -75,25 +76,34 @@ impl BufferTracker {
) -> Self {
let capacity = config.shared_buffer_capacity_mb * (1 << 20);
let flush_threshold = (capacity as f32 * config.shared_buffer_flush_ratio) as usize;
let shared_buffer_min_batch_flush_size =
config.shared_buffer_min_batch_flush_size_mb * (1 << 20);
assert!(
flush_threshold < capacity,
"flush_threshold {} should be less or equal to capacity {}",
flush_threshold,
capacity
);
Self::new(capacity, flush_threshold, global_upload_task_size)
Self::new(
capacity,
flush_threshold,
global_upload_task_size,
shared_buffer_min_batch_flush_size,
)
}

pub fn new(
fn new(
capacity: usize,
flush_threshold: usize,
global_upload_task_size: GenericGauge<AtomicU64>,
min_batch_flush_size: usize,
) -> Self {
assert!(capacity >= flush_threshold);
Self {
flush_threshold,
global_buffer: Arc::new(MemoryLimiter::new(capacity as u64)),
global_upload_task_size,
min_batch_flush_size,
}
}

@@ -118,9 +128,18 @@ impl BufferTracker {

/// Return true when the buffer size minus current upload task size is still greater than the
/// flush threshold.
pub fn need_more_flush(&self) -> bool {
pub fn need_flush(&self) -> bool {
self.get_buffer_size() > self.flush_threshold + self.global_upload_task_size.get() as usize
}

pub fn need_more_flush(&self, curr_batch_flush_size: usize) -> bool {
curr_batch_flush_size < self.min_batch_flush_size || self.need_flush()
}

#[cfg(test)]
pub(crate) fn flush_threshold(&self) -> usize {
self.flush_threshold
}
}

#[derive(Clone)]
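
The heart of the change is splitting the old need_more_flush into a trigger and a continuation predicate: need_flush fires when buffered bytes not yet covered by in-flight upload tasks exceed the flush threshold, while need_more_flush keeps a spill batch going until at least min_batch_flush_size bytes have been scheduled. A standalone sketch of the two predicates (field names mirror the struct above; plain integers stand in for the MemoryLimiter and the Prometheus gauge):

struct TrackerSketch {
    flush_threshold: usize,
    min_batch_flush_size: usize,
    buffer_size: usize,      // stand-in for MemoryLimiter usage
    upload_task_size: usize, // stand-in for the global upload task gauge
}

impl TrackerSketch {
    // Trigger: buffered bytes minus in-flight upload bytes exceed the threshold.
    fn need_flush(&self) -> bool {
        self.buffer_size > self.flush_threshold + self.upload_task_size
    }

    // Continuation: keep spilling while the current batch is below the
    // configured minimum, or while the trigger condition still holds.
    fn need_more_flush(&self, curr_batch_flush_size: usize) -> bool {
        curr_batch_flush_size < self.min_batch_flush_size || self.need_flush()
    }
}

fn main() {
    let tracker = TrackerSketch {
        flush_threshold: 8 << 20,      // 8 MB
        min_batch_flush_size: 4 << 20, // 4 MB
        buffer_size: 9 << 20,          // 9 MB buffered, nothing uploading
        upload_task_size: 0,
    };
    assert!(tracker.need_flush());
    // A 1 MB batch is below the 4 MB minimum, so the spill keeps going even
    // once enough has been flushed to relieve memory pressure.
    assert!(tracker.need_more_flush(1 << 20));
}
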
113 changes: 92 additions & 21 deletions src/storage/src/hummock/event_handler/uploader.rs
@@ -290,7 +290,7 @@ struct UnsealedEpochData {
}

impl UnsealedEpochData {
fn flush(&mut self, context: &UploaderContext) {
fn flush(&mut self, context: &UploaderContext) -> usize {
let imms: HashMap<_, _> = take(&mut self.imms)
.into_iter()
.map(|(id, imms)| (id, imms.into_iter().collect_vec()))
@@ -303,7 +303,11 @@ impl UnsealedEpochData {
.spill_task_size_from_unsealed
.inc_by(task.task_info.task_size as u64);
info!("Spill unsealed data. Task: {}", task.get_task_info());
let size = task.task_info.task_size;
self.spilled_data.add_task(task);
size
} else {
0
}
}

@@ -446,14 +450,15 @@ impl SealedData {
}

// Flush can be triggered by either a sync_epoch or a spill (`may_flush`) request.
fn flush(&mut self, context: &UploaderContext, is_spilled: bool) {
let payload: HashMap<_, _> = take(&mut self.imms_by_table_shard)
fn flush(&mut self, context: &UploaderContext, is_spilled: bool) -> usize {
let payload: HashMap<_, Vec<_>> = take(&mut self.imms_by_table_shard)
.into_iter()
.map(|(id, imms)| (id, imms.into_iter().collect()))
.collect();

if !payload.is_empty() {
let task = UploadingTask::new(payload, context);
let size = task.task_info.task_size;
if is_spilled {
context.stats.spill_task_counts_from_sealed.inc();
context
Expand All @@ -463,6 +468,9 @@ impl SealedData {
info!("Spill sealed data. Task: {}", task.get_task_info());
}
self.spilled_data.add_task(task);
size
} else {
0
}
}

@@ -805,19 +813,33 @@ impl HummockUploader {
}
}

pub(crate) fn may_flush(&mut self) {
if self.context.buffer_tracker.need_more_flush() {
self.sealed_data.flush(&self.context, true);
}
pub(crate) fn may_flush(&mut self) -> bool {
if self.context.buffer_tracker.need_flush() {
let mut curr_batch_flush_size = 0;
if self.context.buffer_tracker.need_flush() {
curr_batch_flush_size += self.sealed_data.flush(&self.context, true);
}

if self.context.buffer_tracker.need_more_flush() {
// iterate from older epoch to newer epoch
for unsealed_data in self.unsealed_data.values_mut() {
unsealed_data.flush(&self.context);
if !self.context.buffer_tracker.need_more_flush() {
break;
if self
.context
.buffer_tracker
.need_more_flush(curr_batch_flush_size)
{
// iterate from older epoch to newer epoch
for unsealed_data in self.unsealed_data.values_mut() {
curr_batch_flush_size += unsealed_data.flush(&self.context);
if !self
.context
.buffer_tracker
.need_more_flush(curr_batch_flush_size)
{
break;
}
}
}
curr_batch_flush_size > 0
} else {
false
}
}
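
In prose: may_flush now reports whether anything was scheduled, and it batches a spill oldest-first, draining sealed data before walking unsealed epochs from oldest to newest until the batch is large enough. A condensed restatement of that control flow, reusing the TrackerSketch from the BufferTracker sketch earlier (the closures are hypothetical stand-ins for SealedData::flush and UnsealedEpochData::flush, each returning the bytes it scheduled):

fn may_flush_sketch(
    tracker: &TrackerSketch,
    mut flush_sealed: impl FnMut() -> usize,
    unsealed_oldest_first: &mut [Box<dyn FnMut() -> usize>],
) -> bool {
    if !tracker.need_flush() {
        return false; // below the trigger: nothing to spill
    }
    // Sealed data is always drained first.
    let mut curr_batch_flush_size = flush_sealed();
    // Then unsealed epochs, oldest first, until the batch is large enough.
    for flush_epoch in unsealed_oldest_first.iter_mut() {
        if !tracker.need_more_flush(curr_batch_flush_size) {
            break;
        }
        curr_batch_flush_size += flush_epoch();
    }
    curr_batch_flush_size > 0
}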

Expand Down Expand Up @@ -1361,14 +1383,16 @@ mod tests {
assert_eq!(epoch6, uploader.max_sealed_epoch);
}

fn prepare_uploader_order_test() -> (
fn prepare_uploader_order_test(
config: &StorageOpts,
skip_schedule: bool,
) -> (
BufferTracker,
HummockUploader,
impl Fn(HashMap<LocalInstanceId, Vec<ImmId>>) -> (BoxFuture<'static, ()>, oneshot::Sender<()>),
) {
// flush threshold is 0. Flush anyway
let buffer_tracker =
BufferTracker::new(usize::MAX, 0, GenericGauge::new("test", "test").unwrap());
let gauge = GenericGauge::new("test", "test").unwrap();
let buffer_tracker = BufferTracker::from_storage_opts(config, gauge);
// (the started task sends the imm ids of the payload, then waits for the finish notify)
#[allow(clippy::type_complexity)]
let task_notifier_holder: Arc<
@@ -1399,14 +1423,17 @@
Arc::new({
move |_, task_info: UploadTaskInfo| {
let task_notifier_holder = task_notifier_holder.clone();
let (start_tx, finish_rx) = task_notifier_holder.lock().pop_back().unwrap();
let task_item = task_notifier_holder.lock().pop_back();
let start_epoch = *task_info.epochs.last().unwrap();
let end_epoch = *task_info.epochs.first().unwrap();
assert!(end_epoch >= start_epoch);
spawn(async move {
let ssts = gen_sstable_info(start_epoch, end_epoch);
start_tx.send(task_info).unwrap();
finish_rx.await.unwrap();
if !skip_schedule {
let (start_tx, finish_rx) = task_item.unwrap();
start_tx.send(task_info).unwrap();
finish_rx.await.unwrap();
}
Ok(UploadTaskOutput {
new_value_ssts: ssts,
old_value_ssts: vec![],
@@ -1434,7 +1461,13 @@

#[tokio::test]
async fn test_uploader_finish_in_order() {
let (buffer_tracker, mut uploader, new_task_notifier) = prepare_uploader_order_test();
let config = StorageOpts {
shared_buffer_capacity_mb: 1024 * 1024,
shared_buffer_flush_ratio: 0.0,
..Default::default()
};
let (buffer_tracker, mut uploader, new_task_notifier) =
prepare_uploader_order_test(&config, false);

let epoch1 = INITIAL_EPOCH.next_epoch();
let epoch2 = epoch1.next_epoch();
@@ -1647,4 +1680,42 @@
// epoch2: sst([imm2])
// epoch4: sst([imm4, imm3_3]), sst([imm3_2]), sst([imm3_1])
}

#[tokio::test]
async fn test_uploader_frequently_flush() {
let config = StorageOpts {
shared_buffer_capacity_mb: 10,
shared_buffer_flush_ratio: 0.8,
// This test will fail when we set it to 0
shared_buffer_min_batch_flush_size_mb: 1,
..Default::default()
};
let (buffer_tracker, mut uploader, _new_task_notifier) =
prepare_uploader_order_test(&config, true);

let epoch1 = INITIAL_EPOCH.next_epoch();
let epoch2 = epoch1.next_epoch();
let flush_threshold = buffer_tracker.flush_threshold();
let memory_limiter = buffer_tracker.get_memory_limiter().clone();

// imm2 contains data in a newer epoch, but is added first
let mut total_memory = 0;
while total_memory < flush_threshold {
let imm = gen_imm_with_limiter(epoch2, Some(memory_limiter.as_ref())).await;
total_memory += imm.size();
if total_memory > flush_threshold {
break;
}
uploader.add_imm(imm);
}
let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await;
uploader.add_imm(imm);
assert!(uploader.may_flush());

for _ in 0..10 {
let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await;
uploader.add_imm(imm);
assert!(!uploader.may_flush());
}
}
}
4 changes: 4 additions & 0 deletions src/storage/src/opts.rs
@@ -41,6 +41,9 @@ pub struct StorageOpts {
/// The shared buffer will start flushing data to object storage when the ratio of memory usage
/// to the shared buffer capacity exceeds this ratio.
pub shared_buffer_flush_ratio: f32,
/// The minimum total flush size of a shared buffer spill. When a shared buffer spill is triggered,
/// the total flush size across multiple epochs should be at least this size.
pub shared_buffer_min_batch_flush_size_mb: usize,
/// Remote directory for storing data and metadata objects.
pub data_directory: String,
/// Whether to enable write conflict detection
@@ -155,6 +158,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpts
.share_buffer_compaction_worker_threads_number,
shared_buffer_capacity_mb: s.shared_buffer_capacity_mb,
shared_buffer_flush_ratio: c.storage.shared_buffer_flush_ratio,
shared_buffer_min_batch_flush_size_mb: c.storage.shared_buffer_min_batch_flush_size_mb,
data_directory: p.data_directory().to_string(),
write_conflict_detection_enabled: c.storage.write_conflict_detection_enabled,
block_cache_capacity_mb: s.block_cache_capacity_mb,
