Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): fix flush small files when the capacity of shared-buffer is full #15832

Closed
96 changes: 46 additions & 50 deletions src/storage/hummock_test/src/hummock_read_version_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,14 @@ async fn test_read_version_basic() {
Bound::Included(Bytes::from(key.to_vec())),
));

let (staging_imm_iter, staging_sst_iter) =
let staging_data_iter =
read_version
.staging()
.prune_overlap(epoch, TableId::default(), &key_range);

let staging_imm = staging_imm_iter.cloned().collect_vec();
let staging_imm = staging_data_iter.cloned().collect_vec();

assert_eq!(1, staging_imm.len());
assert_eq!(0, staging_sst_iter.count());
assert!(staging_imm.iter().any(|imm| imm.min_epoch() <= epoch));
}

Expand Down Expand Up @@ -108,39 +107,35 @@ async fn test_read_version_basic() {
Bound::Included(Bytes::from(key.to_vec())),
Bound::Included(Bytes::from(key.to_vec())),
));
let (staging_imm_iter, staging_sst_iter) =
let staging_data_iter =
read_version
.staging()
.prune_overlap(epoch, TableId::default(), &key_range);

let staging_imm = staging_imm_iter.cloned().collect_vec();
let staging_imm = staging_data_iter.cloned().collect_vec();

assert_eq!(1, staging_imm.len() as u64);
assert_eq!(0, staging_sst_iter.count());
assert!(staging_imm.iter().any(|imm| imm.min_epoch() <= epoch));
}
}

{
// test clean imm with sst update info
let staging = read_version.staging();
assert_eq!(6, staging.imm.len());
assert_eq!(6, staging.data.len());
let batch_id_vec_for_clear = staging
.imm
.data
.iter()
.rev()
.map(|imm| imm.batch_id())
.flat_map(|data| data.to_imm().map(|imm| imm.batch_id()))
.take(3)
.rev()
.collect::<Vec<_>>();

let epoch_id_vec_for_clear = staging
.imm
.data
.iter()
.rev()
.flat_map(|data| data.to_imm())
.map(|imm| imm.min_epoch())
.take(3)
.rev()
.collect::<Vec<_>>();

let dummy_sst = Arc::new(StagingSstableInfo::new(
Expand Down Expand Up @@ -195,13 +190,11 @@ async fn test_read_version_basic() {
// imm(0, 1, 2) => sst{sst_object_id: 1}
// staging => {imm(3, 4, 5), sst[{sst_object_id: 1}, {sst_object_id: 2}]}
let staging = read_version.staging();
assert_eq!(3, read_version.staging().imm.len());
assert_eq!(1, read_version.staging().sst.len());
assert_eq!(2, read_version.staging().sst[0].sstable_infos().len());
assert_eq!(4, read_version.staging().data.len());
let remain_batch_id_vec = staging
.imm
.data
.iter()
.map(|imm| imm.batch_id())
.flat_map(|data| data.to_imm().map(|imm| imm.batch_id()))
.collect::<Vec<_>>();
assert!(remain_batch_id_vec.iter().any(|batch_id| *batch_id > 2));
}
Expand All @@ -215,20 +208,25 @@ async fn test_read_version_basic() {
Bound::Included(Bytes::from(key_range_right)),
));

let (staging_imm_iter, staging_sst_iter) =
let staging_data_iter =
read_version
.staging()
.prune_overlap(epoch, TableId::default(), &key_range);

let staging_imm = staging_imm_iter.cloned().collect_vec();
assert_eq!(1, staging_imm.len());
let staging_data = staging_data_iter.cloned().collect_vec();
assert_eq!(2, staging_data.len());

assert_eq!(test_epoch(4), staging_imm[0].min_epoch());
assert_eq!(test_epoch(4), staging_data[0].min_epoch());

let staging_ssts = staging_sst_iter.cloned().collect_vec();
assert_eq!(2, staging_ssts.len());
assert_eq!(1, staging_ssts[0].get_object_id());
assert_eq!(2, staging_ssts[1].get_object_id());
match &staging_data[1] {
StagingData::Sst(sst) => {
assert_eq!(1, sst.sstable_infos()[0].sst_info.get_object_id());
assert_eq!(2, sst.sstable_infos()[1].sst_info.get_object_id());
}
StagingData::ImmMem(_) => {
unreachable!("can not be immemtable");
}
}
}

{
Expand All @@ -240,18 +238,18 @@ async fn test_read_version_basic() {
Bound::Included(Bytes::from(key_range_right)),
));

let (staging_imm_iter, staging_sst_iter) =
let staging_data_iter =
read_version
.staging()
.prune_overlap(epoch, TableId::default(), &key_range);

let staging_imm = staging_imm_iter.cloned().collect_vec();
assert_eq!(1, staging_imm.len());
assert_eq!(test_epoch(4), staging_imm[0].min_epoch());
let staging_data = staging_data_iter.cloned().collect_vec();
assert_eq!(2, staging_data.len());
assert_eq!(test_epoch(4), staging_data[0].min_epoch());

let staging_ssts = staging_sst_iter.cloned().collect_vec();
assert_eq!(1, staging_ssts.len());
assert_eq!(2, staging_ssts[0].get_object_id());
// let staging_ssts = staging_sst_iter.cloned().collect_vec();
// assert_eq!(1, staging_ssts.len());
// assert_eq!(2, staging_ssts[0].get_object_id());
}
}

