Skip to content

Commit

Permalink
fix flush small files
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Mar 25, 2024
1 parent fb7fadb commit a144d7c
Show file tree
Hide file tree
Showing 8 changed files with 397 additions and 772 deletions.
96 changes: 46 additions & 50 deletions src/storage/hummock_test/src/hummock_read_version_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,14 @@ async fn test_read_version_basic() {
Bound::Included(Bytes::from(key.to_vec())),
));

let (staging_imm_iter, staging_sst_iter) =
let staging_data_iter =
read_version
.staging()
.prune_overlap(epoch, TableId::default(), &key_range);

let staging_imm = staging_imm_iter.cloned().collect_vec();
let staging_imm = staging_data_iter.cloned().collect_vec();

assert_eq!(1, staging_imm.len());
assert_eq!(0, staging_sst_iter.count());
assert!(staging_imm.iter().any(|imm| imm.min_epoch() <= epoch));
}

Expand Down Expand Up @@ -108,39 +107,35 @@ async fn test_read_version_basic() {
Bound::Included(Bytes::from(key.to_vec())),
Bound::Included(Bytes::from(key.to_vec())),
));
let (staging_imm_iter, staging_sst_iter) =
let staging_data_iter =
read_version
.staging()
.prune_overlap(epoch, TableId::default(), &key_range);

let staging_imm = staging_imm_iter.cloned().collect_vec();
let staging_imm = staging_data_iter.cloned().collect_vec();

assert_eq!(1, staging_imm.len() as u64);
assert_eq!(0, staging_sst_iter.count());
assert!(staging_imm.iter().any(|imm| imm.min_epoch() <= epoch));
}
}

{
// test clean imm with sst update info
let staging = read_version.staging();
assert_eq!(6, staging.imm.len());
assert_eq!(6, staging.data.len());
let batch_id_vec_for_clear = staging
.imm
.data
.iter()
.rev()
.map(|imm| imm.batch_id())
.flat_map(|data| data.to_imm().map(|imm| imm.batch_id()))
.take(3)
.rev()
.collect::<Vec<_>>();

let epoch_id_vec_for_clear = staging
.imm
.data
.iter()
.rev()
.flat_map(|data| data.to_imm())
.map(|imm| imm.min_epoch())
.take(3)
.rev()
.collect::<Vec<_>>();

let dummy_sst = Arc::new(StagingSstableInfo::new(
Expand Down Expand Up @@ -195,13 +190,11 @@ async fn test_read_version_basic() {
// imm(0, 1, 2) => sst{sst_object_id: 1}
// staging => {imm(3, 4, 5), sst[{sst_object_id: 1}, {sst_object_id: 2}]}
let staging = read_version.staging();
assert_eq!(3, read_version.staging().imm.len());
assert_eq!(1, read_version.staging().sst.len());
assert_eq!(2, read_version.staging().sst[0].sstable_infos().len());
assert_eq!(4, read_version.staging().data.len());
let remain_batch_id_vec = staging
.imm
.data
.iter()
.map(|imm| imm.batch_id())
.flat_map(|data| data.to_imm().map(|imm| imm.batch_id()))
.collect::<Vec<_>>();
assert!(remain_batch_id_vec.iter().any(|batch_id| *batch_id > 2));
}
Expand All @@ -215,20 +208,25 @@ async fn test_read_version_basic() {
Bound::Included(Bytes::from(key_range_right)),
));

let (staging_imm_iter, staging_sst_iter) =
let staging_data_iter =
read_version
.staging()
.prune_overlap(epoch, TableId::default(), &key_range);

let staging_imm = staging_imm_iter.cloned().collect_vec();
assert_eq!(1, staging_imm.len());
let staging_data = staging_data_iter.cloned().collect_vec();
assert_eq!(2, staging_data.len());

assert_eq!(test_epoch(4), staging_imm[0].min_epoch());
assert_eq!(test_epoch(4), staging_data[0].min_epoch());

let staging_ssts = staging_sst_iter.cloned().collect_vec();
assert_eq!(2, staging_ssts.len());
assert_eq!(1, staging_ssts[0].get_object_id());
assert_eq!(2, staging_ssts[1].get_object_id());
match &staging_data[1] {
StagingData::Sst(sst) => {
assert_eq!(1, sst.sstable_infos()[0].sst_info.get_object_id());
assert_eq!(2, sst.sstable_infos()[1].sst_info.get_object_id());
}
StagingData::ImmMem(_) => {
unreachable!("can not be immemtable");
}
}
}

{
Expand All @@ -240,18 +238,18 @@ async fn test_read_version_basic() {
Bound::Included(Bytes::from(key_range_right)),
));

let (staging_imm_iter, staging_sst_iter) =
let staging_data_iter =
read_version
.staging()
.prune_overlap(epoch, TableId::default(), &key_range);

let staging_imm = staging_imm_iter.cloned().collect_vec();
assert_eq!(1, staging_imm.len());
assert_eq!(test_epoch(4), staging_imm[0].min_epoch());
let staging_data = staging_data_iter.cloned().collect_vec();
assert_eq!(2, staging_data.len());
assert_eq!(test_epoch(4), staging_data[0].min_epoch());

let staging_ssts = staging_sst_iter.cloned().collect_vec();
assert_eq!(1, staging_ssts.len());
assert_eq!(2, staging_ssts[0].get_object_id());
// let staging_ssts = staging_sst_iter.cloned().collect_vec();
// assert_eq!(1, staging_ssts.len());
// assert_eq!(2, staging_ssts[0].get_object_id());
}
}

