Skip to content

Commit

Permalink
remove async from ingest_batch and seal_current_epoch
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Dec 27, 2023
1 parent 5897e54 commit 50cd6e2
Show file tree
Hide file tree
Showing 29 changed files with 159 additions and 391 deletions.
7 changes: 1 addition & 6 deletions src/storage/hummock_test/benches/bench_hummock_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,7 @@ fn criterion_benchmark(c: &mut Criterion) {
))
.unwrap();
}
runtime
.block_on(
hummock_storage
.seal_current_epoch(HummockEpoch::MAX, SealCurrentEpochOptions::for_test()),
)
.unwrap();
hummock_storage.seal_current_epoch(HummockEpoch::MAX, SealCurrentEpochOptions::for_test());

c.bench_function("bench-hummock-iter", move |b| {
b.iter(|| {
Expand Down
17 changes: 5 additions & 12 deletions src/storage/hummock_test/src/bin/replay/replay_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,18 +205,11 @@ impl LocalReplay for LocalReplayImpl {
.map_err(|_| TraceError::Other("init failed"))
}

async fn seal_current_epoch(
&mut self,
next_epoch: u64,
opts: TracedSealCurrentEpochOptions,
) -> Result<()> {
self.0
.seal_current_epoch(
next_epoch,
opts.try_into().expect("should not fail to convert"),
)
.await
.map_err(|_| TraceError::Other("seal current epoch failed"))
fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions) {
self.0.seal_current_epoch(
next_epoch,
opts.try_into().expect("should not fail to convert"),
);
}

fn epoch(&self) -> u64 {
Expand Down
40 changes: 8 additions & 32 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,9 @@ pub(crate) mod tests {
.await
.unwrap();
if i + 1 < epochs.len() {
local
.seal_current_epoch(epochs[i + 1], SealCurrentEpochOptions::for_test())
.await
.unwrap();
local.seal_current_epoch(epochs[i + 1], SealCurrentEpochOptions::for_test());
} else {
local
.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test())
.await
.unwrap();
local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());
}
let ssts = storage
.seal_and_sync_epoch(epoch)
Expand Down Expand Up @@ -555,10 +549,7 @@ pub(crate) mod tests {
.unwrap();
}
local.flush(Vec::new()).await.unwrap();
local
.seal_current_epoch(epoch + 1, SealCurrentEpochOptions::for_test())
.await
.unwrap();
local.seal_current_epoch(epoch + 1, SealCurrentEpochOptions::for_test());

flush_and_commit(&hummock_meta_client, storage, epoch).await;
}
Expand Down Expand Up @@ -743,14 +734,8 @@ pub(crate) mod tests {
.insert(TableKey(prefix.freeze()), val.clone(), None)
.unwrap();
storage.flush(Vec::new()).await.unwrap();
storage
.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test())
.await
.unwrap();
other
.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test())
.await
.unwrap();
storage.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test());
other.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test());

let ssts = global_storage
.seal_and_sync_epoch(epoch)
Expand Down Expand Up @@ -940,10 +925,7 @@ pub(crate) mod tests {
.insert(TableKey(prefix.freeze()), val.clone(), None)
.unwrap();
local.flush(Vec::new()).await.unwrap();
local
.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test())
.await
.unwrap();
local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test());

let ssts = storage
.seal_and_sync_epoch(epoch)
Expand Down Expand Up @@ -1141,10 +1123,7 @@ pub(crate) mod tests {
.insert(TableKey(Bytes::from(ramdom_key)), val.clone(), None)
.unwrap();
local.flush(Vec::new()).await.unwrap();
local
.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test())
.await
.unwrap();
local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test());
let ssts = storage
.seal_and_sync_epoch(epoch)
.await
Expand Down Expand Up @@ -1315,10 +1294,7 @@ pub(crate) mod tests {
.flush(vec![prefix_key_range(1u16), prefix_key_range(2u16)])
.await
.unwrap();
local
.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test())
.await
.unwrap();
local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());

