fix(storage): fix flush small files when the capacity of shared-buffer is full #15832

Closed
Changes from 5 commits
7 changes: 7 additions & 0 deletions src/common/src/config.rs
@@ -676,6 +676,9 @@ pub struct StorageConfig {
#[serde(default = "default::storage::max_concurrent_compaction_task_number")]
pub max_concurrent_compaction_task_number: u64,

#[serde(default = "default::storage::max_upload_task_number")]
pub max_upload_task_number: u64,

#[serde(default = "default::storage::max_preload_wait_time_mill")]
pub max_preload_wait_time_mill: u64,

@@ -1309,6 +1312,10 @@ pub mod default {
16
}

pub fn max_upload_task_number() -> u64 {
4
}

pub fn max_preload_wait_time_mill() -> u64 {
0
}
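The new `max_upload_task_number` knob (default 4) appears to bound how many spill/upload tasks the shared buffer runs at once, so a burst of small flushes is throttled rather than fanning out into unbounded uploads. Below is a minimal, hypothetical sketch of that pattern using a semaphore; the function name, payload type, and use of tokio are illustrative assumptions, not the uploader code in this PR.

```rust
// Hypothetical sketch only: throttling concurrent uploads with a semaphore.
// `max_upload_task_number` mirrors the new config knob; everything else is a stand-in.
use std::sync::Arc;

use tokio::sync::Semaphore;

async fn spawn_bounded_uploads(max_upload_task_number: usize, payloads: Vec<Vec<u8>>) {
    let permits = Arc::new(Semaphore::new(max_upload_task_number));
    let mut handles = Vec::new();
    for (idx, payload) in payloads.into_iter().enumerate() {
        // Waits here once `max_upload_task_number` uploads are already in flight.
        let permit = permits.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            let _permit = permit; // permit is released when this upload task finishes
            // Stand-in for the real object-store upload.
            println!("uploading batch {} ({} bytes)", idx, payload.len());
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    // With the default of 4, at most four uploads run concurrently.
    spawn_bounded_uploads(4, vec![vec![0u8; 1024]; 8]).await;
}
```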
1 change: 1 addition & 0 deletions src/config/docs.md
@@ -115,6 +115,7 @@ This page is automatically generated by `./risedev generate-example-config`
| max_preload_io_retry_times | | 3 |
| max_preload_wait_time_mill | | 0 |
| max_sub_compaction | Max sub compaction task numbers | 4 |
| max_upload_task_number | | 4 |
| max_version_pinning_duration_sec | | 10800 |
| mem_table_spill_threshold | The spill threshold for mem table. | 4194304 |
| meta_cache_capacity_mb | DEPRECATED: This config will be deprecated in the future version, use `storage.cache.meta_cache_capacity_mb` instead. | |
1 change: 1 addition & 0 deletions src/config/example.toml
@@ -122,6 +122,7 @@ sstable_id_remote_fetch_number = 10
min_sst_size_for_streaming_upload = 33554432
max_sub_compaction = 4
max_concurrent_compaction_task_number = 16
max_upload_task_number = 4
max_preload_wait_time_mill = 0
max_version_pinning_duration_sec = 10800
compactor_max_sst_key_count = 2097152
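For reference, the `#[serde(default = "...")]` attribute added in `config.rs` makes the new key optional in user TOML: omitting it falls back to 4, while an explicit entry like the one above overrides it. A minimal sketch with a trimmed stand-in struct (not RisingWave's actual `StorageConfig`):

```rust
// Trimmed stand-in for the serde-default pattern used by `StorageConfig`.
use serde::Deserialize;

fn default_max_upload_task_number() -> u64 {
    4
}

#[derive(Debug, Deserialize)]
struct StorageSection {
    #[serde(default = "default_max_upload_task_number")]
    max_upload_task_number: u64,
}

fn main() {
    // Omitting the key yields the default of 4 ...
    let implicit: StorageSection = toml::from_str("").unwrap();
    assert_eq!(implicit.max_upload_task_number, 4);

    // ... while an explicit value (as in example.toml) overrides it.
    let explicit: StorageSection = toml::from_str("max_upload_task_number = 8").unwrap();
    assert_eq!(explicit.max_upload_task_number, 8);
    println!(
        "implicit = {}, explicit = {}",
        implicit.max_upload_task_number, explicit.max_upload_task_number
    );
}
```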
96 changes: 46 additions & 50 deletions src/storage/hummock_test/src/hummock_read_version_tests.rs
@@ -71,15 +71,14 @@ async fn test_read_version_basic() {
Bound::Included(Bytes::from(key.to_vec())),
));

let (staging_imm_iter, staging_sst_iter) =
let staging_data_iter =
read_version
.staging()
.prune_overlap(epoch, TableId::default(), &key_range);

let staging_imm = staging_imm_iter.cloned().collect_vec();
let staging_imm = staging_data_iter.cloned().collect_vec();

assert_eq!(1, staging_imm.len());
assert_eq!(0, staging_sst_iter.count());
assert!(staging_imm.iter().any(|imm| imm.min_epoch() <= epoch));
}

@@ -108,39 +107,35 @@ async fn test_read_version_basic() {
Bound::Included(Bytes::from(key.to_vec())),
Bound::Included(Bytes::from(key.to_vec())),
));
let (staging_imm_iter, staging_sst_iter) =
let staging_data_iter =
read_version
.staging()
.prune_overlap(epoch, TableId::default(), &key_range);

let staging_imm = staging_imm_iter.cloned().collect_vec();
let staging_imm = staging_data_iter.cloned().collect_vec();

assert_eq!(1, staging_imm.len() as u64);
assert_eq!(0, staging_sst_iter.count());
assert!(staging_imm.iter().any(|imm| imm.min_epoch() <= epoch));
}
}

{
// test clean imm with sst update info
let staging = read_version.staging();
assert_eq!(6, staging.imm.len());
assert_eq!(6, staging.data.len());
let batch_id_vec_for_clear = staging
.imm
.data
.iter()
.rev()
.map(|imm| imm.batch_id())
.flat_map(|data| data.to_imm().map(|imm| imm.batch_id()))
.take(3)
.rev()
.collect::<Vec<_>>();

let epoch_id_vec_for_clear = staging
.imm
.data
.iter()
.rev()
.flat_map(|data| data.to_imm())
.map(|imm| imm.min_epoch())
.take(3)
.rev()
.collect::<Vec<_>>();

let dummy_sst = Arc::new(StagingSstableInfo::new(
@@ -195,13 +190,11 @@ async fn test_read_version_basic() {
// imm(0, 1, 2) => sst{sst_object_id: 1}
// staging => {imm(3, 4, 5), sst[{sst_object_id: 1}, {sst_object_id: 2}]}
let staging = read_version.staging();
assert_eq!(3, read_version.staging().imm.len());
assert_eq!(1, read_version.staging().sst.len());
assert_eq!(2, read_version.staging().sst[0].sstable_infos().len());
assert_eq!(4, read_version.staging().data.len());
let remain_batch_id_vec = staging
.imm
.data
.iter()
.map(|imm| imm.batch_id())
.flat_map(|data| data.to_imm().map(|imm| imm.batch_id()))
.collect::<Vec<_>>();
assert!(remain_batch_id_vec.iter().any(|batch_id| *batch_id > 2));
}
@@ -215,20 +208,25 @@
Bound::Included(Bytes::from(key_range_right)),
));

let (staging_imm_iter, staging_sst_iter) =
let staging_data_iter =
read_version
.staging()
.prune_overlap(epoch, TableId::default(), &key_range);

let staging_imm = staging_imm_iter.cloned().collect_vec();
assert_eq!(1, staging_imm.len());
let staging_data = staging_data_iter.cloned().collect_vec();
assert_eq!(2, staging_data.len());

assert_eq!(test_epoch(4), staging_imm[0].min_epoch());
assert_eq!(test_epoch(4), staging_data[0].min_epoch());

let staging_ssts = staging_sst_iter.cloned().collect_vec();
assert_eq!(2, staging_ssts.len());
assert_eq!(1, staging_ssts[0].get_object_id());
assert_eq!(2, staging_ssts[1].get_object_id());
match &staging_data[1] {
StagingData::Sst(sst) => {
assert_eq!(1, sst.sstable_infos()[0].sst_info.get_object_id());
assert_eq!(2, sst.sstable_infos()[1].sst_info.get_object_id());
}
StagingData::ImmMem(_) => {
unreachable!("can not be immemtable");
}
}
}

{
@@ -240,18 +238,18 @@
Bound::Included(Bytes::from(key_range_right)),
));

let (staging_imm_iter, staging_sst_iter) =
let staging_data_iter =
read_version
.staging()
.prune_overlap(epoch, TableId::default(), &key_range);

let staging_imm = staging_imm_iter.cloned().collect_vec();
assert_eq!(1, staging_imm.len());
assert_eq!(test_epoch(4), staging_imm[0].min_epoch());
let staging_data = staging_data_iter.cloned().collect_vec();
assert_eq!(2, staging_data.len());
assert_eq!(test_epoch(4), staging_data[0].min_epoch());

let staging_ssts = staging_sst_iter.cloned().collect_vec();
assert_eq!(1, staging_ssts.len());
assert_eq!(2, staging_ssts[0].get_object_id());
// let staging_ssts = staging_sst_iter.cloned().collect_vec();
// assert_eq!(1, staging_ssts.len());
// assert_eq!(2, staging_ssts[0].get_object_id());
}
}

@@ -294,23 +292,22 @@ async fn test_read_filter_basic() {
let key = Bytes::from(iterator_test_table_key_of(epoch as usize));
let key_range = map_table_key_range((Bound::Included(key.clone()), Bound::Included(key)));

let (staging_imm, staging_sst) = {
let staging_data = {
let read_guard = read_version.read();
let (staging_imm_iter, staging_sst_iter) = {
read_guard
.staging()
.prune_overlap(epoch, TableId::default(), &key_range)
};

(
staging_imm_iter.cloned().collect_vec(),
staging_sst_iter.cloned().collect_vec(),
)
read_guard
.staging()
.prune_overlap(epoch, TableId::default(), &key_range)
.cloned()
.collect_vec()
};

assert_eq!(1, staging_imm.len());
assert_eq!(0, staging_sst.len());
assert!(staging_imm.iter().any(|imm| imm.min_epoch() <= epoch));
assert_eq!(1, staging_data.len());
assert!(staging_data.iter().any(|data| {
match data {
StagingData::ImmMem(imm) => imm.min_epoch() <= epoch,
StagingData::Sst(_) => unreachable!("can not be sstable"),
}
}));

// test read_filter_for_version
{
@@ -320,10 +317,9 @@
.unwrap();

assert_eq!(1, hummock_read_snapshot.0.len());
assert_eq!(0, hummock_read_snapshot.1.len());
assert_eq!(
read_version.read().committed().max_committed_epoch(),
hummock_read_snapshot.2.max_committed_epoch()
hummock_read_snapshot.1.max_committed_epoch()
);
}
}
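Most of the assertion churn above comes from replacing the separate `staging().imm` / `staging().sst` vectors with a single `staging().data` collection. The sketch below shows what that merged representation appears to look like, inferred from the identifiers these tests use (`StagingData::ImmMem`, `StagingData::Sst`, `to_imm()`); the trimmed types are stand-ins, not the real storage structs.

```rust
use std::sync::Arc;

// Stand-in for the storage crate's immutable memtable.
#[derive(Clone, Debug)]
struct ImmutableMemtable {
    batch_id: u64,
    min_epoch: u64,
}

impl ImmutableMemtable {
    fn batch_id(&self) -> u64 {
        self.batch_id
    }
    fn min_epoch(&self) -> u64 {
        self.min_epoch
    }
}

// Stand-in for StagingSstableInfo, keeping only the object ids the tests check.
#[derive(Clone, Debug)]
struct StagingSstableInfo {
    object_ids: Vec<u64>,
}

// One vector of this enum replaces the former `imm` and `sst` vectors.
#[derive(Clone, Debug)]
enum StagingData {
    ImmMem(ImmutableMemtable),
    Sst(Arc<StagingSstableInfo>),
}

impl StagingData {
    // Mirrors the `to_imm()` helper the tests use to skip SST entries.
    fn to_imm(&self) -> Option<&ImmutableMemtable> {
        match self {
            StagingData::ImmMem(imm) => Some(imm),
            StagingData::Sst(_) => None,
        }
    }
}

fn main() {
    let data = vec![
        StagingData::ImmMem(ImmutableMemtable { batch_id: 3, min_epoch: 4 }),
        StagingData::Sst(Arc::new(StagingSstableInfo { object_ids: vec![1, 2] })),
    ];

    // Collect batch ids of the remaining immutable memtables, as the test does.
    let batch_ids: Vec<u64> = data
        .iter()
        .flat_map(|d| d.to_imm().map(|imm| imm.batch_id()))
        .collect();
    assert_eq!(batch_ids, vec![3]);

    // Epoch checks also go through the imm entries only.
    assert!(data.iter().flat_map(|d| d.to_imm()).all(|imm| imm.min_epoch() <= 4));

    // SST entries are matched explicitly, as in the `StagingData::Sst(sst)` arm above.
    if let StagingData::Sst(sst) = &data[1] {
        assert_eq!(sst.object_ids, vec![1, 2]);
    }
    println!("imm batch ids: {:?}", batch_ids);
}
```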
8 changes: 3 additions & 5 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
@@ -539,8 +539,7 @@ async fn test_state_store_sync() {
{
// after sync 1 epoch
let read_version = hummock_storage.read_version();
assert_eq!(1, read_version.read().staging().imm.len());
assert!(read_version.read().staging().sst.is_empty());
assert_eq!(1, read_version.read().staging().data.len());
}

{
@@ -581,8 +580,7 @@ async fn test_state_store_sync() {
{
// after sync all epoch
let read_version = hummock_storage.read_version();
assert!(read_version.read().staging().imm.is_empty());
assert!(read_version.read().staging().sst.is_empty());
assert!(read_version.read().staging().data.is_empty());
}

{
@@ -1461,7 +1459,7 @@ async fn test_hummock_version_reader() {
.read()
.committed()
.max_committed_epoch(),
read_snapshot.2.max_committed_epoch()
read_snapshot.1.max_committed_epoch()
);

let iter = hummock_version_reader