Expand Down Expand Up @@ -294,23 +292,22 @@ async fn test_read_filter_basic() {
let key = Bytes::from(iterator_test_table_key_of(epoch as usize));
let key_range = map_table_key_range((Bound::Included(key.clone()), Bound::Included(key)));

let (staging_imm, staging_sst) = {
let staging_data = {
let read_guard = read_version.read();
let (staging_imm_iter, staging_sst_iter) = {
read_guard
.staging()
.prune_overlap(epoch, TableId::default(), &key_range)
};

(
staging_imm_iter.cloned().collect_vec(),
staging_sst_iter.cloned().collect_vec(),
)
read_guard
.staging()
.prune_overlap(epoch, TableId::default(), &key_range)
.cloned()
.collect_vec()
};

assert_eq!(1, staging_imm.len());
assert_eq!(0, staging_sst.len());
assert!(staging_imm.iter().any(|imm| imm.min_epoch() <= epoch));
assert_eq!(1, staging_data.len());
assert!(staging_data.iter().any(|data| {
match data {
StagingData::ImmMem(imm) => imm.min_epoch() <= epoch,
StagingData::Sst(_) => unreachable!("can not be sstable"),
}
}));

// test read_filter_for_version
{
Expand All @@ -320,10 +317,9 @@ async fn test_read_filter_basic() {
.unwrap();

assert_eq!(1, hummock_read_snapshot.0.len());
assert_eq!(0, hummock_read_snapshot.1.len());
assert_eq!(
read_version.read().committed().max_committed_epoch(),
hummock_read_snapshot.2.max_committed_epoch()
hummock_read_snapshot.1.max_committed_epoch()
);
}
}
Expand Down
8 changes: 3 additions & 5 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,8 +539,7 @@ async fn test_state_store_sync() {
{
// after sync 1 epoch
let read_version = hummock_storage.read_version();
assert_eq!(1, read_version.read().staging().imm.len());
assert!(read_version.read().staging().sst.is_empty());
assert_eq!(1, read_version.read().staging().data.len());
}

{
Expand Down Expand Up @@ -581,8 +580,7 @@ async fn test_state_store_sync() {
{
// after sync all epoch
let read_version = hummock_storage.read_version();
assert!(read_version.read().staging().imm.is_empty());
assert!(read_version.read().staging().sst.is_empty());
assert!(read_version.read().staging().data.is_empty());
}

{
Expand Down Expand Up @@ -1461,7 +1459,7 @@ async fn test_hummock_version_reader() {
.read()
.committed()
.max_committed_epoch(),
read_snapshot.2.max_committed_epoch()
read_snapshot.1.max_committed_epoch()
);

let iter = hummock_version_reader
Expand Down
Loading

0 comments on commit a144d7c

Please sign in to comment.