flush_and_commit(&hummock_meta_client, &storage, 130).await;

Expand Down
22 changes: 8 additions & 14 deletions src/storage/hummock_test/src/failpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,10 @@ async fn test_failpoints_state_store_read_upload() {
.await
.unwrap();

local
.seal_current_epoch(
3,
risingwave_storage::store::SealCurrentEpochOptions::for_test(),
)
.await
.unwrap();
local.seal_current_epoch(
3,
risingwave_storage::store::SealCurrentEpochOptions::for_test(),
);

// Get the value after flushing to remote.
let anchor_prefix_hint = {
Expand Down Expand Up @@ -137,13 +134,10 @@ async fn test_failpoints_state_store_read_upload() {
.await
.unwrap();

local
.seal_current_epoch(
u64::MAX,
risingwave_storage::store::SealCurrentEpochOptions::for_test(),
)
.await
.unwrap();
local.seal_current_epoch(
u64::MAX,
risingwave_storage::store::SealCurrentEpochOptions::for_test(),
);

// sync epoch1 test the read_error
let ssts = hummock_storage
Expand Down
101 changes: 31 additions & 70 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,7 @@ async fn test_storage_basic() {
assert_eq!(value, None);

let epoch2 = epoch1 + 1;
hummock_storage
.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test())
.await
.unwrap();
hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
hummock_storage
.ingest_batch(
batch2,
Expand Down Expand Up @@ -200,10 +197,7 @@ async fn test_storage_basic() {

// Write the third batch.
let epoch3 = epoch2 + 1;
hummock_storage
.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test())
.await
.unwrap();
hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test());
hummock_storage
.ingest_batch(
batch3,
Expand Down Expand Up @@ -516,10 +510,7 @@ async fn test_state_store_sync() {
.unwrap();

let epoch2 = epoch1 + 1;
hummock_storage
.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test())
.await
.unwrap();
hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());

// ingest more 8B then will trigger a sync behind the scene
let mut batch3 = vec![(
Expand Down Expand Up @@ -751,10 +742,7 @@ async fn test_delete_get() {
.await
.unwrap();
let epoch2 = initial_epoch + 2;
hummock_storage
.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test())
.await
.unwrap();
hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
let batch2 = vec![(
gen_key_from_str(VirtualNode::ZERO, "bb"),
StorageValue::new_delete(),
Expand Down Expand Up @@ -834,10 +822,7 @@ async fn test_multiple_epoch_sync() {
.unwrap();

let epoch2 = initial_epoch + 2;
hummock_storage
.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test())
.await
.unwrap();
hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
let batch2 = vec![(
gen_key_from_str(VirtualNode::ZERO, "bb"),
StorageValue::new_delete(),
Expand All @@ -855,10 +840,7 @@ async fn test_multiple_epoch_sync() {
.unwrap();

let epoch3 = initial_epoch + 3;
hummock_storage
.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test())
.await
.unwrap();
hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test());
let batch3 = vec![
(
gen_key_from_str(VirtualNode::ZERO, "bb"),
Expand Down Expand Up @@ -991,10 +973,7 @@ async fn test_iter_with_min_epoch() {
.unwrap();

let epoch2 = (32 * 1000) << 16;
hummock_storage
.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test())
.await
.unwrap();
hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
// epoch 2 write
let batch_epoch2: Vec<(TableKey<Bytes>, StorageValue)> = (20..30)
.map(|index| (gen_key(index), StorageValue::new_put(gen_val(index))))
Expand Down Expand Up @@ -1215,10 +1194,7 @@ async fn test_hummock_version_reader() {
.await
.unwrap();

hummock_storage
.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test())
.await
.unwrap();
hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
hummock_storage
.ingest_batch(
batch_epoch2,
Expand All @@ -1231,10 +1207,7 @@ async fn test_hummock_version_reader() {
.await
.unwrap();

hummock_storage
.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test())
.await
.unwrap();
hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test());
hummock_storage
.ingest_batch(
batch_epoch3,
Expand Down Expand Up @@ -1612,10 +1585,7 @@ async fn test_get_with_min_epoch() {
.unwrap();

let epoch2 = (32 * 1000) << 16;
hummock_storage
.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test())
.await
.unwrap();
hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
// epoch 2 write
let batch_epoch2: Vec<(TableKey<Bytes>, StorageValue)> = (20..30)
.map(|index| (gen_key(index), StorageValue::new_put(gen_val(index))))
Expand Down Expand Up @@ -1938,19 +1908,16 @@ async fn test_table_watermark() {
(&mut local2, vnode_bitmap2.clone()),
] {
local.flush(vec![]).await.unwrap();
local
.seal_current_epoch(
epoch2,
SealCurrentEpochOptions::new(
vec![VnodeWatermark::new(
Arc::new(vnode_bitmap),
gen_inner_key(watermark1),
)],
WatermarkDirection::Ascending,
),
)
.await
.unwrap();
local.seal_current_epoch(
epoch2,
SealCurrentEpochOptions::new(
vec![VnodeWatermark::new(
Arc::new(vnode_bitmap),
gen_inner_key(watermark1),
)],
WatermarkDirection::Ascending,
),
);
}

// test read after seal with watermark1
Expand Down Expand Up @@ -2042,10 +2009,7 @@ async fn test_table_watermark() {
local.insert(key, value, None).unwrap();
}
local.flush(vec![]).await.unwrap();
local
.seal_current_epoch(epoch3, SealCurrentEpochOptions::no_watermark())
.await
.unwrap();
local.seal_current_epoch(epoch3, SealCurrentEpochOptions::no_watermark());
}

let indexes_after_epoch2 = || gen_range().filter(|index| index % 3 == 0 || index % 3 == 1);
Expand Down Expand Up @@ -2284,19 +2248,16 @@ async fn test_table_watermark() {
(&mut local2, vnode_bitmap2.clone()),
] {
// regress watermark
local
.seal_current_epoch(
epoch4,
SealCurrentEpochOptions::new(
vec![VnodeWatermark::new(
Arc::new(vnode_bitmap),
gen_inner_key(5),
)],
WatermarkDirection::Ascending,
),
)
.await
.unwrap();
local.seal_current_epoch(
epoch4,
SealCurrentEpochOptions::new(
vec![VnodeWatermark::new(
Arc::new(vnode_bitmap),
gen_inner_key(5),
)],
WatermarkDirection::Ascending,
),
);
}

test_global_read(test_env.storage.clone(), epoch3).await;
Expand Down
20 changes: 4 additions & 16 deletions src/storage/hummock_test/src/snapshot_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,7 @@ async fn test_snapshot_inner(
.await
.unwrap();
let epoch2 = epoch1 + 1;
local
.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test())
.await
.unwrap();
local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
if enable_sync {
let ssts = hummock_storage
.seal_and_sync_epoch(epoch1)
Expand Down Expand Up @@ -181,10 +178,7 @@ async fn test_snapshot_inner(
.await
.unwrap();
let epoch3 = epoch2 + 1;
local
.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test())
.await
.unwrap();
local.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test());
if enable_sync {
let ssts = hummock_storage
.seal_and_sync_epoch(epoch2)
Expand Down Expand Up @@ -229,10 +223,7 @@ async fn test_snapshot_inner(
)
.await
.unwrap();
local
.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test())
.await
.unwrap();
local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());
if enable_sync {
let ssts = hummock_storage
.seal_and_sync_epoch(epoch3)
Expand Down Expand Up @@ -295,10 +286,7 @@ async fn test_snapshot_range_scan_inner(
)
.await
.unwrap();
local
.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test())
.await
.unwrap();
local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());
if enable_sync {
let ssts = hummock_storage
.seal_and_sync_epoch(epoch)
Expand Down
Loading

0 comments on commit 50cd6e2

Please sign in to comment.