From aa59557537dd7a57247cc807d9a70424960e0f2f Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 25 Dec 2023 18:42:57 +0800 Subject: [PATCH 1/6] feat(storage): support table watermark filter for in-mem state store --- .../hummock_sdk/src/table_watermark.rs | 4 + .../benches/bench_hummock_iter.rs | 7 +- .../src/bin/replay/replay_impl.rs | 17 ++- .../hummock_test/src/compactor_tests.rs | 40 +++++-- .../hummock_test/src/hummock_storage_tests.rs | 101 ++++++++++++------ .../hummock_test/src/state_store_tests.rs | 10 +- src/storage/hummock_trace/src/replay/mod.rs | 8 +- .../hummock_trace/src/replay/worker.rs | 2 +- .../hummock/store/local_hummock_storage.rs | 9 +- src/storage/src/mem_table.rs | 92 +++++++++++----- src/storage/src/monitor/monitored_store.rs | 8 +- src/storage/src/monitor/traced_store.rs | 6 +- src/storage/src/panic_store.rs | 6 +- src/storage/src/store.rs | 6 +- src/storage/src/store_impl.rs | 34 ++++-- src/stream/src/common/table/state_table.rs | 16 ++- .../executor/backfill/arrangement_backfill.rs | 8 +- src/stream/src/executor/backfill/utils.rs | 6 +- .../src/executor/dedup/append_only_dedup.rs | 4 +- src/stream/src/executor/dynamic_filter.rs | 8 +- src/stream/src/executor/now.rs | 2 +- src/stream/src/executor/simple_agg.rs | 9 +- src/stream/src/executor/sort.rs | 4 +- src/stream/src/executor/watermark_filter.rs | 2 +- .../src/delete_range_runner.rs | 4 +- 25 files changed, 301 insertions(+), 112 deletions(-) diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 2f76cff6cc5e4..76a3b5fb4d680 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -321,6 +321,10 @@ impl VnodeWatermark { pub fn vnode_bitmap(&self) -> &Bitmap { &self.vnode_bitmap } + + pub fn watermark(&self) -> &Bytes { + &self.watermark + } } #[derive(Clone, Debug)] diff --git a/src/storage/hummock_test/benches/bench_hummock_iter.rs b/src/storage/hummock_test/benches/bench_hummock_iter.rs index 2e2efa14f872f..e5ddfd9baf545 100644 --- a/src/storage/hummock_test/benches/bench_hummock_iter.rs +++ b/src/storage/hummock_test/benches/bench_hummock_iter.rs @@ -99,7 +99,12 @@ fn criterion_benchmark(c: &mut Criterion) { )) .unwrap(); } - hummock_storage.seal_current_epoch(HummockEpoch::MAX, SealCurrentEpochOptions::for_test()); + runtime + .block_on( + hummock_storage + .seal_current_epoch(HummockEpoch::MAX, SealCurrentEpochOptions::for_test()), + ) + .unwrap(); c.bench_function("bench-hummock-iter", move |b| { b.iter(|| { diff --git a/src/storage/hummock_test/src/bin/replay/replay_impl.rs b/src/storage/hummock_test/src/bin/replay/replay_impl.rs index 3b414bca4b3dd..3ace63fb38c8f 100644 --- a/src/storage/hummock_test/src/bin/replay/replay_impl.rs +++ b/src/storage/hummock_test/src/bin/replay/replay_impl.rs @@ -205,11 +205,18 @@ impl LocalReplay for LocalReplayImpl { .map_err(|_| TraceError::Other("init failed")) } - fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions) { - self.0.seal_current_epoch( - next_epoch, - opts.try_into().expect("should not fail to convert"), - ); + async fn seal_current_epoch( + &mut self, + next_epoch: u64, + opts: TracedSealCurrentEpochOptions, + ) -> Result<()> { + self.0 + .seal_current_epoch( + next_epoch, + opts.try_into().expect("should not fail to convert"), + ) + .await + .map_err(|_| TraceError::Other("seal current epoch failed")) } fn epoch(&self) -> u64 { diff --git a/src/storage/hummock_test/src/compactor_tests.rs 
b/src/storage/hummock_test/src/compactor_tests.rs index 4edd044d5838c..d924ada256826 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -165,9 +165,15 @@ pub(crate) mod tests { .await .unwrap(); if i + 1 < epochs.len() { - local.seal_current_epoch(epochs[i + 1], SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(epochs[i + 1], SealCurrentEpochOptions::for_test()) + .await + .unwrap(); } else { - local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); } let ssts = storage .seal_and_sync_epoch(epoch) @@ -549,7 +555,10 @@ pub(crate) mod tests { .unwrap(); } local.flush(Vec::new()).await.unwrap(); - local.seal_current_epoch(epoch + 1, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(epoch + 1, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); flush_and_commit(&hummock_meta_client, storage, epoch).await; } @@ -734,8 +743,14 @@ pub(crate) mod tests { .insert(TableKey(prefix.freeze()), val.clone(), None) .unwrap(); storage.flush(Vec::new()).await.unwrap(); - storage.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); - other.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); + storage + .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); + other + .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); let ssts = global_storage .seal_and_sync_epoch(epoch) @@ -925,7 +940,10 @@ pub(crate) mod tests { .insert(TableKey(prefix.freeze()), val.clone(), None) .unwrap(); local.flush(Vec::new()).await.unwrap(); - local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); let ssts = storage .seal_and_sync_epoch(epoch) @@ -1123,7 +1141,10 @@ pub(crate) mod tests { .insert(TableKey(Bytes::from(ramdom_key)), val.clone(), None) .unwrap(); local.flush(Vec::new()).await.unwrap(); - local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); let ssts = storage .seal_and_sync_epoch(epoch) .await @@ -1294,7 +1315,10 @@ pub(crate) mod tests { .flush(vec![prefix_key_range(1u16), prefix_key_range(2u16)]) .await .unwrap(); - local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); flush_and_commit(&hummock_meta_client, &storage, 130).await; diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 9c5e7fac402a9..d794fa4374a26 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -164,7 +164,10 @@ async fn test_storage_basic() { assert_eq!(value, None); let epoch2 = epoch1 + 1; - hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); + hummock_storage + .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); hummock_storage .ingest_batch( batch2, @@ -197,7 +200,10 @@ async fn test_storage_basic() { // Write the third batch. 
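// Aside: the `.await.unwrap()` calls threaded through these tests follow from
// the trait change in src/storage/src/store.rs later in this patch, where
// `seal_current_epoch` now returns a future. A minimal sketch of that
// signature pattern, assuming hypothetical stand-ins (`EpochSealer`,
// `MyStore`, `MyError` are illustrative, not names from this patch):
use std::future::Future;

#[derive(Debug)]
struct MyError;

trait EpochSealer {
    // Declaring the return type as `impl Future + Send` in the trait lets
    // each implementor write a plain `async fn`.
    fn seal_current_epoch(
        &mut self,
        next_epoch: u64,
    ) -> impl Future<Output = Result<(), MyError>> + Send + '_;
}

struct MyStore {
    epoch: u64,
}

impl EpochSealer for MyStore {
    async fn seal_current_epoch(&mut self, next_epoch: u64) -> Result<(), MyError> {
        // Sealing must move the epoch strictly forward.
        if next_epoch <= self.epoch {
            return Err(MyError);
        }
        self.epoch = next_epoch;
        Ok(())
    }
}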
let epoch3 = epoch2 + 1; - hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); + hummock_storage + .seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); hummock_storage .ingest_batch( batch3, @@ -510,7 +516,10 @@ async fn test_state_store_sync() { .unwrap(); let epoch2 = epoch1 + 1; - hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); + hummock_storage + .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); // ingest more 8B then will trigger a sync behind the scene let mut batch3 = vec![( @@ -742,7 +751,10 @@ async fn test_delete_get() { .await .unwrap(); let epoch2 = initial_epoch + 2; - hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); + hummock_storage + .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); let batch2 = vec![( gen_key_from_str(VirtualNode::ZERO, "bb"), StorageValue::new_delete(), @@ -822,7 +834,10 @@ async fn test_multiple_epoch_sync() { .unwrap(); let epoch2 = initial_epoch + 2; - hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); + hummock_storage + .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); let batch2 = vec![( gen_key_from_str(VirtualNode::ZERO, "bb"), StorageValue::new_delete(), @@ -840,7 +855,10 @@ async fn test_multiple_epoch_sync() { .unwrap(); let epoch3 = initial_epoch + 3; - hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); + hummock_storage + .seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); let batch3 = vec![ ( gen_key_from_str(VirtualNode::ZERO, "bb"), @@ -973,7 +991,10 @@ async fn test_iter_with_min_epoch() { .unwrap(); let epoch2 = (32 * 1000) << 16; - hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); + hummock_storage + .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); // epoch 2 write let batch_epoch2: Vec<(TableKey, StorageValue)> = (20..30) .map(|index| (gen_key(index), StorageValue::new_put(gen_val(index)))) @@ -1194,7 +1215,10 @@ async fn test_hummock_version_reader() { .await .unwrap(); - hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); + hummock_storage + .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); hummock_storage .ingest_batch( batch_epoch2, @@ -1207,7 +1231,10 @@ async fn test_hummock_version_reader() { .await .unwrap(); - hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); + hummock_storage + .seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); hummock_storage .ingest_batch( batch_epoch3, @@ -1585,7 +1612,10 @@ async fn test_get_with_min_epoch() { .unwrap(); let epoch2 = (32 * 1000) << 16; - hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); + hummock_storage + .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); // epoch 2 write let batch_epoch2: Vec<(TableKey, StorageValue)> = (20..30) .map(|index| (gen_key(index), StorageValue::new_put(gen_val(index)))) @@ -1908,16 +1938,19 @@ async fn test_table_watermark() { (&mut local2, vnode_bitmap2.clone()), ] { local.flush(vec![]).await.unwrap(); - local.seal_current_epoch( - epoch2, - SealCurrentEpochOptions::new( - vec![VnodeWatermark::new( - Arc::new(vnode_bitmap), - gen_inner_key(watermark1), - )], - 
WatermarkDirection::Ascending, - ), - ); + local + .seal_current_epoch( + epoch2, + SealCurrentEpochOptions::new( + vec![VnodeWatermark::new( + Arc::new(vnode_bitmap), + gen_inner_key(watermark1), + )], + WatermarkDirection::Ascending, + ), + ) + .await + .unwrap(); } // test read after seal with watermark1 @@ -2009,7 +2042,10 @@ async fn test_table_watermark() { local.insert(key, value, None).unwrap(); } local.flush(vec![]).await.unwrap(); - local.seal_current_epoch(epoch3, SealCurrentEpochOptions::no_watermark()); + local + .seal_current_epoch(epoch3, SealCurrentEpochOptions::no_watermark()) + .await + .unwrap(); } let indexes_after_epoch2 = || gen_range().filter(|index| index % 3 == 0 || index % 3 == 1); @@ -2248,16 +2284,19 @@ async fn test_table_watermark() { (&mut local2, vnode_bitmap2.clone()), ] { // regress watermark - local.seal_current_epoch( - epoch4, - SealCurrentEpochOptions::new( - vec![VnodeWatermark::new( - Arc::new(vnode_bitmap), - gen_inner_key(5), - )], - WatermarkDirection::Ascending, - ), - ); + local + .seal_current_epoch( + epoch4, + SealCurrentEpochOptions::new( + vec![VnodeWatermark::new( + Arc::new(vnode_bitmap), + gen_inner_key(5), + )], + WatermarkDirection::Ascending, + ), + ) + .await + .unwrap(); } test_global_read(test_env.storage.clone(), epoch3).await; diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 9babe12b9054b..813e1b8860101 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -1289,7 +1289,10 @@ async fn test_gc_watermark_and_clear_shared_buffer() { ); let epoch2 = initial_epoch + 2; - local_hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); + local_hummock_storage + .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); local_hummock_storage .delete( gen_key_from_str(VirtualNode::ZERO, "bb"), @@ -1312,7 +1315,10 @@ async fn test_gc_watermark_and_clear_shared_buffer() { .min() .unwrap() }; - local_hummock_storage.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); + local_hummock_storage + .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); let sync_result1 = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap(); let min_object_id_epoch1 = min_object_id(&sync_result1); assert_eq!( diff --git a/src/storage/hummock_trace/src/replay/mod.rs b/src/storage/hummock_trace/src/replay/mod.rs index df6c191f31764..c98f3653bc410 100644 --- a/src/storage/hummock_trace/src/replay/mod.rs +++ b/src/storage/hummock_trace/src/replay/mod.rs @@ -60,7 +60,11 @@ pub(crate) enum WorkerId { #[async_trait::async_trait] pub trait LocalReplay: LocalReplayRead + ReplayWrite + Send + Sync { async fn init(&mut self, options: TracedInitOptions) -> Result<()>; - fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions); + async fn seal_current_epoch( + &mut self, + next_epoch: u64, + opts: TracedSealCurrentEpochOptions, + ) -> Result<()>; fn is_dirty(&self) -> bool; fn epoch(&self) -> u64; async fn flush( @@ -185,7 +189,7 @@ mock! 
{ #[async_trait::async_trait] impl LocalReplay for LocalReplayInterface{ async fn init(&mut self, options: TracedInitOptions) -> Result<()>; - fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions); + async fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions) -> Result<()>; fn is_dirty(&self) -> bool; fn epoch(&self) -> u64; async fn flush(&mut self, delete_ranges: Vec<(Bound, Bound)>) -> Result; diff --git a/src/storage/hummock_trace/src/replay/worker.rs b/src/storage/hummock_trace/src/replay/worker.rs index f77543cf92b9d..26a43ca92ec1d 100644 --- a/src/storage/hummock_trace/src/replay/worker.rs +++ b/src/storage/hummock_trace/src/replay/worker.rs @@ -347,7 +347,7 @@ impl ReplayWorker { Operation::SealCurrentEpoch { epoch, opts } => { assert_ne!(storage_type, StorageType::Global); let local_storage = local_storages.get_mut(&storage_type).unwrap(); - local_storage.seal_current_epoch(epoch, opts); + local_storage.seal_current_epoch(epoch, opts).await.unwrap(); } Operation::ValidateReadEpoch(epoch) => { assert_eq!(storage_type, StorageType::Global); diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 72c24cc5b1cb7..a633ef0a87c9e 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -394,7 +394,11 @@ impl LocalStateStore for LocalHummockStorage { Ok(()) } - fn seal_current_epoch(&mut self, next_epoch: u64, mut opts: SealCurrentEpochOptions) { + async fn seal_current_epoch( + &mut self, + next_epoch: u64, + mut opts: SealCurrentEpochOptions, + ) -> StorageResult<()> { assert!(!self.is_dirty()); let prev_epoch = self .epoch @@ -425,7 +429,8 @@ impl LocalStateStore for LocalHummockStorage { epoch: prev_epoch, opts, }) - .expect("should be able to send") + .expect("should be able to send"); + Ok(()) } } diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index eca66bfba2f74..c4e93e4e98f43 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -16,17 +16,19 @@ use std::cmp::Ordering; use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::future::Future; -use std::ops::Bound::{Included, Unbounded}; +use std::ops::Bound::{Excluded, Included, Unbounded}; use std::ops::{Bound, RangeBounds}; use bytes::Bytes; use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; +use itertools::Itertools; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::estimate_size::{EstimateSize, KvSize}; -use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange}; +use risingwave_common::hash::VnodeBitmapExt; +use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, FullKey, TableKey, TableKeyRange}; +use risingwave_hummock_sdk::table_watermark::WatermarkDirection; use thiserror::Error; -use tracing::warn; use crate::error::{StorageError, StorageResult}; use crate::hummock::iterator::{FromRustIterator, RustIteratorBuilder}; @@ -429,6 +431,10 @@ pub struct MemtableLocalStateStore { table_id: TableId, is_consistent_op: bool, table_option: TableOption, + + /// buffer the delete_ranges passed from `flush` and + /// write to `inner` on `seal_current_epoch` + delete_ranges: Vec<(Bound, Bound)>, } impl MemtableLocalStateStore { @@ -440,6 +446,7 @@ impl MemtableLocalStateStore { table_id: option.table_id, is_consistent_op: option.is_consistent_op, table_option: option.table_option, + 
delete_ranges: Vec::new(), } } @@ -515,6 +522,58 @@ impl LocalStateStore for MemtableLocalState &mut self, delete_ranges: Vec<(Bound, Bound)>, ) -> StorageResult { + self.delete_ranges.extend(delete_ranges); + Ok(0) + } + + fn epoch(&self) -> u64 { + self.epoch.expect("should have set the epoch") + } + + fn is_dirty(&self) -> bool { + self.mem_table.is_dirty() + } + + #[allow(clippy::unused_async)] + async fn init(&mut self, options: InitOptions) -> StorageResult<()> { + assert!( + self.epoch.replace(options.epoch.curr).is_none(), + "local state store of table id {:?} is init for more than once", + self.table_id + ); + Ok(()) + } + + async fn seal_current_epoch( + &mut self, + next_epoch: u64, + opts: SealCurrentEpochOptions, + ) -> StorageResult<()> { + let delete_ranges = { + let mut delete_ranges = self.delete_ranges.drain(..).collect_vec(); + // when table_watermark is specified, derive extra delete ranges from the watermarks + if let Some((direction, watermarks)) = opts.table_watermarks { + delete_ranges.extend(watermarks.iter().flat_map(|vnode_watermark| { + let inner_range = match direction { + WatermarkDirection::Ascending => { + (Unbounded, Excluded(vnode_watermark.watermark().clone())) + } + WatermarkDirection::Descending => { + (Excluded(vnode_watermark.watermark().clone()), Unbounded) + } + }; + vnode_watermark + .vnode_bitmap() + .iter_vnodes() + .map(move |vnode| { + let (start, end) = + prefixed_range_with_vnode(inner_range.clone(), vnode); + (start.map(|key| key.0.clone()), end.map(|key| key.0.clone())) + }) + })) + } + delete_ranges + }; debug_assert!(delete_ranges .iter() .map(|(key, _)| key) @@ -580,28 +639,7 @@ impl LocalStateStore for MemtableLocalState table_id: self.table_id, }, ) - .await - } - - fn epoch(&self) -> u64 { - self.epoch.expect("should have set the epoch") - } - - fn is_dirty(&self) -> bool { - self.mem_table.is_dirty() - } - - #[allow(clippy::unused_async)] - async fn init(&mut self, options: InitOptions) -> StorageResult<()> { - assert!( - self.epoch.replace(options.epoch.curr).is_none(), - "local state store of table id {:?} is init for more than once", - self.table_id - ); - Ok(()) - } - - fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { + .await?; assert!(!self.is_dirty()); let prev_epoch = self .epoch @@ -613,9 +651,7 @@ impl LocalStateStore for MemtableLocalState next_epoch, prev_epoch ); - if opts.table_watermarks.is_some() { - warn!("table watermark only supported in hummock state store"); - } + Ok(()) } async fn try_flush(&mut self) -> StorageResult<()> { diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index a4944a7a99195..5fcf410a8508e 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -280,9 +280,13 @@ impl LocalStateStore for MonitoredStateStore { self.inner.init(options).await } - fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { + async fn seal_current_epoch( + &mut self, + next_epoch: u64, + opts: SealCurrentEpochOptions, + ) -> StorageResult<()> { // TODO: may collect metrics - self.inner.seal_current_epoch(next_epoch, opts) + self.inner.seal_current_epoch(next_epoch, opts).await } fn try_flush(&mut self) -> impl Future> + Send + '_ { diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index cd65931a7e602..341a32131ced9 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -208,7 +208,11 @@ impl
LocalStateStore for TracedStateStore { self.inner.init(options).await } - fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { + async fn seal_current_epoch( + &mut self, + next_epoch: u64, + opts: SealCurrentEpochOptions, + ) -> StorageResult<()> { let _span = TraceSpan::new_seal_current_epoch_span( next_epoch, TracedSealCurrentEpochOptions::from(opts.clone()), diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index ffeb35e77bf5d..5e22e9ecd5a5e 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -130,7 +130,11 @@ impl LocalStateStore for PanicStateStore { panic!("should not operate on the panic state store!"); } - fn seal_current_epoch(&mut self, _next_epoch: u64, _opts: SealCurrentEpochOptions) { + async fn seal_current_epoch( + &mut self, + _next_epoch: u64, + _opts: SealCurrentEpochOptions, + ) -> StorageResult<()> { panic!("should not operate on the panic state store!") } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index cf5211d7069e5..fe0610031dbb0 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -256,7 +256,11 @@ pub trait LocalStateStore: StaticSendSync { /// Updates the monotonically increasing write epoch to `new_epoch`. /// All writes after this function is called will be tagged with `new_epoch`. In other words, /// the previous write epoch is sealed. - fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions); + fn seal_current_epoch( + &mut self, + next_epoch: u64, + opts: SealCurrentEpochOptions, + ) -> impl Future> + Send + '_; /// Check existence of a given `key_range`. /// It is better to provide `prefix_hint` in `read_options`, which will be used diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 3d63eb7064d0f..c831aef6663fe 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -445,11 +445,17 @@ pub mod verify { Ok(()) } - fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { + async fn seal_current_epoch( + &mut self, + next_epoch: u64, + opts: SealCurrentEpochOptions, + ) -> StorageResult<()> { if let Some(expected) = &mut self.expected { - expected.seal_current_epoch(next_epoch, opts.clone()); + expected + .seal_current_epoch(next_epoch, opts.clone()) + .await?; } - self.actual.seal_current_epoch(next_epoch, opts); + self.actual.seal_current_epoch(next_epoch, opts).await } fn epoch(&self) -> u64 { @@ -789,7 +795,11 @@ pub mod boxed_state_store { async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>; - fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions); + async fn seal_current_epoch( + &mut self, + next_epoch: u64, + opts: SealCurrentEpochOptions, + ) -> StorageResult<()>; } #[async_trait::async_trait] @@ -854,8 +864,12 @@ pub mod boxed_state_store { self.init(options).await } - fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { - self.seal_current_epoch(next_epoch, opts) + async fn seal_current_epoch( + &mut self, + next_epoch: u64, + opts: SealCurrentEpochOptions, + ) -> StorageResult<()> { + self.seal_current_epoch(next_epoch, opts).await } } @@ -927,8 +941,12 @@ pub mod boxed_state_store { self.deref_mut().init(options) } - fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { - self.deref_mut().seal_current_epoch(next_epoch, opts) + async fn seal_current_epoch( + &mut self, + next_epoch: u64, + opts: SealCurrentEpochOptions, 
+ ) -> StorageResult<()> { + self.deref_mut().seal_current_epoch(next_epoch, opts).await } } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index d887684686977..9d58292171492 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1041,7 +1041,8 @@ where if !self.is_dirty() { // If the state table is not modified, go fast path. self.local_store - .seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::no_watermark()); + .seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::no_watermark()) + .await?; return Ok(()); } else { self.seal_current_epoch(new_epoch.curr) @@ -1103,14 +1104,19 @@ where // TODO(st1page): maybe we should extract a pub struct to do it /// just specially used by those state table read-only and after the call the data /// in the epoch will be visible - pub fn commit_no_data_expected(&mut self, new_epoch: EpochPair) { + pub async fn commit_no_data_expected( + &mut self, + new_epoch: EpochPair, + ) -> StreamExecutorResult<()> { assert_eq!(self.epoch(), new_epoch.prev); assert!(!self.is_dirty()); // Tick the watermark buffer here because state table is expected to be committed once // per epoch. self.watermark_buffer_strategy.tick(); self.local_store - .seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::no_watermark()); + .seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::no_watermark()) + .await?; + Ok(()) } /// Write to state store. @@ -1209,7 +1215,9 @@ where } None => SealCurrentEpochOptions::no_watermark(), }; - self.local_store.seal_current_epoch(next_epoch, seal_opt); + self.local_store + .seal_current_epoch(next_epoch, seal_opt) + .await?; Ok(()) } diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index 28fcaa8862faa..fecc79cf679b6 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -377,7 +377,9 @@ where } if upstream_chunk_buffer_is_empty { - upstream_table.commit_no_data_expected(barrier.epoch) + upstream_table + .commit_no_data_expected(barrier.epoch) + .await?; } else { upstream_table.commit(barrier.epoch).await?; } @@ -496,7 +498,9 @@ where "backfill_finished_after_barrier" ); if let Message::Barrier(barrier) = &msg { - self.state_table.commit_no_data_expected(barrier.epoch); + self.state_table + .commit_no_data_expected(barrier.epoch) + .await?; } yield msg; } diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs index d344b23c294dc..e39c9b5fd92dd 100644 --- a/src/stream/src/executor/backfill/utils.rs +++ b/src/stream/src/executor/backfill/utils.rs @@ -462,7 +462,7 @@ pub(crate) async fn flush_data( let vnodes = table.vnodes().clone(); if let Some(old_state) = old_state { if old_state[1..] == current_partial_state[1..] 
{ - table.commit_no_data_expected(epoch); + table.commit_no_data_expected(epoch).await?; return Ok(()); } else { vnodes.iter_vnodes_scalar().for_each(|vnode| { @@ -688,7 +688,7 @@ pub(crate) async fn persist_state_per_vnode( flush_data(table, epoch, old_state, current_state).await?; *old_state = Some(current_state.into()); } else { - table.commit_no_data_expected(epoch); + table.commit_no_data_expected(epoch).await?; } Ok(()) } diff --git a/src/stream/src/executor/dedup/append_only_dedup.rs b/src/stream/src/executor/dedup/append_only_dedup.rs index 3c44e64cb2aea..0f3e9d10fc045 100644 --- a/src/stream/src/executor/dedup/append_only_dedup.rs +++ b/src/stream/src/executor/dedup/append_only_dedup.rs @@ -140,7 +140,9 @@ impl AppendOnlyDedupExecutor { self.state_table.commit(barrier.epoch).await?; commit_data = false; } else { - self.state_table.commit_no_data_expected(barrier.epoch); + self.state_table + .commit_no_data_expected(barrier.epoch) + .await?; } if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) { diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index 406acb2536ba4..e859d646bebdd 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -464,12 +464,16 @@ impl DynamicFilterExecutor diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ ... @@ impl NowExecutor { initialized = true; } else if paused { // Assert that no data is updated. - state_table.commit_no_data_expected(barrier.epoch); + state_table.commit_no_data_expected(barrier.epoch).await?; } else { state_table.commit(barrier.epoch).await?; } diff --git a/src/stream/src/executor/simple_agg.rs b/src/stream/src/executor/simple_agg.rs index 0d33a7dc3074e..104ffeb001793 100644 --- a/src/stream/src/executor/simple_agg.rs +++ b/src/stream/src/executor/simple_agg.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use futures::future::try_join_all; use futures::StreamExt; use futures_async_stream::try_stream; use risingwave_common::array::StreamChunk; @@ -228,9 +229,11 @@ impl SimpleAggExecutor { } else { // No state is changed. // Call commit on state table to increment the epoch.
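// Aside: the hunk continuing below swaps a `for_each` over the state tables
// for `try_join_all`, since the now-async `commit_no_data_expected` yields
// futures that must be awaited together, short-circuiting on the first
// error. A minimal sketch with a hypothetical `Table` stand-in:
use futures::future::try_join_all;

struct Table {
    id: u32,
}

impl Table {
    async fn commit_no_data_expected(&mut self, epoch: u64) -> Result<(), String> {
        // A real table would advance its local store's epoch here.
        println!("table {} sealed at epoch {}", self.id, epoch);
        Ok(())
    }
}

async fn commit_all(tables: &mut [Table], epoch: u64) -> Result<(), String> {
    // `iter_mut` hands out disjoint `&mut Table`, so the futures can be
    // polled concurrently without aliasing.
    try_join_all(
        tables
            .iter_mut()
            .map(|table| table.commit_no_data_expected(epoch)),
    )
    .await?;
    Ok(())
}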
- this.all_state_tables_mut().for_each(|table| { - table.commit_no_data_expected(epoch); - }); + try_join_all( + this.all_state_tables_mut() + .map(|table| table.commit_no_data_expected(epoch)), + ) + .await?; None }; diff --git a/src/stream/src/executor/sort.rs b/src/stream/src/executor/sort.rs index 42412d4f22587..60fba19314409 100644 --- a/src/stream/src/executor/sort.rs +++ b/src/stream/src/executor/sort.rs @@ -148,7 +148,9 @@ impl SortExecutor { if vars.buffer_changed { this.buffer_table.commit(barrier.epoch).await?; } else { - this.buffer_table.commit_no_data_expected(barrier.epoch); + this.buffer_table + .commit_no_data_expected(barrier.epoch) + .await?; } vars.buffer_changed = false; diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index 29418e82c0fd5..b920840cf741d 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -256,7 +256,7 @@ impl WatermarkFilterExecutor { } table.commit(barrier.epoch).await?; } else { - table.commit_no_data_expected(barrier.epoch); + table.commit_no_data_expected(barrier.epoch).await?; } if barrier.kind.is_checkpoint() { diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index d2acd7c754c74..f739a0d1cf345 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -425,7 +425,9 @@ impl NormalState { .await .map_err(|e| format!("{:?}", e))?; self.storage - .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); + .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()) + .await + .map_err(|e| format!("{:?}", e))?; Ok(()) } From c921687096bc451cee48505d5a65ea6c771a566d Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 25 Dec 2023 19:12:09 +0800 Subject: [PATCH 2/6] fix compile --- .../hummock_test/src/failpoint_tests.rs | 4 +- .../hummock_test/src/sync_point_tests.rs | 44 ++++++++++++------- src/storage/src/monitor/traced_store.rs | 2 +- 3 files changed, 31 insertions(+), 19 deletions(-) diff --git a/src/storage/hummock_test/src/failpoint_tests.rs b/src/storage/hummock_test/src/failpoint_tests.rs index cbfec13e354fe..d56d2c8565d44 100644 --- a/src/storage/hummock_test/src/failpoint_tests.rs +++ b/src/storage/hummock_test/src/failpoint_tests.rs @@ -98,7 +98,7 @@ async fn test_failpoints_state_store_read_upload() { local.seal_current_epoch( 3, risingwave_storage::store::SealCurrentEpochOptions::for_test(), - ); + ).await.unwrap(); // Get the value after flushing to remote. 
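// Aside: the core of patch 1 is the conversion in src/storage/src/mem_table.rs
// above, where each sealed vnode watermark becomes a per-vnode delete range.
// A self-contained sketch of that conversion, simplifying keys to `Vec<u8>`
// with a 2-byte big-endian vnode prefix (the real code clamps the bounds per
// vnode via `prefixed_range_with_vnode`); assumes `vnode < u16::MAX`.
use std::ops::Bound::{self, Excluded, Included};

#[derive(Clone, Copy)]
enum Direction {
    Ascending,
    Descending,
}

fn watermark_delete_ranges(
    direction: Direction,
    watermark: &[u8],
    vnodes: impl Iterator<Item = u16>,
) -> Vec<(Bound<Vec<u8>>, Bound<Vec<u8>>)> {
    vnodes
        .map(|vnode| {
            let prefix = vnode.to_be_bytes().to_vec();
            let next_prefix = (vnode + 1).to_be_bytes().to_vec();
            let mut watermark_key = prefix.clone();
            watermark_key.extend_from_slice(watermark);
            match direction {
                // Ascending watermark: keys strictly below it are stale.
                Direction::Ascending => (Included(prefix), Excluded(watermark_key)),
                // Descending watermark: keys strictly above it are stale.
                Direction::Descending => (Excluded(watermark_key), Excluded(next_prefix)),
            }
        })
        .collect()
}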
let anchor_prefix_hint = { @@ -137,7 +137,7 @@ async fn test_failpoints_state_store_read_upload() { local.seal_current_epoch( u64::MAX, risingwave_storage::store::SealCurrentEpochOptions::for_test(), - ); + ).await.unwrap(); // sync epoch1 test the read_error let ssts = hummock_storage diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index aa862d80085f7..dbc61b84b0756 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -298,10 +298,13 @@ async fn test_syncpoints_get_in_delete_range_boundary() { ) .unwrap(); local.flush(Vec::new()).await.unwrap(); - local.seal_current_epoch( - 101, - risingwave_storage::store::SealCurrentEpochOptions::for_test(), - ); + local + .seal_current_epoch( + 101, + risingwave_storage::store::SealCurrentEpochOptions::for_test(), + ) + .await + .unwrap(); flush_and_commit(&hummock_meta_client, &storage, 100).await; compact_once( hummock_manager_ref.clone(), @@ -332,10 +335,13 @@ async fn test_syncpoints_get_in_delete_range_boundary() { )]) .await .unwrap(); - local.seal_current_epoch( - 102, - risingwave_storage::store::SealCurrentEpochOptions::for_test(), - ); + local + .seal_current_epoch( + 102, + risingwave_storage::store::SealCurrentEpochOptions::for_test(), + ) + .await + .unwrap(); flush_and_commit(&hummock_meta_client, &storage, 101).await; compact_once( hummock_manager_ref.clone(), @@ -366,10 +372,13 @@ async fn test_syncpoints_get_in_delete_range_boundary() { )]) .await .unwrap(); - local.seal_current_epoch( - 103, - risingwave_storage::store::SealCurrentEpochOptions::for_test(), - ); + local + .seal_current_epoch( + 103, + risingwave_storage::store::SealCurrentEpochOptions::for_test(), + ) + .await + .unwrap(); flush_and_commit(&hummock_meta_client, &storage, 102).await; // move this two file to the same level. compact_once( @@ -395,10 +404,13 @@ async fn test_syncpoints_get_in_delete_range_boundary() { ) .unwrap(); local.flush(Vec::new()).await.unwrap(); - local.seal_current_epoch( - u64::MAX, - risingwave_storage::store::SealCurrentEpochOptions::for_test(), - ); + local + .seal_current_epoch( + u64::MAX, + risingwave_storage::store::SealCurrentEpochOptions::for_test(), + ) + .await + .unwrap(); flush_and_commit(&hummock_meta_client, &storage, 103).await; // move this two file to the same level. 
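// Aside: this test seals epochs 101, 102, 103 and finally `u64::MAX` as a
// "no further writes" sentinel. A minimal sketch of the bookkeeping a seal
// relies on, mirroring the `self.epoch.replace(next_epoch)` pattern visible
// in mem_table.rs and local_hummock_storage.rs earlier in this series (the
// message strings here are illustrative, not quoted from the patch):
fn seal(epoch: &mut Option<u64>, next_epoch: u64) {
    let prev_epoch = epoch
        .replace(next_epoch)
        .expect("must init the epoch before the first seal");
    assert!(
        next_epoch > prev_epoch,
        "next epoch {} must be greater than the sealed epoch {}",
        next_epoch,
        prev_epoch
    );
}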
compact_once( diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index 341a32131ced9..eddaf592fa747 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -218,7 +218,7 @@ impl LocalStateStore for TracedStateStore { TracedSealCurrentEpochOptions::from(opts.clone()), self.storage_type, ); - self.inner.seal_current_epoch(next_epoch, opts) + self.inner.seal_current_epoch(next_epoch, opts).await } async fn try_flush(&mut self) -> StorageResult<()> { From 04407c90d1f5a35002a46f5c28c42aee6857348c Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 25 Dec 2023 19:21:47 +0800 Subject: [PATCH 3/6] fmt --- .../hummock_test/src/failpoint_tests.rs | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/storage/hummock_test/src/failpoint_tests.rs b/src/storage/hummock_test/src/failpoint_tests.rs index d56d2c8565d44..edbffbac40ad0 100644 --- a/src/storage/hummock_test/src/failpoint_tests.rs +++ b/src/storage/hummock_test/src/failpoint_tests.rs @@ -95,10 +95,13 @@ async fn test_failpoints_state_store_read_upload() { .await .unwrap(); - local.seal_current_epoch( - 3, - risingwave_storage::store::SealCurrentEpochOptions::for_test(), - ).await.unwrap(); + local + .seal_current_epoch( + 3, + risingwave_storage::store::SealCurrentEpochOptions::for_test(), + ) + .await + .unwrap(); // Get the value after flushing to remote. let anchor_prefix_hint = { @@ -134,10 +137,13 @@ async fn test_failpoints_state_store_read_upload() { .await .unwrap(); - local.seal_current_epoch( - u64::MAX, - risingwave_storage::store::SealCurrentEpochOptions::for_test(), - ).await.unwrap(); + local + .seal_current_epoch( + u64::MAX, + risingwave_storage::store::SealCurrentEpochOptions::for_test(), + ) + .await + .unwrap(); // sync epoch1 test the read_error let ssts = hummock_storage From bf3b7d2dfdf9928b348f030dfd1a84b209e79a5c Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 25 Dec 2023 19:39:26 +0800 Subject: [PATCH 4/6] fix --- .../src/common/log_store_impl/kv_log_store/writer.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index fa380a77b6aee..4a56481b8f7b3 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -160,10 +160,12 @@ impl LogWriter for KvLogStoreWriter { } self.state_store.flush(vec![]).await?; let watermark = watermark.into_iter().collect_vec(); - self.state_store.seal_current_epoch( - next_epoch, - SealCurrentEpochOptions::new(watermark, WatermarkDirection::Ascending), - ); + self.state_store + .seal_current_epoch( + next_epoch, + SealCurrentEpochOptions::new(watermark, WatermarkDirection::Ascending), + ) + .await?; self.tx.barrier(epoch, is_checkpoint, next_epoch); self.seq_id = FIRST_SEQ_ID; Ok(()) From 1555d243dda84aa0153dea10925faa9e470b9859 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 26 Dec 2023 14:56:45 +0800 Subject: [PATCH 5/6] fix ut --- .../hummock_test/src/snapshot_tests.rs | 20 ++- .../hummock_test/src/state_store_tests.rs | 60 +++++++-- src/storage/src/mem_table.rs | 117 +++++++++--------- 3 files changed, 123 insertions(+), 74 deletions(-) diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs index 61e20c3799a37..5458f1dbed5f3 100644 --- 
a/src/storage/hummock_test/src/snapshot_tests.rs +++ b/src/storage/hummock_test/src/snapshot_tests.rs @@ -133,7 +133,10 @@ async fn test_snapshot_inner( .await .unwrap(); let epoch2 = epoch1 + 1; - local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); if enable_sync { let ssts = hummock_storage .seal_and_sync_epoch(epoch1) @@ -178,7 +181,10 @@ async fn test_snapshot_inner( .await .unwrap(); let epoch3 = epoch2 + 1; - local.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); if enable_sync { let ssts = hummock_storage .seal_and_sync_epoch(epoch2) @@ -223,7 +229,10 @@ async fn test_snapshot_inner( ) .await .unwrap(); - local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); if enable_sync { let ssts = hummock_storage .seal_and_sync_epoch(epoch3) @@ -286,7 +295,10 @@ async fn test_snapshot_range_scan_inner( ) .await .unwrap(); - local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); if enable_sync { let ssts = hummock_storage .seal_and_sync_epoch(epoch) diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 813e1b8860101..b4dac4a0269e5 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -160,7 +160,10 @@ async fn test_basic_inner( .unwrap(); let epoch2 = epoch1 + 1; - local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); // Get the value after flushing to remote. let value = hummock_storage @@ -218,7 +221,10 @@ async fn test_basic_inner( .unwrap(); let epoch3 = epoch2 + 1; - local.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); // Get the value after flushing to remote. let value = hummock_storage @@ -249,7 +255,10 @@ async fn test_basic_inner( .await .unwrap(); - local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); // Get the value after flushing to remote. 
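// Aside: patch 5 below reworks mem_table.rs so that, rather than buffering
// delete ranges at `flush`, `seal_current_epoch` ingests an empty batch whose
// only payload is the watermark-derived delete ranges. A sketch of what
// applying such a range means for an ordered in-memory keyspace, with
// `BTreeMap` standing in for the mem state store (a real LSM store would
// record a range tombstone rather than eagerly deleting):
use std::collections::BTreeMap;
use std::ops::Bound;

type Keyspace = BTreeMap<Vec<u8>, Vec<u8>>;

fn apply_delete_range(ks: &mut Keyspace, range: (Bound<Vec<u8>>, Bound<Vec<u8>>)) {
    // Collect the doomed keys first so the immutable borrow from `range`
    // ends before mutation.
    let doomed: Vec<Vec<u8>> = ks.range(range).map(|(k, _)| k.clone()).collect();
    for key in doomed {
        ks.remove(&key);
    }
}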
let value = hummock_storage @@ -481,7 +490,10 @@ async fn test_state_store_sync_inner( // ); epoch += 1; - local.seal_current_epoch(epoch, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(epoch, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); // ingest more 8B then will trigger a sync behind the scene let mut batch3 = vec![( @@ -509,7 +521,10 @@ async fn test_state_store_sync_inner( // hummock_storage.shared_buffer_manager().size() as u64 // ); - local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); // trigger a sync hummock_storage @@ -975,7 +990,10 @@ async fn test_write_anytime_inner( assert_new_value(epoch1).await; let epoch2 = epoch1 + 1; - local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); // Write to epoch2 local @@ -989,7 +1007,10 @@ async fn test_write_anytime_inner( ) .await .unwrap(); - local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); // Assert epoch 1 unchanged assert_new_value(epoch1).await; // Assert epoch 2 correctness @@ -1058,7 +1079,10 @@ async fn test_delete_get_inner( meta_client.commit_epoch(epoch1, ssts).await.unwrap(); let epoch2 = initial_epoch + 2; - local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); let batch2 = vec![( gen_key_from_str(VirtualNode::ZERO, "bb"), StorageValue::new_delete(), @@ -1074,7 +1098,10 @@ async fn test_delete_get_inner( ) .await .unwrap(); - local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); let ssts = hummock_storage .seal_and_sync_epoch(epoch2) .await @@ -1137,7 +1164,10 @@ async fn test_multiple_epoch_sync_inner( .unwrap(); let epoch2 = initial_epoch + 2; - local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); let batch2 = vec![( gen_key_from_str(VirtualNode::ZERO, "bb"), StorageValue::new_delete(), @@ -1165,7 +1195,10 @@ async fn test_multiple_epoch_sync_inner( StorageValue::new_put("555"), ), ]; - local.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); local .ingest_batch( batch3, @@ -1177,7 +1210,10 @@ async fn test_multiple_epoch_sync_inner( ) .await .unwrap(); - local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); + local + .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()) + .await + .unwrap(); let test_get = || { let hummock_storage_clone = &hummock_storage; async move { diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index c4e93e4e98f43..90b77029dcbb2 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -431,10 +431,6 @@ pub struct MemtableLocalStateStore { table_id: TableId, is_consistent_op: bool, table_option: TableOption, - - /// buffer the delete_ranges passed from `flush` and - /// write to `inner` on `seal_current_epoch` - delete_ranges: Vec<(Bound, Bound)>, } impl MemtableLocalStateStore { @@ -446,7 
+442,6 @@ impl MemtableLocalStateStore { table_id: option.table_id, is_consistent_op: option.is_consistent_op, table_option: option.table_option, - delete_ranges: Vec::new(), } } @@ -522,58 +517,6 @@ impl LocalStateStore for MemtableLocalState &mut self, delete_ranges: Vec<(Bound, Bound)>, ) -> StorageResult { - self.delete_ranges.extend(delete_ranges); - Ok(0) - } - - fn epoch(&self) -> u64 { - self.epoch.expect("should have set the epoch") - } - - fn is_dirty(&self) -> bool { - self.mem_table.is_dirty() - } - - #[allow(clippy::unused_async)] - async fn init(&mut self, options: InitOptions) -> StorageResult<()> { - assert!( - self.epoch.replace(options.epoch.curr).is_none(), - "local state store of table id {:?} is init for more than once", - self.table_id - ); - Ok(()) - } - - async fn seal_current_epoch( - &mut self, - next_epoch: u64, - opts: SealCurrentEpochOptions, - ) -> StorageResult<()> { - let delete_ranges = { - let mut delete_ranges = self.delete_ranges.drain(..).collect_vec(); - // when table_watermark is specified, derive extra delete ranges from the watermarks - if let Some((direction, watermarks)) = opts.table_watermarks { - delete_ranges.extend(watermarks.iter().flat_map(|vnode_watermark| { - let inner_range = match direction { - WatermarkDirection::Ascending => { - (Unbounded, Excluded(vnode_watermark.watermark().clone())) - } - WatermarkDirection::Descending => { - (Excluded(vnode_watermark.watermark().clone()), Unbounded) - } - }; - vnode_watermark - .vnode_bitmap() - .iter_vnodes() - .map(move |vnode| { - let (start, end) = - prefixed_range_with_vnode(inner_range.clone(), vnode); - (start.map(|key| key.0.clone()), end.map(|key| key.0.clone())) - }) - })) - } - delete_ranges - }; debug_assert!(delete_ranges .iter() .map(|(key, _)| key) @@ -639,7 +582,32 @@ impl LocalStateStore for MemtableLocalState table_id: self.table_id, }, ) - .await?; + .await + } + + fn epoch(&self) -> u64 { + self.epoch.expect("should have set the epoch") + } + + fn is_dirty(&self) -> bool { + self.mem_table.is_dirty() + } + + #[allow(clippy::unused_async)] + async fn init(&mut self, options: InitOptions) -> StorageResult<()> { + assert!( + self.epoch.replace(options.epoch.curr).is_none(), + "local state store of table id {:?} is init for more than once", + self.table_id + ); + Ok(()) + } + + async fn seal_current_epoch( + &mut self, + next_epoch: u64, + opts: SealCurrentEpochOptions, + ) -> StorageResult<()> { assert!(!self.is_dirty()); let prev_epoch = self .epoch @@ -651,6 +619,39 @@ impl LocalStateStore for MemtableLocalState next_epoch, prev_epoch ); + if let Some((direction, watermarks)) = opts.table_watermarks { + let delete_ranges = watermarks + .iter() + .flat_map(|vnode_watermark| { + let inner_range = match direction { + WatermarkDirection::Ascending => { + (Unbounded, Excluded(vnode_watermark.watermark().clone())) + } + WatermarkDirection::Descending => { + (Excluded(vnode_watermark.watermark().clone()), Unbounded) + } + }; + vnode_watermark + .vnode_bitmap() + .iter_vnodes() + .map(move |vnode| { + let (start, end) = + prefixed_range_with_vnode(inner_range.clone(), vnode); + (start.map(|key| key.0.clone()), end.map(|key| key.0.clone())) + }) + }) + .collect_vec(); + self.inner + .ingest_batch( + Vec::new(), + delete_ranges, + WriteOptions { + epoch: self.epoch(), + table_id: self.table_id, + }, + ) + .await?; + } Ok(()) } From 50cd6e2d7e2c5fe5dfe90382e23ac71dbbd05c58 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 27 Dec 2023 20:08:40 +0800 Subject: [PATCH 6/6] remove async from ingest_batch and
seal_current_epoch --- .../benches/bench_hummock_iter.rs | 7 +- .../src/bin/replay/replay_impl.rs | 17 +-- .../hummock_test/src/compactor_tests.rs | 40 ++----- .../hummock_test/src/failpoint_tests.rs | 22 ++-- .../hummock_test/src/hummock_storage_tests.rs | 101 ++++++------------ .../hummock_test/src/snapshot_tests.rs | 20 +--- .../hummock_test/src/state_store_tests.rs | 70 +++--------- .../hummock_test/src/sync_point_tests.rs | 44 +++----- src/storage/hummock_trace/src/replay/mod.rs | 8 +- .../hummock_trace/src/replay/worker.rs | 2 +- .../hummock/store/local_hummock_storage.rs | 9 +- src/storage/src/mem_table.rs | 46 ++++---- src/storage/src/memory.rs | 5 +- src/storage/src/monitor/monitored_store.rs | 8 +- src/storage/src/monitor/traced_store.rs | 8 +- src/storage/src/panic_store.rs | 9 +- src/storage/src/store.rs | 8 +- src/storage/src/store_impl.rs | 53 +++------ .../log_store_impl/kv_log_store/writer.rs | 10 +- src/stream/src/common/table/state_table.rs | 16 +-- .../executor/backfill/arrangement_backfill.rs | 8 +- src/stream/src/executor/backfill/utils.rs | 6 +- .../src/executor/dedup/append_only_dedup.rs | 4 +- src/stream/src/executor/dynamic_filter.rs | 8 +- src/stream/src/executor/now.rs | 2 +- src/stream/src/executor/simple_agg.rs | 9 +- src/stream/src/executor/sort.rs | 4 +- src/stream/src/executor/watermark_filter.rs | 2 +- .../src/delete_range_runner.rs | 4 +- 29 files changed, 159 insertions(+), 391 deletions(-) diff --git a/src/storage/hummock_test/benches/bench_hummock_iter.rs b/src/storage/hummock_test/benches/bench_hummock_iter.rs index e5ddfd9baf545..2e2efa14f872f 100644 --- a/src/storage/hummock_test/benches/bench_hummock_iter.rs +++ b/src/storage/hummock_test/benches/bench_hummock_iter.rs @@ -99,12 +99,7 @@ fn criterion_benchmark(c: &mut Criterion) { )) .unwrap(); } - runtime - .block_on( - hummock_storage - .seal_current_epoch(HummockEpoch::MAX, SealCurrentEpochOptions::for_test()), - ) - .unwrap(); + hummock_storage.seal_current_epoch(HummockEpoch::MAX, SealCurrentEpochOptions::for_test()); c.bench_function("bench-hummock-iter", move |b| { b.iter(|| { diff --git a/src/storage/hummock_test/src/bin/replay/replay_impl.rs b/src/storage/hummock_test/src/bin/replay/replay_impl.rs index 3ace63fb38c8f..3b414bca4b3dd 100644 --- a/src/storage/hummock_test/src/bin/replay/replay_impl.rs +++ b/src/storage/hummock_test/src/bin/replay/replay_impl.rs @@ -205,18 +205,11 @@ impl LocalReplay for LocalReplayImpl { .map_err(|_| TraceError::Other("init failed")) } - async fn seal_current_epoch( - &mut self, - next_epoch: u64, - opts: TracedSealCurrentEpochOptions, - ) -> Result<()> { - self.0 - .seal_current_epoch( - next_epoch, - opts.try_into().expect("should not fail to convert"), - ) - .await - .map_err(|_| TraceError::Other("seal current epoch failed")) + fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions) { + self.0.seal_current_epoch( + next_epoch, + opts.try_into().expect("should not fail to convert"), + ); } fn epoch(&self) -> u64 { diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index e820b18eb2a19..be0ebe204d745 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -165,15 +165,9 @@ pub(crate) mod tests { .await .unwrap(); if i + 1 < epochs.len() { - local - .seal_current_epoch(epochs[i + 1], SealCurrentEpochOptions::for_test()) - .await - .unwrap(); + local.seal_current_epoch(epochs[i + 1], 
SealCurrentEpochOptions::for_test()); } else { - local - .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()) - .await - .unwrap(); + local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); } let ssts = storage .seal_and_sync_epoch(epoch) @@ -555,10 +549,7 @@ pub(crate) mod tests { .unwrap(); } local.flush(Vec::new()).await.unwrap(); - local - .seal_current_epoch(epoch + 1, SealCurrentEpochOptions::for_test()) - .await - .unwrap(); + local.seal_current_epoch(epoch + 1, SealCurrentEpochOptions::for_test()); flush_and_commit(&hummock_meta_client, storage, epoch).await; } @@ -743,14 +734,8 @@ pub(crate) mod tests { .insert(TableKey(prefix.freeze()), val.clone(), None) .unwrap(); storage.flush(Vec::new()).await.unwrap(); - storage - .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()) - .await - .unwrap(); - other - .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()) - .await - .unwrap(); + storage.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); + other.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); let ssts = global_storage .seal_and_sync_epoch(epoch) @@ -940,10 +925,7 @@ pub(crate) mod tests { .insert(TableKey(prefix.freeze()), val.clone(), None) .unwrap(); local.flush(Vec::new()).await.unwrap(); - local - .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()) - .await - .unwrap(); + local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); let ssts = storage .seal_and_sync_epoch(epoch) @@ -1141,10 +1123,7 @@ pub(crate) mod tests { .insert(TableKey(Bytes::from(ramdom_key)), val.clone(), None) .unwrap(); local.flush(Vec::new()).await.unwrap(); - local - .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()) - .await - .unwrap(); + local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); let ssts = storage .seal_and_sync_epoch(epoch) .await @@ -1315,10 +1294,7 @@ pub(crate) mod tests { .flush(vec![prefix_key_range(1u16), prefix_key_range(2u16)]) .await .unwrap(); - local - .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()) - .await - .unwrap(); + local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); flush_and_commit(&hummock_meta_client, &storage, 130).await; diff --git a/src/storage/hummock_test/src/failpoint_tests.rs b/src/storage/hummock_test/src/failpoint_tests.rs index edbffbac40ad0..cbfec13e354fe 100644 --- a/src/storage/hummock_test/src/failpoint_tests.rs +++ b/src/storage/hummock_test/src/failpoint_tests.rs @@ -95,13 +95,10 @@ async fn test_failpoints_state_store_read_upload() { .await .unwrap(); - local - .seal_current_epoch( - 3, - risingwave_storage::store::SealCurrentEpochOptions::for_test(), - ) - .await - .unwrap(); + local.seal_current_epoch( + 3, + risingwave_storage::store::SealCurrentEpochOptions::for_test(), + ); // Get the value after flushing to remote. 
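// Aside: patch 6 reverts the signature change: `seal_current_epoch` (and
// `ingest_batch`, per the subject line) become synchronous again, so each
// `.await.unwrap()` above collapses back into a single statement. End-state
// shape, with the same hypothetical stand-ins as the earlier sketch:
trait EpochSealer {
    fn seal_current_epoch(&mut self, next_epoch: u64);
}

struct MyStore {
    epoch: u64,
}

impl EpochSealer for MyStore {
    fn seal_current_epoch(&mut self, next_epoch: u64) {
        // Still a hard invariant, now enforced without a future or Result.
        assert!(next_epoch > self.epoch, "epoch must advance");
        self.epoch = next_epoch;
    }
}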
let anchor_prefix_hint = { @@ -137,13 +134,10 @@ async fn test_failpoints_state_store_read_upload() { .await .unwrap(); - local - .seal_current_epoch( - u64::MAX, - risingwave_storage::store::SealCurrentEpochOptions::for_test(), - ) - .await - .unwrap(); + local.seal_current_epoch( + u64::MAX, + risingwave_storage::store::SealCurrentEpochOptions::for_test(), + ); // sync epoch1 test the read_error let ssts = hummock_storage diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index d794fa4374a26..9c5e7fac402a9 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -164,10 +164,7 @@ async fn test_storage_basic() { assert_eq!(value, None); let epoch2 = epoch1 + 1; - hummock_storage - .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) - .await - .unwrap(); + hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); hummock_storage .ingest_batch( batch2, @@ -200,10 +197,7 @@ async fn test_storage_basic() { // Write the third batch. let epoch3 = epoch2 + 1; - hummock_storage - .seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()) - .await - .unwrap(); + hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); hummock_storage .ingest_batch( batch3, @@ -516,10 +510,7 @@ async fn test_state_store_sync() { .unwrap(); let epoch2 = epoch1 + 1; - hummock_storage - .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) - .await - .unwrap(); + hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); // ingest more 8B then will trigger a sync behind the scene let mut batch3 = vec![( @@ -751,10 +742,7 @@ async fn test_delete_get() { .await .unwrap(); let epoch2 = initial_epoch + 2; - hummock_storage - .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) - .await - .unwrap(); + hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); let batch2 = vec![( gen_key_from_str(VirtualNode::ZERO, "bb"), StorageValue::new_delete(), @@ -834,10 +822,7 @@ async fn test_multiple_epoch_sync() { .unwrap(); let epoch2 = initial_epoch + 2; - hummock_storage - .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) - .await - .unwrap(); + hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); let batch2 = vec![( gen_key_from_str(VirtualNode::ZERO, "bb"), StorageValue::new_delete(), @@ -855,10 +840,7 @@ async fn test_multiple_epoch_sync() { .unwrap(); let epoch3 = initial_epoch + 3; - hummock_storage - .seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()) - .await - .unwrap(); + hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); let batch3 = vec![ ( gen_key_from_str(VirtualNode::ZERO, "bb"), @@ -991,10 +973,7 @@ async fn test_iter_with_min_epoch() { .unwrap(); let epoch2 = (32 * 1000) << 16; - hummock_storage - .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) - .await - .unwrap(); + hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); // epoch 2 write let batch_epoch2: Vec<(TableKey, StorageValue)> = (20..30) .map(|index| (gen_key(index), StorageValue::new_put(gen_val(index)))) @@ -1215,10 +1194,7 @@ async fn test_hummock_version_reader() { .await .unwrap(); - hummock_storage - .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()) - .await - .unwrap(); + hummock_storage.seal_current_epoch(epoch2, 
+        hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
         hummock_storage
             .ingest_batch(
                 batch_epoch2,
@@ -1231,10 +1207,7 @@ async fn test_hummock_version_reader() {
             .await
             .unwrap();
 
-        hummock_storage
-            .seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test())
-            .await
-            .unwrap();
+        hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test());
         hummock_storage
             .ingest_batch(
                 batch_epoch3,
@@ -1612,10 +1585,7 @@ async fn test_get_with_min_epoch() {
         .unwrap();
 
     let epoch2 = (32 * 1000) << 16;
-    hummock_storage
-        .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
 
     // epoch 2 write
     let batch_epoch2: Vec<(TableKey<Bytes>, StorageValue)> = (20..30)
         .map(|index| (gen_key(index), StorageValue::new_put(gen_val(index))))
@@ -1938,19 +1908,16 @@ async fn test_table_watermark() {
         (&mut local2, vnode_bitmap2.clone()),
     ] {
         local.flush(vec![]).await.unwrap();
-        local
-            .seal_current_epoch(
-                epoch2,
-                SealCurrentEpochOptions::new(
-                    vec![VnodeWatermark::new(
-                        Arc::new(vnode_bitmap),
-                        gen_inner_key(watermark1),
-                    )],
-                    WatermarkDirection::Ascending,
-                ),
-            )
-            .await
-            .unwrap();
+        local.seal_current_epoch(
+            epoch2,
+            SealCurrentEpochOptions::new(
+                vec![VnodeWatermark::new(
+                    Arc::new(vnode_bitmap),
+                    gen_inner_key(watermark1),
+                )],
+                WatermarkDirection::Ascending,
+            ),
+        );
     }
 
     // test read after seal with watermark1
@@ -2042,10 +2009,7 @@ async fn test_table_watermark() {
             local.insert(key, value, None).unwrap();
         }
         local.flush(vec![]).await.unwrap();
-        local
-            .seal_current_epoch(epoch3, SealCurrentEpochOptions::no_watermark())
-            .await
-            .unwrap();
+        local.seal_current_epoch(epoch3, SealCurrentEpochOptions::no_watermark());
     }
 
     let indexes_after_epoch2 = || gen_range().filter(|index| index % 3 == 0 || index % 3 == 1);
@@ -2284,19 +2248,16 @@ async fn test_table_watermark() {
         (&mut local2, vnode_bitmap2.clone()),
     ] {
         // regress watermark
-        local
-            .seal_current_epoch(
-                epoch4,
-                SealCurrentEpochOptions::new(
-                    vec![VnodeWatermark::new(
-                        Arc::new(vnode_bitmap),
-                        gen_inner_key(5),
-                    )],
-                    WatermarkDirection::Ascending,
-                ),
-            )
-            .await
-            .unwrap();
+        local.seal_current_epoch(
+            epoch4,
+            SealCurrentEpochOptions::new(
+                vec![VnodeWatermark::new(
+                    Arc::new(vnode_bitmap),
+                    gen_inner_key(5),
+                )],
+                WatermarkDirection::Ascending,
+            ),
+        );
     }
 
     test_global_read(test_env.storage.clone(), epoch3).await;
diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs
index 5458f1dbed5f3..61e20c3799a37 100644
--- a/src/storage/hummock_test/src/snapshot_tests.rs
+++ b/src/storage/hummock_test/src/snapshot_tests.rs
@@ -133,10 +133,7 @@ async fn test_snapshot_inner(
         .await
         .unwrap();
     let epoch2 = epoch1 + 1;
-    local
-        .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
     if enable_sync {
         let ssts = hummock_storage
             .seal_and_sync_epoch(epoch1)
@@ -181,10 +178,7 @@ async fn test_snapshot_inner(
         .await
         .unwrap();
     let epoch3 = epoch2 + 1;
-    local
-        .seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test());
    if enable_sync {
         let ssts = hummock_storage
             .seal_and_sync_epoch(epoch2)
@@ -229,10 +223,7 @@ async fn test_snapshot_inner(
     )
     .await
     .unwrap();
-    local
-        .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());
     if enable_sync {
         let ssts = hummock_storage
             .seal_and_sync_epoch(epoch3)
@@ -295,10 +286,7 @@ async fn test_snapshot_range_scan_inner(
     )
     .await
     .unwrap();
-    local
-        .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());
     if enable_sync {
         let ssts = hummock_storage
             .seal_and_sync_epoch(epoch)
diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs
index b4dac4a0269e5..9babe12b9054b 100644
--- a/src/storage/hummock_test/src/state_store_tests.rs
+++ b/src/storage/hummock_test/src/state_store_tests.rs
@@ -160,10 +160,7 @@ async fn test_basic_inner(
         .unwrap();
 
     let epoch2 = epoch1 + 1;
-    local
-        .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
 
     // Get the value after flushing to remote.
     let value = hummock_storage
@@ -221,10 +218,7 @@ async fn test_basic_inner(
         .unwrap();
 
     let epoch3 = epoch2 + 1;
-    local
-        .seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test());
 
     // Get the value after flushing to remote.
     let value = hummock_storage
@@ -255,10 +249,7 @@ async fn test_basic_inner(
         .await
         .unwrap();
 
-    local
-        .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());
 
     // Get the value after flushing to remote.
     let value = hummock_storage
@@ -490,10 +481,7 @@ async fn test_state_store_sync_inner(
     //     );
 
     epoch += 1;
-    local
-        .seal_current_epoch(epoch, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local.seal_current_epoch(epoch, SealCurrentEpochOptions::for_test());
 
     // ingest more 8B then will trigger a sync behind the scene
     let mut batch3 = vec![(
@@ -521,10 +509,7 @@ async fn test_state_store_sync_inner(
     //     hummock_storage.shared_buffer_manager().size() as u64
     // );
 
-    local
-        .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());
 
     // trigger a sync
     hummock_storage
@@ -990,10 +975,7 @@ async fn test_write_anytime_inner(
     assert_new_value(epoch1).await;
 
     let epoch2 = epoch1 + 1;
-    local
-        .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
 
     // Write to epoch2
     local
         .ingest_batch(
             batch2,
             vec![],
             WriteOptions {
                 epoch: epoch2,
                 table_id: TEST_TABLE_ID,
             },
         )
         .await
         .unwrap();
-    local
-        .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());
     // Assert epoch 1 unchanged
     assert_new_value(epoch1).await;
     // Assert epoch 2 correctness
@@ -1079,10 +1058,7 @@ async fn test_delete_get_inner(
     meta_client.commit_epoch(epoch1, ssts).await.unwrap();
 
     let epoch2 = initial_epoch + 2;
-    local
-        .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
     let batch2 = vec![(
         gen_key_from_str(VirtualNode::ZERO, "bb"),
         StorageValue::new_delete(),
@@ -1098,10 +1074,7 @@ async fn test_delete_get_inner(
     )
     .await
     .unwrap();
-    local
-        .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());
     let ssts = hummock_storage
         .seal_and_sync_epoch(epoch2)
         .await
@@ -1164,10 +1137,7 @@ async fn test_multiple_epoch_sync_inner(
         .unwrap();
 
     let epoch2 = initial_epoch + 2;
-    local
-        .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
     let batch2 = vec![(
         gen_key_from_str(VirtualNode::ZERO, "bb"),
         StorageValue::new_delete(),
@@ -1195,10 +1165,7 @@ async fn test_multiple_epoch_sync_inner(
             StorageValue::new_put("555"),
         ),
     ];
-    local
-        .seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test());
     local
         .ingest_batch(
             batch3,
@@ -1210,10 +1177,7 @@ async fn test_multiple_epoch_sync_inner(
     )
     .await
     .unwrap();
-    local
-        .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());
     let test_get = || {
         let hummock_storage_clone = &hummock_storage;
         async move {
@@ -1325,10 +1289,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() {
     );
 
     let epoch2 = initial_epoch + 2;
-    local_hummock_storage
-        .seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local_hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
     local_hummock_storage
         .delete(
             gen_key_from_str(VirtualNode::ZERO, "bb"),
@@ -1351,10 +1312,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() {
             .min()
             .unwrap()
     };
-    local_hummock_storage
-        .seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test())
-        .await
-        .unwrap();
+    local_hummock_storage.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());
     let sync_result1 = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap();
     let min_object_id_epoch1 = min_object_id(&sync_result1);
     assert_eq!(
diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs
index 3dfc39e4369b7..37a7db52e7e1a 100644
--- a/src/storage/hummock_test/src/sync_point_tests.rs
+++ b/src/storage/hummock_test/src/sync_point_tests.rs
@@ -297,13 +297,10 @@ async fn test_syncpoints_get_in_delete_range_boundary() {
         )
         .unwrap();
     local.flush(Vec::new()).await.unwrap();
-    local
-        .seal_current_epoch(
-            101,
-            risingwave_storage::store::SealCurrentEpochOptions::for_test(),
-        )
-        .await
-        .unwrap();
+    local.seal_current_epoch(
+        101,
+        risingwave_storage::store::SealCurrentEpochOptions::for_test(),
+    );
     flush_and_commit(&hummock_meta_client, &storage, 100).await;
     compact_once(
         hummock_manager_ref.clone(),
@@ -334,13 +331,10 @@ async fn test_syncpoints_get_in_delete_range_boundary() {
         )])
         .await
         .unwrap();
-    local
-        .seal_current_epoch(
-            102,
-            risingwave_storage::store::SealCurrentEpochOptions::for_test(),
-        )
-        .await
-        .unwrap();
+    local.seal_current_epoch(
+        102,
+        risingwave_storage::store::SealCurrentEpochOptions::for_test(),
+    );
     flush_and_commit(&hummock_meta_client, &storage, 101).await;
     compact_once(
         hummock_manager_ref.clone(),
@@ -371,13 +365,10 @@ async fn test_syncpoints_get_in_delete_range_boundary() {
         )])
         .await
         .unwrap();
-    local
-        .seal_current_epoch(
-            103,
-            risingwave_storage::store::SealCurrentEpochOptions::for_test(),
-        )
-        .await
-        .unwrap();
+    local.seal_current_epoch(
+        103,
+        risingwave_storage::store::SealCurrentEpochOptions::for_test(),
+    );
     flush_and_commit(&hummock_meta_client, &storage, 102).await;
     // move this two file to the same level.
     compact_once(
@@ -403,13 +394,10 @@ async fn test_syncpoints_get_in_delete_range_boundary() {
         )
         .unwrap();
     local.flush(Vec::new()).await.unwrap();
-    local
-        .seal_current_epoch(
-            u64::MAX,
-            risingwave_storage::store::SealCurrentEpochOptions::for_test(),
-        )
-        .await
-        .unwrap();
+    local.seal_current_epoch(
+        u64::MAX,
+        risingwave_storage::store::SealCurrentEpochOptions::for_test(),
+    );
     flush_and_commit(&hummock_meta_client, &storage, 103).await;
     // move this two file to the same level.
     compact_once(
diff --git a/src/storage/hummock_trace/src/replay/mod.rs b/src/storage/hummock_trace/src/replay/mod.rs
index c98f3653bc410..df6c191f31764 100644
--- a/src/storage/hummock_trace/src/replay/mod.rs
+++ b/src/storage/hummock_trace/src/replay/mod.rs
@@ -60,11 +60,7 @@ pub(crate) enum WorkerId {
 #[async_trait::async_trait]
 pub trait LocalReplay: LocalReplayRead + ReplayWrite + Send + Sync {
     async fn init(&mut self, options: TracedInitOptions) -> Result<()>;
-    async fn seal_current_epoch(
-        &mut self,
-        next_epoch: u64,
-        opts: TracedSealCurrentEpochOptions,
-    ) -> Result<()>;
+    fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions);
     fn is_dirty(&self) -> bool;
     fn epoch(&self) -> u64;
     async fn flush(
@@ -189,7 +185,7 @@ mock! {
     #[async_trait::async_trait]
     impl LocalReplay for LocalReplayInterface{
         async fn init(&mut self, options: TracedInitOptions) -> Result<()>;
-        async fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions) -> Result<()>;
+        fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions);
         fn is_dirty(&self) -> bool;
         fn epoch(&self) -> u64;
         async fn flush(&mut self, delete_ranges: Vec<(Bound<TracedBytes>, Bound<TracedBytes>)>) -> Result<usize>;
diff --git a/src/storage/hummock_trace/src/replay/worker.rs b/src/storage/hummock_trace/src/replay/worker.rs
index 26a43ca92ec1d..f77543cf92b9d 100644
--- a/src/storage/hummock_trace/src/replay/worker.rs
+++ b/src/storage/hummock_trace/src/replay/worker.rs
@@ -347,7 +347,7 @@ impl ReplayWorker {
             Operation::SealCurrentEpoch { epoch, opts } => {
                 assert_ne!(storage_type, StorageType::Global);
                 let local_storage = local_storages.get_mut(&storage_type).unwrap();
-                local_storage.seal_current_epoch(epoch, opts).await.unwrap();
+                local_storage.seal_current_epoch(epoch, opts);
             }
             Operation::ValidateReadEpoch(epoch) => {
                 assert_eq!(storage_type, StorageType::Global);
diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs
index a633ef0a87c9e..72c24cc5b1cb7 100644
--- a/src/storage/src/hummock/store/local_hummock_storage.rs
+++ b/src/storage/src/hummock/store/local_hummock_storage.rs
@@ -394,11 +394,7 @@ impl LocalStateStore for LocalHummockStorage {
         Ok(())
     }
 
-    async fn seal_current_epoch(
-        &mut self,
-        next_epoch: u64,
-        mut opts: SealCurrentEpochOptions,
-    ) -> StorageResult<()> {
+    fn seal_current_epoch(&mut self, next_epoch: u64, mut opts: SealCurrentEpochOptions) {
         assert!(!self.is_dirty());
         let prev_epoch = self
             .epoch
@@ -429,8 +425,7 @@ impl LocalStateStore for LocalHummockStorage {
                 epoch: prev_epoch,
                 opts,
             })
-            .expect("should be able to send");
-        Ok(())
+            .expect("should be able to send")
     }
 }
 
diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs
index 90b77029dcbb2..dd52bb15f1105 100644
--- a/src/storage/src/mem_table.rs
+++ b/src/storage/src/mem_table.rs
@@ -29,6 +29,7 @@
 use risingwave_common::hash::VnodeBitmapExt;
 use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, FullKey, TableKey, TableKeyRange};
 use risingwave_hummock_sdk::table_watermark::WatermarkDirection;
 use thiserror::Error;
+use tracing::error;
 
 use crate::error::{StorageError, StorageResult};
 use crate::hummock::iterator::{FromRustIterator, RustIteratorBuilder};
@@ -573,16 +574,14 @@ impl LocalStateStore for MemtableLocalState
                 }
             }
         }
-        self.inner
-            .ingest_batch(
-                kv_pairs,
-                delete_ranges,
-                WriteOptions {
-                    epoch: self.epoch(),
-                    table_id: self.table_id,
-                },
-            )
-            .await
+        self.inner.ingest_batch(
+            kv_pairs,
+            delete_ranges,
+            WriteOptions {
+                epoch: self.epoch(),
+                table_id: self.table_id,
+            },
+        )
     }
 
     fn epoch(&self) -> u64 {
@@ -603,11 +602,7 @@ impl LocalStateStore for MemtableLocalState
         Ok(())
     }
 
-    async fn seal_current_epoch(
-        &mut self,
-        next_epoch: u64,
-        opts: SealCurrentEpochOptions,
-    ) -> StorageResult<()> {
+    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
         assert!(!self.is_dirty());
         let prev_epoch = self
             .epoch
@@ -641,18 +636,17 @@ impl LocalStateStore for MemtableLocalState
                     })
                 })
                 .collect_vec();
-            self.inner
-                .ingest_batch(
-                    Vec::new(),
-                    delete_ranges,
-                    WriteOptions {
-                        epoch: self.epoch(),
-                        table_id: self.table_id,
-                    },
-                )
-                .await?;
+            if let Err(e) = self.inner.ingest_batch(
+                Vec::new(),
+                delete_ranges,
+                WriteOptions {
+                    epoch: self.epoch(),
+                    table_id: self.table_id,
+                },
+            ) {
+                error!(err = ?e, "failed to write delete ranges of table watermark");
+            }
         }
-        Ok(())
     }
 
     async fn try_flush(&mut self) -> StorageResult<()> {
diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs
index 8dfce8a7ae1b9..b483ff963cb88 100644
--- a/src/storage/src/memory.rs
+++ b/src/storage/src/memory.rs
@@ -575,8 +575,7 @@ impl<R: RangeKv> StateStoreRead for RangeKvStateStore<R> {
 }
 
 impl<R: RangeKv> StateStoreWrite for RangeKvStateStore<R> {
-    #[allow(clippy::unused_async)]
-    async fn ingest_batch(
+    fn ingest_batch(
         &self,
         mut kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
         delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
@@ -747,7 +746,6 @@ mod tests {
                     table_id: Default::default(),
                 },
             )
-            .await
             .unwrap();
         state_store
             .ingest_batch(
@@ -767,7 +765,6 @@ mod tests {
                     table_id: Default::default(),
                 },
             )
-            .await
             .unwrap();
         assert_eq!(
             state_store
diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs
index 5fcf410a8508e..a4944a7a99195 100644
--- a/src/storage/src/monitor/monitored_store.rs
+++ b/src/storage/src/monitor/monitored_store.rs
@@ -280,13 +280,9 @@ impl<S: LocalStateStore> LocalStateStore for MonitoredStateStore<S> {
         self.inner.init(options).await
     }
 
-    async fn seal_current_epoch(
-        &mut self,
-        next_epoch: u64,
-        opts: SealCurrentEpochOptions,
-    ) -> StorageResult<()> {
+    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
         // TODO: may collect metrics
-        self.inner.seal_current_epoch(next_epoch, opts).await
+        self.inner.seal_current_epoch(next_epoch, opts)
     }
 
     fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs
index eddaf592fa747..cd65931a7e602 100644
--- a/src/storage/src/monitor/traced_store.rs
+++ b/src/storage/src/monitor/traced_store.rs
@@ -208,17 +208,13 @@ impl<S: LocalStateStore> LocalStateStore for TracedStateStore<S> {
         self.inner.init(options).await
     }
 
-    async fn seal_current_epoch(
-        &mut self,
-        next_epoch: u64,
-        opts: SealCurrentEpochOptions,
-    ) -> StorageResult<()> {
+    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
         let _span = TraceSpan::new_seal_current_epoch_span(
             next_epoch,
             TracedSealCurrentEpochOptions::from(opts.clone()),
             self.storage_type,
         );
-        self.inner.seal_current_epoch(next_epoch, opts).await
+        self.inner.seal_current_epoch(next_epoch, opts)
     }
 
     async fn try_flush(&mut self) -> StorageResult<()> {
diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs
index 5e22e9ecd5a5e..bee3e1e8ec0b8 100644
--- a/src/storage/src/panic_store.rs
+++ b/src/storage/src/panic_store.rs
@@ -55,8 +55,7 @@ impl StateStoreRead for PanicStateStore {
 }
 
 impl StateStoreWrite for PanicStateStore {
-    #[allow(clippy::unused_async)]
-    async fn ingest_batch(
+    fn ingest_batch(
         &self,
         _kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
         _delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
@@ -130,11 +129,7 @@ impl LocalStateStore for PanicStateStore {
         panic!("should not operate on the panic state store!");
     }
 
-    async fn seal_current_epoch(
-        &mut self,
-        _next_epoch: u64,
-        _opts: SealCurrentEpochOptions,
-    ) -> StorageResult<()> {
+    fn seal_current_epoch(&mut self, _next_epoch: u64, _opts: SealCurrentEpochOptions) {
         panic!("should not operate on the panic state store!")
     }
 
diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs
index fe0610031dbb0..cd97ca3521a79 100644
--- a/src/storage/src/store.rs
+++ b/src/storage/src/store.rs
@@ -155,7 +155,7 @@ pub trait StateStoreWrite: StaticSendSync {
         kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
         delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
         write_options: WriteOptions,
-    ) -> impl Future<Output = StorageResult<usize>> + Send + '_;
+    ) -> StorageResult<usize>;
 }
 
 #[derive(Default, Debug)]
@@ -256,11 +256,7 @@ pub trait LocalStateStore: StaticSendSync {
     /// Updates the monotonically increasing write epoch to `new_epoch`.
     /// All writes after this function is called will be tagged with `new_epoch`. In other words,
     /// the previous write epoch is sealed.
-    fn seal_current_epoch(
-        &mut self,
-        next_epoch: u64,
-        opts: SealCurrentEpochOptions,
-    ) -> impl Future<Output = StorageResult<()>> + Send + '_;
+    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
 
     /// Check existence of a given `key_range`.
     /// It is better to provide `prefix_hint` in `read_options`, which will be used
/// It is better to provide `prefix_hint` in `read_options`, which will be used diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index c831aef6663fe..faa8a886443f3 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -317,24 +317,19 @@ pub mod verify { } impl StateStoreWrite for VerifyStateStore { - async fn ingest_batch( + fn ingest_batch( &self, kv_pairs: Vec<(TableKey, StorageValue)>, delete_ranges: Vec<(Bound, Bound)>, write_options: WriteOptions, ) -> StorageResult { - let actual = self - .actual - .ingest_batch( - kv_pairs.clone(), - delete_ranges.clone(), - write_options.clone(), - ) - .await; + let actual = self.actual.ingest_batch( + kv_pairs.clone(), + delete_ranges.clone(), + write_options.clone(), + ); if let Some(expected) = &self.expected { - let expected = expected - .ingest_batch(kv_pairs, delete_ranges, write_options) - .await; + let expected = expected.ingest_batch(kv_pairs, delete_ranges, write_options); assert_eq!(actual.is_err(), expected.is_err()); } actual @@ -445,17 +440,11 @@ pub mod verify { Ok(()) } - async fn seal_current_epoch( - &mut self, - next_epoch: u64, - opts: SealCurrentEpochOptions, - ) -> StorageResult<()> { + fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { if let Some(expected) = &mut self.expected { - expected - .seal_current_epoch(next_epoch, opts.clone()) - .await?; + expected.seal_current_epoch(next_epoch, opts.clone()); } - self.actual.seal_current_epoch(next_epoch, opts).await + self.actual.seal_current_epoch(next_epoch, opts); } fn epoch(&self) -> u64 { @@ -795,11 +784,7 @@ pub mod boxed_state_store { async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>; - async fn seal_current_epoch( - &mut self, - next_epoch: u64, - opts: SealCurrentEpochOptions, - ) -> StorageResult<()>; + fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions); } #[async_trait::async_trait] @@ -864,12 +849,8 @@ pub mod boxed_state_store { self.init(options).await } - async fn seal_current_epoch( - &mut self, - next_epoch: u64, - opts: SealCurrentEpochOptions, - ) -> StorageResult<()> { - self.seal_current_epoch(next_epoch, opts).await + fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { + self.seal_current_epoch(next_epoch, opts) } } @@ -941,12 +922,8 @@ pub mod boxed_state_store { self.deref_mut().init(options) } - async fn seal_current_epoch( - &mut self, - next_epoch: u64, - opts: SealCurrentEpochOptions, - ) -> StorageResult<()> { - self.deref_mut().seal_current_epoch(next_epoch, opts).await + fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { + self.deref_mut().seal_current_epoch(next_epoch, opts) } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index 4a56481b8f7b3..fa380a77b6aee 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -160,12 +160,10 @@ impl LogWriter for KvLogStoreWriter { } self.state_store.flush(vec![]).await?; let watermark = watermark.into_iter().collect_vec(); - self.state_store - .seal_current_epoch( - next_epoch, - SealCurrentEpochOptions::new(watermark, WatermarkDirection::Ascending), - ) - .await?; + self.state_store.seal_current_epoch( + next_epoch, + SealCurrentEpochOptions::new(watermark, WatermarkDirection::Ascending), + ); self.tx.barrier(epoch, is_checkpoint, next_epoch); 
         self.seq_id = FIRST_SEQ_ID;
         Ok(())
diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs
index 9d58292171492..d887684686977 100644
--- a/src/stream/src/common/table/state_table.rs
+++ b/src/stream/src/common/table/state_table.rs
@@ -1041,8 +1041,7 @@ where
         if !self.is_dirty() {
             // If the state table is not modified, go fast path.
             self.local_store
-                .seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::no_watermark())
-                .await?;
+                .seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::no_watermark());
             return Ok(());
         } else {
             self.seal_current_epoch(new_epoch.curr)
@@ -1104,19 +1103,14 @@ where
     // TODO(st1page): maybe we should extract a pub struct to do it
     /// just specially used by those state table read-only and after the call the data
     /// in the epoch will be visible
-    pub async fn commit_no_data_expected(
-        &mut self,
-        new_epoch: EpochPair,
-    ) -> StreamExecutorResult<()> {
+    pub fn commit_no_data_expected(&mut self, new_epoch: EpochPair) {
         assert_eq!(self.epoch(), new_epoch.prev);
         assert!(!self.is_dirty());
         // Tick the watermark buffer here because state table is expected to be committed once
         // per epoch.
         self.watermark_buffer_strategy.tick();
         self.local_store
-            .seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::no_watermark())
-            .await?;
-        Ok(())
+            .seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::no_watermark());
     }
 
     /// Write to state store.
@@ -1215,9 +1209,7 @@ where
             }
             None => SealCurrentEpochOptions::no_watermark(),
         };
-        self.local_store
-            .seal_current_epoch(next_epoch, seal_opt)
-            .await?;
+        self.local_store.seal_current_epoch(next_epoch, seal_opt);
         Ok(())
     }
 
diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs
index fecc79cf679b6..28fcaa8862faa 100644
--- a/src/stream/src/executor/backfill/arrangement_backfill.rs
+++ b/src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -377,9 +377,7 @@ where
                     }
 
                     if upstream_chunk_buffer_is_empty {
-                        upstream_table
-                            .commit_no_data_expected(barrier.epoch)
-                            .await?;
+                        upstream_table.commit_no_data_expected(barrier.epoch)
                     } else {
                         upstream_table.commit(barrier.epoch).await?;
                     }
@@ -498,9 +496,7 @@ where
                 "backfill_finished_after_barrier"
             );
             if let Message::Barrier(barrier) = &msg {
-                self.state_table
-                    .commit_no_data_expected(barrier.epoch)
-                    .await?;
+                self.state_table.commit_no_data_expected(barrier.epoch);
             }
             yield msg;
         }
diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs
index e39c9b5fd92dd..d344b23c294dc 100644
--- a/src/stream/src/executor/backfill/utils.rs
+++ b/src/stream/src/executor/backfill/utils.rs
@@ -462,7 +462,7 @@ pub(crate) async fn flush_data(
     let vnodes = table.vnodes().clone();
     if let Some(old_state) = old_state {
         if old_state[1..] == current_partial_state[1..] {
-            table.commit_no_data_expected(epoch).await?;
+            table.commit_no_data_expected(epoch);
             return Ok(());
         } else {
             vnodes.iter_vnodes_scalar().for_each(|vnode| {
@@ -688,7 +688,7 @@ pub(crate) async fn persist_state_per_vnode(
         flush_data(table, epoch, old_state, current_state).await?;
         *old_state = Some(current_state.into());
     } else {
-        table.commit_no_data_expected(epoch).await?;
+        table.commit_no_data_expected(epoch);
     }
     Ok(())
 }
diff --git a/src/stream/src/executor/dedup/append_only_dedup.rs b/src/stream/src/executor/dedup/append_only_dedup.rs
index 0f3e9d10fc045..3c44e64cb2aea 100644
--- a/src/stream/src/executor/dedup/append_only_dedup.rs
+++ b/src/stream/src/executor/dedup/append_only_dedup.rs
@@ -140,9 +140,7 @@ impl<S: StateStore> AppendOnlyDedupExecutor<S> {
                     self.state_table.commit(barrier.epoch).await?;
                     commit_data = false;
                 } else {
-                    self.state_table
-                        .commit_no_data_expected(barrier.epoch)
-                        .await?;
+                    self.state_table.commit_no_data_expected(barrier.epoch);
                 }
 
                 if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) {
diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs
index e859d646bebdd..406acb2536ba4 100644
--- a/src/stream/src/executor/dynamic_filter.rs
+++ b/src/stream/src/executor/dynamic_filter.rs
@@ -464,16 +464,12 @@ impl DynamicFilterExecutor NowExecutor {
             initialized = true;
         } else if paused {
             // Assert that no data is updated.
-            state_table.commit_no_data_expected(barrier.epoch).await?;
+            state_table.commit_no_data_expected(barrier.epoch);
         } else {
             state_table.commit(barrier.epoch).await?;
         }
diff --git a/src/stream/src/executor/simple_agg.rs b/src/stream/src/executor/simple_agg.rs
index 104ffeb001793..0d33a7dc3074e 100644
--- a/src/stream/src/executor/simple_agg.rs
+++ b/src/stream/src/executor/simple_agg.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use futures::future::try_join_all;
 use futures::StreamExt;
 use futures_async_stream::try_stream;
 use risingwave_common::array::StreamChunk;
@@ -229,11 +228,9 @@ impl<S: StateStore> SimpleAggExecutor<S> {
         } else {
             // No state is changed.
             // Call commit on state table to increment the epoch.
-            try_join_all(
-                this.all_state_tables_mut()
-                    .map(|table| table.commit_no_data_expected(epoch)),
-            )
-            .await?;
+            this.all_state_tables_mut().for_each(|table| {
+                table.commit_no_data_expected(epoch);
+            });
             None
         };
 
diff --git a/src/stream/src/executor/sort.rs b/src/stream/src/executor/sort.rs
index 60fba19314409..42412d4f22587 100644
--- a/src/stream/src/executor/sort.rs
+++ b/src/stream/src/executor/sort.rs
@@ -148,9 +148,7 @@ impl<S: StateStore> SortExecutor<S> {
                     if vars.buffer_changed {
                         this.buffer_table.commit(barrier.epoch).await?;
                     } else {
-                        this.buffer_table
-                            .commit_no_data_expected(barrier.epoch)
-                            .await?;
+                        this.buffer_table.commit_no_data_expected(barrier.epoch);
                     }
                     vars.buffer_changed = false;
 
diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs
index b920840cf741d..29418e82c0fd5 100644
--- a/src/stream/src/executor/watermark_filter.rs
+++ b/src/stream/src/executor/watermark_filter.rs
@@ -256,7 +256,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
                     }
                     table.commit(barrier.epoch).await?;
                 } else {
-                    table.commit_no_data_expected(barrier.epoch).await?;
+                    table.commit_no_data_expected(barrier.epoch);
                 }
 
                 if barrier.kind.is_checkpoint() {
diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs
index 869029b53223d..2aab4679bb634 100644
--- a/src/tests/compaction_test/src/delete_range_runner.rs
+++ b/src/tests/compaction_test/src/delete_range_runner.rs
@@ -425,9 +425,7 @@ impl NormalState {
             .await
             .map_err(|e| format!("{:?}", e))?;
         self.storage
-            .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test())
-            .await
-            .map_err(|e| format!("{:?}", e))?;
+            .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test());
         Ok(())
     }
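Note for reviewers following the series: the net effect of this patch is that `LocalStateStore::seal_current_epoch` (and the `commit_no_data_expected` helper built on top of it) is synchronous and infallible again; it only asserts the store is not dirty and enqueues the epoch switch plus any table watermarks, so there is nothing left to await or unwrap. A minimal sketch of the resulting calling convention, using only APIs that appear in the diff above; the `advance_epoch` helper and its import paths are illustrative assumptions, not part of the patch:

    use risingwave_storage::error::StorageResult;
    use risingwave_storage::store::{LocalStateStore, SealCurrentEpochOptions};

    // Hypothetical caller: drives a local state store across an epoch boundary.
    async fn advance_epoch(
        local: &mut impl LocalStateStore,
        next_epoch: u64,
    ) -> StorageResult<()> {
        // Flushing pending writes is still async and fallible ...
        local.flush(Vec::new()).await?;
        // ... but sealing is now a plain synchronous call: it records the epoch
        // switch (and optional table watermarks) without returning a Result.
        local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::no_watermark());
        Ok(())
    }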