Expand Down Expand Up @@ -294,23 +292,22 @@ async fn test_read_filter_basic() {
let key = Bytes::from(iterator_test_table_key_of(epoch as usize));
let key_range = map_table_key_range((Bound::Included(key.clone()), Bound::Included(key)));

let (staging_imm, staging_sst) = {
let staging_data = {
let read_guard = read_version.read();
let (staging_imm_iter, staging_sst_iter) = {
read_guard
.staging()
.prune_overlap(epoch, TableId::default(), &key_range)
};

(
staging_imm_iter.cloned().collect_vec(),
staging_sst_iter.cloned().collect_vec(),
)
read_guard
.staging()
.prune_overlap(epoch, TableId::default(), &key_range)
.cloned()
.collect_vec()
};

assert_eq!(1, staging_imm.len());
assert_eq!(0, staging_sst.len());
assert!(staging_imm.iter().any(|imm| imm.min_epoch() <= epoch));
assert_eq!(1, staging_data.len());
assert!(staging_data.iter().any(|data| {
match data {
StagingData::ImmMem(imm) => imm.min_epoch() <= epoch,
StagingData::Sst(_) => unreachable!("can not be sstable"),
}
}));

// test read_filter_for_version
{
Expand All @@ -320,10 +317,9 @@ async fn test_read_filter_basic() {
.unwrap();

assert_eq!(1, hummock_read_snapshot.0.len());
assert_eq!(0, hummock_read_snapshot.1.len());
assert_eq!(
read_version.read().committed().max_committed_epoch(),
hummock_read_snapshot.2.max_committed_epoch()
hummock_read_snapshot.1.max_committed_epoch()
);
}
}
Expand Down
8 changes: 3 additions & 5 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,8 +539,7 @@ async fn test_state_store_sync() {
{
// after sync 1 epoch
let read_version = hummock_storage.read_version();
assert_eq!(1, read_version.read().staging().imm.len());
assert!(read_version.read().staging().sst.is_empty());
assert_eq!(1, read_version.read().staging().data.len());
}

{
Expand Down Expand Up @@ -581,8 +580,7 @@ async fn test_state_store_sync() {
{
// after sync all epoch
let read_version = hummock_storage.read_version();
assert!(read_version.read().staging().imm.is_empty());
assert!(read_version.read().staging().sst.is_empty());
assert!(read_version.read().staging().data.is_empty());
}

{
Expand Down Expand Up @@ -1461,7 +1459,7 @@ async fn test_hummock_version_reader() {
.read()
.committed()
.max_committed_epoch(),
read_snapshot.2.max_committed_epoch()
read_snapshot.1.max_committed_epoch()
);

let iter = hummock_version_reader
Expand Down
Loading
Loading