From f1cf8ec08052b2c3c315188c24009c9843d84690 Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 20 Sep 2024 16:17:08 +0800 Subject: [PATCH] feat(storage): per table try wait epoch --- proto/stream_service.proto | 1 + src/compute/src/rpc/service/stream_service.rs | 11 +++- src/meta/src/barrier/command.rs | 21 ++++++-- src/meta/src/barrier/mod.rs | 5 +- src/storage/hummock_sdk/src/version.rs | 7 +++ .../src/bin/replay/replay_impl.rs | 10 ++-- .../hummock_test/src/failpoint_tests.rs | 13 +++-- .../hummock_test/src/hummock_storage_tests.rs | 24 ++++----- .../hummock_test/src/snapshot_tests.rs | 22 ++++++-- .../hummock_test/src/state_store_tests.rs | 28 +++++++--- src/storage/hummock_test/src/test_utils.rs | 7 ++- src/storage/hummock_trace/src/collector.rs | 12 +++-- src/storage/hummock_trace/src/opts.rs | 5 ++ src/storage/hummock_trace/src/record.rs | 4 +- src/storage/hummock_trace/src/replay/mod.rs | 10 ++-- .../hummock_trace/src/replay/worker.rs | 4 +- .../event_handler/hummock_event_handler.rs | 19 +++---- .../src/hummock/event_handler/uploader/mod.rs | 5 +- .../hummock/local_version/recent_versions.rs | 7 +-- .../src/hummock/store/hummock_storage.rs | 17 +++---- .../hummock/store/local_hummock_storage.rs | 18 +++++-- src/storage/src/hummock/store/version.rs | 5 +- src/storage/src/hummock/utils.rs | 51 +++++++++++++++---- src/storage/src/memory.rs | 6 ++- src/storage/src/monitor/monitored_store.rs | 3 +- src/storage/src/monitor/traced_store.rs | 10 ++-- src/storage/src/panic_store.rs | 6 ++- src/storage/src/store.rs | 32 +++++++++++- src/storage/src/store_impl.rs | 20 ++++++-- .../src/table/batch_table/storage_table.rs | 26 ++++++++-- .../common/log_store_impl/kv_log_store/mod.rs | 21 ++++++-- .../source/source_backfill_executor.rs | 7 ++- .../src/executor/source/source_executor.rs | 11 +++- 33 files changed, 327 insertions(+), 121 deletions(-) diff --git a/proto/stream_service.proto b/proto/stream_service.proto index c13ee8875b43f..ab56c9f7e4050 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -49,6 +49,7 @@ message BarrierCompleteResponse { message WaitEpochCommitRequest { uint64 epoch = 1; + uint32 table_id = 2; } message WaitEpochCommitResponse { diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 6253cfe74c730..e8d403a9693cc 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -17,6 +17,7 @@ use futures::{Stream, StreamExt, TryStreamExt}; use risingwave_pb::stream_service::stream_service_server::StreamService; use risingwave_pb::stream_service::*; use risingwave_storage::dispatch_state_store; +use risingwave_storage::store::TryWaitEpochOptions; use risingwave_stream::error::StreamError; use risingwave_stream::task::{LocalStreamManager, StreamEnvironment}; use tokio::sync::mpsc::unbounded_channel; @@ -45,14 +46,20 @@ impl StreamService for StreamServiceImpl { &self, request: Request, ) -> Result, Status> { - let epoch = request.into_inner().epoch; + let request = request.into_inner(); + let epoch = request.epoch; dispatch_state_store!(self.env.state_store(), store, { use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_storage::StateStore; store - .try_wait_epoch(HummockReadEpoch::Committed(epoch)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch), + TryWaitEpochOptions { + table_id: request.table_id.into(), + }, + ) .instrument_await(format!("wait_epoch_commit (epoch {})", epoch)) .await .map_err(StreamError::from)?; diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index c18ad5d0f2b3b..1eff171d019b2 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -22,7 +22,6 @@ use risingwave_common::hash::ActorMapping; use risingwave_common::types::Timestamptz; use risingwave_common::util::epoch::Epoch; use risingwave_connector::source::SplitImpl; -use risingwave_hummock_sdk::HummockEpoch; use risingwave_pb::catalog::{CreateType, Table}; use risingwave_pb::common::PbWorkerNode; use risingwave_pb::meta::table_fragments::PbActorStatus; @@ -442,6 +441,8 @@ pub struct CommandContext { pub prev_epoch: TracedEpoch, pub curr_epoch: TracedEpoch, + pub table_ids_to_commit: HashSet, + pub current_paused_reason: Option, pub command: Command, @@ -470,12 +471,12 @@ impl std::fmt::Debug for CommandContext { } impl CommandContext { - #[allow(clippy::too_many_arguments)] pub(super) fn new( node_map: HashMap, subscription_info: InflightSubscriptionInfo, prev_epoch: TracedEpoch, curr_epoch: TracedEpoch, + table_ids_to_commit: HashSet, current_paused_reason: Option, command: Command, kind: BarrierKind, @@ -487,6 +488,7 @@ impl CommandContext { subscription_info, prev_epoch, curr_epoch, + table_ids_to_commit, current_paused_reason, command, kind, @@ -945,7 +947,13 @@ impl Command { } impl CommandContext { - pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> { + pub async fn wait_epoch_commit(&self) -> MetaResult<()> { + let table_id = self.table_ids_to_commit.iter().next().cloned(); + // try wait epoch on an existing random table id + let Some(table_id) = table_id else { + // no need to wait epoch when there is no table id + return Ok(()); + }; let futures = self.node_map.values().map(|worker_node| async { let client = self .barrier_manager_context @@ -953,7 +961,10 @@ impl CommandContext { .stream_client_pool() .get(worker_node) .await?; - let request = WaitEpochCommitRequest { epoch }; + let request = WaitEpochCommitRequest { + epoch: self.prev_epoch.value().0, + table_id: table_id.table_id, + }; client.wait_epoch_commit(request).await }); @@ -976,7 +987,7 @@ impl CommandContext { // storage version with this epoch is synced to all compute nodes before the // execution of the next command of `Update`, as some newly created operators // may immediately initialize their states on that barrier. - self.wait_epoch_commit(self.prev_epoch.value().0).await?; + self.wait_epoch_commit().await?; } } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 5bb76ee46133c..e05719f41dde4 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -1029,11 +1029,14 @@ impl GlobalBarrierManager { }); span.record("epoch", curr_epoch.value().0); + let table_ids_to_commit: HashSet<_> = pre_applied_graph_info.existing_table_ids().collect(); + let command_ctx = Arc::new(CommandContext::new( self.active_streaming_nodes.current().clone(), pre_applied_subscription_info, prev_epoch.clone(), curr_epoch.clone(), + table_ids_to_commit.clone(), self.state.paused_reason(), command, kind, @@ -1043,8 +1046,6 @@ impl GlobalBarrierManager { send_latency_timer.observe_duration(); - let table_ids_to_commit: HashSet<_> = pre_applied_graph_info.existing_table_ids().collect(); - let mut jobs_to_wait = HashSet::new(); for (table_id, creating_job) in &mut self.checkpoint_control.creating_streaming_job_controls diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index fe2825cc8ad0c..954426384c818 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -428,6 +428,13 @@ impl HummockVersion { self.max_committed_epoch } + pub fn table_committed_epoch(&self, table_id: TableId) -> Option { + self.state_table_info + .info() + .get(&table_id) + .map(|info| info.committed_epoch) + } + pub fn visible_table_committed_epoch(&self) -> u64 { self.max_committed_epoch } diff --git a/src/storage/hummock_test/src/bin/replay/replay_impl.rs b/src/storage/hummock_test/src/bin/replay/replay_impl.rs index 282409f394476..6653db94e0543 100644 --- a/src/storage/hummock_test/src/bin/replay/replay_impl.rs +++ b/src/storage/hummock_test/src/bin/replay/replay_impl.rs @@ -25,7 +25,7 @@ use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId, SyncResult}; use risingwave_hummock_trace::{ GlobalReplay, LocalReplay, LocalReplayRead, ReplayItem, ReplayRead, ReplayStateStore, ReplayWrite, Result, TraceError, TracedBytes, TracedInitOptions, TracedNewLocalOptions, - TracedReadOptions, TracedSealCurrentEpochOptions, TracedSubResp, + TracedReadOptions, TracedSealCurrentEpochOptions, TracedSubResp, TracedTryWaitEpochOptions, }; use risingwave_meta::manager::{MessageStatus, MetaSrvEnv, NotificationManagerRef, WorkerKey}; use risingwave_pb::common::WorkerNode; @@ -170,9 +170,13 @@ impl ReplayStateStore for GlobalReplayImpl { Box::new(LocalReplayImpl(local_storage)) } - async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()> { + async fn try_wait_epoch( + &self, + epoch: HummockReadEpoch, + options: TracedTryWaitEpochOptions, + ) -> Result<()> { self.store - .try_wait_epoch(epoch) + .try_wait_epoch(epoch, options.into()) .await .map_err(|_| TraceError::TryWaitEpochFailed)?; Ok(()) diff --git a/src/storage/hummock_test/src/failpoint_tests.rs b/src/storage/hummock_test/src/failpoint_tests.rs index 27072abba08f2..cba7851c8cf88 100644 --- a/src/storage/hummock_test/src/failpoint_tests.rs +++ b/src/storage/hummock_test/src/failpoint_tests.rs @@ -30,7 +30,8 @@ use risingwave_storage::hummock::test_utils::{count_stream, default_opts_for_tes use risingwave_storage::hummock::{CachePolicy, HummockStorage}; use risingwave_storage::storage_value::StorageValue; use risingwave_storage::store::{ - LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, StateStoreRead, WriteOptions, + LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, StateStoreRead, + TryWaitEpochOptions, WriteOptions, }; use risingwave_storage::StateStore; @@ -148,7 +149,10 @@ async fn test_failpoints_state_store_read_upload() { .unwrap(); meta_client.commit_epoch(1, res, false).await.unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(1)) + .try_wait_epoch( + HummockReadEpoch::Committed(1), + TryWaitEpochOptions::for_test(local.table_id()), + ) .await .unwrap(); // clear block cache @@ -225,7 +229,10 @@ async fn test_failpoints_state_store_read_upload() { .unwrap(); meta_client.commit_epoch(3, res, false).await.unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(3)) + .try_wait_epoch( + HummockReadEpoch::Committed(3), + TryWaitEpochOptions::for_test(local.table_id()), + ) .await .unwrap(); diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 18bad67a62570..b938a7d6e036f 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -568,7 +568,7 @@ async fn test_state_store_sync() { .commit_epoch(epoch1, res, false) .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch1).await; + test_env.wait_sync_committed_version().await; { // after sync 1 epoch let read_version = hummock_storage.read_version(); @@ -614,7 +614,7 @@ async fn test_state_store_sync() { .commit_epoch(epoch2, res, false) .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch2).await; + test_env.wait_sync_committed_version().await; { // after sync all epoch let read_version = hummock_storage.read_version(); @@ -910,7 +910,7 @@ async fn test_delete_get() { .commit_epoch(epoch2, res, false) .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch2).await; + test_env.wait_sync_committed_version().await; assert!(test_env .storage .get( @@ -1096,7 +1096,7 @@ async fn test_multiple_epoch_sync() { .commit_epoch(epoch3, sync_result3, false) .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch3).await; + test_env.wait_sync_committed_version().await; test_get().await; } @@ -1270,7 +1270,7 @@ async fn test_iter_with_min_epoch() { .commit_epoch(epoch2, sync_result2, false) .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch2).await; + test_env.wait_sync_committed_version().await; { let iter = test_env @@ -1561,7 +1561,7 @@ async fn test_hummock_version_reader() { .commit_epoch(epoch1, sync_result1, false) .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch1).await; + test_env.wait_sync_committed_version().await; let sync_result2 = test_env .storage @@ -1573,7 +1573,7 @@ async fn test_hummock_version_reader() { .commit_epoch(epoch2, sync_result2, false) .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch2).await; + test_env.wait_sync_committed_version().await; let sync_result3 = test_env .storage @@ -1585,7 +1585,7 @@ async fn test_hummock_version_reader() { .commit_epoch(epoch3, sync_result3, false) .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch3).await; + test_env.wait_sync_committed_version().await; { let (_, read_snapshot) = read_filter_for_version( epoch1, @@ -1979,7 +1979,7 @@ async fn test_get_with_min_epoch() { .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch2).await; + test_env.wait_sync_committed_version().await; let k = gen_key(0); let prefix_hint = { let mut ret = Vec::with_capacity(TABLE_PREFIX_LEN + k.len()); @@ -2409,7 +2409,7 @@ async fn test_table_watermark() { }; test_env.commit_epoch(epoch1).await; - test_env.storage.try_wait_epoch_for_test(epoch1).await; + test_env.wait_sync_committed_version().await; let (local1, local2) = test_after_epoch2(local1, local2).await; @@ -2498,7 +2498,7 @@ async fn test_table_watermark() { let (local1, local2) = test_after_epoch2(local1, local2).await; test_env.commit_epoch(epoch2).await; - test_env.storage.try_wait_epoch_for_test(epoch2).await; + test_env.wait_sync_committed_version().await; test_global_read(test_env.storage.clone(), epoch2).await; @@ -2533,7 +2533,7 @@ async fn test_table_watermark() { let (local1, local2) = test_after_epoch2(local1, local2).await; test_env.commit_epoch(epoch3).await; - test_env.storage.try_wait_epoch_for_test(epoch3).await; + test_env.wait_sync_committed_version().await; check_version_table_watermark(test_env.storage.get_pinned_version()); diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs index b15e8a3fa372c..503dd38c57d7e 100644 --- a/src/storage/hummock_test/src/snapshot_tests.rs +++ b/src/storage/hummock_test/src/snapshot_tests.rs @@ -27,7 +27,7 @@ use risingwave_storage::hummock::{CachePolicy, HummockStorage}; use risingwave_storage::storage_value::StorageValue; use risingwave_storage::store::{ LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, SealCurrentEpochOptions, - StateStoreRead, WriteOptions, + StateStoreRead, TryWaitEpochOptions, WriteOptions, }; use risingwave_storage::StateStore; @@ -149,7 +149,10 @@ async fn test_snapshot_inner( .await .unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch1)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch1), + TryWaitEpochOptions::for_test(Default::default()), + ) .await .unwrap(); } @@ -193,7 +196,10 @@ async fn test_snapshot_inner( .await .unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch2)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch2), + TryWaitEpochOptions::for_test(Default::default()), + ) .await .unwrap(); } @@ -236,7 +242,10 @@ async fn test_snapshot_inner( .await .unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch3)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch3), + TryWaitEpochOptions::for_test(Default::default()), + ) .await .unwrap(); } @@ -298,7 +307,10 @@ async fn test_snapshot_range_scan_inner( .await .unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch), + TryWaitEpochOptions::for_test(local.table_id()), + ) .await .unwrap(); } diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index ab1e84aca2a66..828347f401236 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -385,7 +385,10 @@ async fn test_basic_v2() { .await .unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch1)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch1), + TryWaitEpochOptions::for_test(local.table_id()), + ) .await .unwrap(); let value = hummock_storage @@ -1102,7 +1105,10 @@ async fn test_delete_get_v2() { .await .unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch2)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch2), + TryWaitEpochOptions::for_test(local.table_id()), + ) .await .unwrap(); assert!(hummock_storage @@ -1263,7 +1269,10 @@ async fn test_multiple_epoch_sync_v2() { .await .unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch3)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch3), + TryWaitEpochOptions::for_test(local.table_id()), + ) .await .unwrap(); test_get().await; @@ -1365,7 +1374,10 @@ async fn test_gc_watermark_and_clear_shared_buffer() { .await .unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch1)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch1), + TryWaitEpochOptions::for_test(local_hummock_storage.table_id()), + ) .await .unwrap(); @@ -1639,8 +1651,12 @@ async fn test_iter_log() { } hummock_storage - .try_wait_epoch_for_test(test_log_data.last().unwrap().0) - .await; + .try_wait_epoch( + HummockReadEpoch::Committed(test_log_data.last().unwrap().0), + TryWaitEpochOptions { table_id }, + ) + .await + .unwrap(); let verify_state_store = VerifyStateStore { actual: hummock_storage, diff --git a/src/storage/hummock_test/src/test_utils.rs b/src/storage/hummock_test/src/test_utils.rs index da861ff92810c..5a613c1d54eff 100644 --- a/src/storage/hummock_test/src/test_utils.rs +++ b/src/storage/hummock_test/src/test_utils.rs @@ -262,7 +262,12 @@ impl HummockTestEnv { .await .unwrap(); - self.storage.try_wait_epoch_for_test(epoch).await; + self.wait_sync_committed_version().await; + } + + pub async fn wait_sync_committed_version(&self) { + let version = self.manager.get_current_version().await; + self.storage.wait_version(version).await; } } diff --git a/src/storage/hummock_trace/src/collector.rs b/src/storage/hummock_trace/src/collector.rs index 979bf067db861..068cbdcee45ed 100644 --- a/src/storage/hummock_trace/src/collector.rs +++ b/src/storage/hummock_trace/src/collector.rs @@ -37,7 +37,7 @@ use crate::write::{TraceWriter, TraceWriterImpl}; use crate::{ ConcurrentIdGenerator, Operation, OperationResult, Record, RecordId, RecordIdGenerator, TracedInitOptions, TracedNewLocalOptions, TracedReadOptions, TracedSealCurrentEpochOptions, - TracedSubResp, UniqueIdGenerator, + TracedSubResp, TracedTryWaitEpochOptions, UniqueIdGenerator, }; // Global collector instance used for trace collection @@ -216,8 +216,14 @@ impl TraceSpan { Self::new_global_op(Operation::SealCurrentEpoch { epoch, opts }, storage_type) } - pub fn new_try_wait_epoch_span(epoch: HummockReadEpoch) -> MayTraceSpan { - Self::new_global_op(Operation::TryWaitEpoch(epoch.into()), StorageType::Global) + pub fn new_try_wait_epoch_span( + epoch: HummockReadEpoch, + options: TracedTryWaitEpochOptions, + ) -> MayTraceSpan { + Self::new_global_op( + Operation::TryWaitEpoch(epoch.into(), options), + StorageType::Global, + ) } pub fn new_get_span( diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 562e989051395..c3c981a612064 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -170,6 +170,11 @@ pub struct TracedNewLocalOptions { pub vnodes: TracedBitmap, } +#[derive(Encode, Decode, PartialEq, Debug, Clone)] +pub struct TracedTryWaitEpochOptions { + pub table_id: TracedTableId, +} + #[cfg(test)] impl TracedNewLocalOptions { pub(crate) fn for_test(table_id: u32) -> Self { diff --git a/src/storage/hummock_trace/src/record.rs b/src/storage/hummock_trace/src/record.rs index 2995b7f13c8a7..a9ae562f02b4c 100644 --- a/src/storage/hummock_trace/src/record.rs +++ b/src/storage/hummock_trace/src/record.rs @@ -23,7 +23,7 @@ use risingwave_pb::meta::SubscribeResponse; use crate::{ LocalStorageId, StorageType, TracedHummockReadEpoch, TracedInitOptions, TracedNewLocalOptions, - TracedReadOptions, TracedSealCurrentEpochOptions, + TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions, }; pub type RecordId = u64; @@ -164,7 +164,7 @@ pub enum Operation { LocalStorageInit(TracedInitOptions), /// Try wait epoch - TryWaitEpoch(TracedHummockReadEpoch), + TryWaitEpoch(TracedHummockReadEpoch, TracedTryWaitEpochOptions), /// Seal current epoch SealCurrentEpoch { diff --git a/src/storage/hummock_trace/src/replay/mod.rs b/src/storage/hummock_trace/src/replay/mod.rs index 91e80cad1c4b6..347ef30704570 100644 --- a/src/storage/hummock_trace/src/replay/mod.rs +++ b/src/storage/hummock_trace/src/replay/mod.rs @@ -33,7 +33,7 @@ use crate::error::Result; use crate::TraceError; use crate::{ LocalStorageId, Record, TracedBytes, TracedInitOptions, TracedNewLocalOptions, - TracedReadOptions, TracedSealCurrentEpochOptions, + TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions, }; pub type ReplayItem = (TracedBytes, TracedBytes); @@ -118,7 +118,11 @@ pub trait ReplayStateStore { async fn sync(&self, id: u64, table_ids: Vec) -> Result; async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result; async fn new_local(&self, opts: TracedNewLocalOptions) -> Box; - async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; + async fn try_wait_epoch( + &self, + epoch: HummockReadEpoch, + options: TracedTryWaitEpochOptions, + ) -> Result<()>; } // define mock trait for replay interfaces @@ -147,7 +151,7 @@ mock! { async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64, ) -> Result; async fn new_local(&self, opts: TracedNewLocalOptions) -> Box; - async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; + async fn try_wait_epoch(&self, epoch: HummockReadEpoch,options: TracedTryWaitEpochOptions) -> Result<()>; } impl GlobalReplay for GlobalReplayInterface{} } diff --git a/src/storage/hummock_trace/src/replay/worker.rs b/src/storage/hummock_trace/src/replay/worker.rs index 8a02c3efde196..08d877cadf3ab 100644 --- a/src/storage/hummock_trace/src/replay/worker.rs +++ b/src/storage/hummock_trace/src/replay/worker.rs @@ -310,11 +310,11 @@ impl ReplayWorker { let local_storage = local_storages.get_mut(&storage_type).unwrap(); local_storage.init(options).await.unwrap(); } - Operation::TryWaitEpoch(epoch) => { + Operation::TryWaitEpoch(epoch, options) => { assert_eq!(storage_type, StorageType::Global); let res = res_rx.recv().await.expect("recv result failed"); if let OperationResult::TryWaitEpoch(expected) = res { - let actual = replay.try_wait_epoch(epoch.into()).await; + let actual = replay.try_wait_epoch(epoch.into(), options).await; assert_eq!(TraceResult::from(actual), expected, "try_wait_epoch wrong"); } else { panic!( diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 1c8abc78ddffc..82cdc14838946 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -197,7 +197,7 @@ pub struct HummockEventHandler { /// A copy of `read_version_mapping` but owned by event handler local_read_version_mapping: HashMap, - version_update_notifier_tx: Arc>, + version_update_notifier_tx: Arc>, recent_versions: Arc>, write_conflict_detector: Option>, @@ -316,8 +316,7 @@ impl HummockEventHandler { ) -> Self { let (hummock_event_tx, hummock_event_rx) = event_channel(state_store_metrics.event_handler_pending_event.clone()); - let (version_update_notifier_tx, _) = - tokio::sync::watch::channel(pinned_version.visible_table_committed_epoch()); + let (version_update_notifier_tx, _) = tokio::sync::watch::channel(pinned_version.clone()); let version_update_notifier_tx = Arc::new(version_update_notifier_tx); let read_version_mapping = Arc::new(RwLock::new(HashMap::default())); let buffer_tracker = BufferTracker::from_storage_opts( @@ -371,7 +370,7 @@ impl HummockEventHandler { } } - pub fn version_update_notifier_tx(&self) -> Arc> { + pub fn version_update_notifier_tx(&self) -> Arc> { self.version_update_notifier_tx.clone() } @@ -648,18 +647,16 @@ impl HummockEventHandler { ); } - let prev_max_committed_epoch = pinned_version.visible_table_committed_epoch(); let max_committed_epoch = new_pinned_version.visible_table_committed_epoch(); // only notify local_version_manager when MCE change self.version_update_notifier_tx.send_if_modified(|state| { - assert_eq!(prev_max_committed_epoch, *state); - if max_committed_epoch > *state { - *state = max_committed_epoch; - true - } else { - false + if state.id() == new_pinned_version.id() { + return false; } + assert!(pinned_version.id() > state.id()); + *state = new_pinned_version.clone(); + true }); if let Some(conflict_detector) = self.write_conflict_detector.as_ref() { diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 90e6a9306930a..3557136c0b96c 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -1195,10 +1195,7 @@ impl HummockUploader { self.context .pinned_version .version() - .state_table_info - .info() - .get(table_id) - .map(|info| info.committed_epoch), + .table_committed_epoch(*table_id), ) }); table_data.new_epoch(epoch); diff --git a/src/storage/src/hummock/local_version/recent_versions.rs b/src/storage/src/hummock/local_version/recent_versions.rs index 8d3f1a015ad6a..c88a15ecdb526 100644 --- a/src/storage/src/hummock/local_version/recent_versions.rs +++ b/src/storage/src/hummock/local_version/recent_versions.rs @@ -140,12 +140,7 @@ impl RecentVersions { ); } let result = self.recent_versions.binary_search_by(|version| { - let committed_epoch = version - .version() - .state_table_info - .info() - .get(&table_id) - .map(|info| info.committed_epoch); + let committed_epoch = version.version().table_committed_epoch(table_id); if let Some(committed_epoch) = committed_epoch { committed_epoch.cmp(&epoch) } else { diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 888de0db1af1c..ae018df24ec8a 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -96,7 +96,7 @@ pub struct HummockStorage { buffer_tracker: BufferTracker, - version_update_notifier_tx: Arc>, + version_update_notifier_tx: Arc>, recent_versions: Arc>, @@ -626,7 +626,11 @@ impl StateStore for HummockStorage { /// Waits until the local hummock version contains the epoch. If `wait_epoch` is `Current`, /// we will only check whether it is le `sealed_epoch` and won't wait. - async fn try_wait_epoch(&self, wait_epoch: HummockReadEpoch) -> StorageResult<()> { + async fn try_wait_epoch( + &self, + wait_epoch: HummockReadEpoch, + options: TryWaitEpochOptions, + ) -> StorageResult<()> { let wait_epoch = match wait_epoch { HummockReadEpoch::Committed(epoch) => { assert!(!is_max_epoch(epoch), "epoch should not be MAX EPOCH"); @@ -634,7 +638,7 @@ impl StateStore for HummockStorage { } _ => return Ok(()), }; - wait_for_epoch(&self.version_update_notifier_tx, wait_epoch).await + wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, options).await } fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture { @@ -698,13 +702,6 @@ impl HummockStorage { self.buffer_tracker.get_buffer_size() } - pub async fn try_wait_epoch_for_test(&self, wait_epoch: u64) { - let mut rx = self.version_update_notifier_tx.subscribe(); - while *(rx.borrow_and_update()) < wait_epoch { - rx.changed().await.unwrap(); - } - } - /// Creates a [`HummockStorage`] with default stats. Should only be used by tests. pub async fn for_test( options: Arc, diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 2221b4350ebc9..ae3815ca551a5 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -25,7 +25,7 @@ use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::MAX_SPILL_TIMES; use risingwave_hummock_sdk::key::{is_empty_key_range, vnode_range, TableKey, TableKeyRange}; use risingwave_hummock_sdk::sstable_info::SstableInfo; -use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch}; +use risingwave_hummock_sdk::EpochWithGap; use tracing::{warn, Instrument}; use super::version::{StagingData, VersionUpdate}; @@ -37,6 +37,7 @@ use crate::hummock::iterator::{ Backward, BackwardUserIterator, ConcatIteratorInner, Forward, HummockIteratorUnion, IteratorFactory, MergeIterator, UserIterator, }; +use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::shared_buffer::shared_buffer_batch::{ SharedBufferBatch, SharedBufferBatchIterator, SharedBufferBatchOldValues, SharedBufferItem, SharedBufferValue, @@ -96,7 +97,7 @@ pub struct LocalHummockStorage { write_limiter: WriteLimiterRef, - version_update_notifier_tx: Arc>, + version_update_notifier_tx: Arc>, mem_table_spill_threshold: usize, } @@ -134,8 +135,15 @@ impl LocalHummockStorage { .await } - pub async fn wait_for_epoch(&self, wait_epoch: u64) -> StorageResult<()> { - wait_for_epoch(&self.version_update_notifier_tx, wait_epoch).await + async fn wait_for_epoch(&self, wait_epoch: u64) -> StorageResult<()> { + wait_for_epoch( + &self.version_update_notifier_tx, + wait_epoch, + TryWaitEpochOptions { + table_id: self.table_id, + }, + ) + .await } pub async fn iter_flushed( @@ -658,7 +666,7 @@ impl LocalHummockStorage { memory_limiter: Arc, write_limiter: WriteLimiterRef, option: NewLocalOptions, - version_update_notifier_tx: Arc>, + version_update_notifier_tx: Arc>, mem_table_spill_threshold: usize, ) -> Self { let stats = hummock_version_reader.stats().clone(); diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 7d0dc49847398..23631ba2d81ea 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -428,10 +428,7 @@ impl HummockReadVersion { vnode_watermarks, self.committed .version() - .state_table_info - .info() - .get(&self.table_id) - .map(|info| info.committed_epoch), + .table_committed_epoch(self.table_id), )); } } diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index c2f6cbafed79b..b8761da23143e 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -19,26 +19,28 @@ use std::ops::Bound::{Excluded, Included, Unbounded}; use std::ops::{Bound, RangeBounds}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use bytes::Bytes; use foyer::CacheContext; use parking_lot::Mutex; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::config::StorageMemoryConfig; +use risingwave_hummock_sdk::can_concat; use risingwave_hummock_sdk::key::{ bound_table_key_range, EmptySliceRef, FullKey, TableKey, UserKey, }; use risingwave_hummock_sdk::sstable_info::SstableInfo; -use risingwave_hummock_sdk::{can_concat, HummockEpoch}; use tokio::sync::oneshot::{channel, Receiver, Sender}; +use tracing::warn; use super::{HummockError, SstableStoreRef}; use crate::error::StorageResult; +use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::CachePolicy; use crate::mem_table::{KeyOp, MemTableError}; use crate::monitor::MemoryCollector; -use crate::store::{OpConsistencyLevel, ReadOptions, StateStoreRead}; +use crate::store::{OpConsistencyLevel, ReadOptions, StateStoreRead, TryWaitEpochOptions}; pub fn range_overlap( search_key_range: &R, @@ -575,15 +577,29 @@ pub(crate) fn filter_with_delete_range<'a>( } pub(crate) async fn wait_for_epoch( - notifier: &tokio::sync::watch::Sender, + notifier: &tokio::sync::watch::Sender, wait_epoch: u64, + options: TryWaitEpochOptions, ) -> StorageResult<()> { let mut receiver = notifier.subscribe(); - // avoid unnecessary check in the loop if the value does not change - let max_committed_epoch = *receiver.borrow_and_update(); - if max_committed_epoch >= wait_epoch { - return Ok(()); + { + // avoid unnecessary check in the loop if the value does not change + let committed_epoch = receiver + .borrow_and_update() + .version() + .table_committed_epoch(options.table_id); + if let Some(committed_epoch) = committed_epoch { + if committed_epoch >= wait_epoch { + return Ok(()); + } + } else { + warn!( + table_id = options.table_id.table_id, + "table id not exist yet. wait for table creation" + ); + } } + let start_time = Instant::now(); loop { match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await { Err(_) => { @@ -598,6 +614,7 @@ pub(crate) async fn wait_for_epoch( // See #3845 for more details. tracing::warn!( epoch = wait_epoch, + elapsed = ?start_time.elapsed(), "wait_epoch timeout when waiting for version update", ); continue; @@ -606,9 +623,21 @@ pub(crate) async fn wait_for_epoch( return Err(HummockError::wait_epoch("tx dropped").into()); } Ok(Ok(_)) => { - let max_committed_epoch = *receiver.borrow(); - if max_committed_epoch >= wait_epoch { - return Ok(()); + // TODO: should handle the corner case of drop table + let committed_epoch = receiver + .borrow() + .version() + .table_committed_epoch(options.table_id); + if let Some(committed_epoch) = committed_epoch { + if committed_epoch >= wait_epoch { + return Ok(()); + } + } else { + warn!( + table_id = options.table_id.table_id, + elapsed = ?start_time.elapsed(), + "table id not exist yet. wait for table creation" + ); } } } diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index 7416f54688a5f..9558811a2bdb0 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -735,7 +735,11 @@ impl StateStore for RangeKvStateStore { type Local = MemtableLocalStateStore; #[allow(clippy::unused_async)] - async fn try_wait_epoch(&self, _epoch: HummockReadEpoch) -> StorageResult<()> { + async fn try_wait_epoch( + &self, + _epoch: HummockReadEpoch, + _options: TryWaitEpochOptions, + ) -> StorageResult<()> { // memory backend doesn't need to wait for epoch, so this is a no-op. Ok(()) } diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index ec2785c354229..a7be71307ffec 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -298,9 +298,10 @@ impl StateStore for MonitoredStateStore { fn try_wait_epoch( &self, epoch: HummockReadEpoch, + options: TryWaitEpochOptions, ) -> impl Future> + Send + '_ { self.inner - .try_wait_epoch(epoch) + .try_wait_epoch(epoch, options) .verbose_instrument_await("store_wait_epoch") .inspect_err(|e| error!(error = %e.as_report(), "Failed in wait_epoch")) } diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index dac8712a924d7..8bd8013ba3810 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -242,10 +242,14 @@ impl LocalStateStore for TracedStateStore { impl StateStore for TracedStateStore { type Local = TracedStateStore; - async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()> { - let span = TraceSpan::new_try_wait_epoch_span(epoch); + async fn try_wait_epoch( + &self, + epoch: HummockReadEpoch, + options: TryWaitEpochOptions, + ) -> StorageResult<()> { + let span = TraceSpan::new_try_wait_epoch_span(epoch, options.clone().into()); - let res = self.inner.try_wait_epoch(epoch).await; + let res = self.inner.try_wait_epoch(epoch, options).await; span.may_send_result(OperationResult::TryWaitEpoch( res.as_ref().map(|o| *o).into(), )); diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index a7fb7c3643ed0..03b0471f90446 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -173,7 +173,11 @@ impl StateStore for PanicStateStore { type Local = Self; #[allow(clippy::unused_async)] - async fn try_wait_epoch(&self, _epoch: HummockReadEpoch) -> StorageResult<()> { + async fn try_wait_epoch( + &self, + _epoch: HummockReadEpoch, + _options: TryWaitEpochOptions, + ) -> StorageResult<()> { panic!("should not wait epoch from the panic state store!"); } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index db21faa78c6cf..e958a93392dc9 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -35,7 +35,8 @@ use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; use risingwave_hummock_trace::{ TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions, - TracedReadOptions, TracedSealCurrentEpochOptions, TracedWriteOptions, + TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions, + TracedWriteOptions, }; use risingwave_pb::hummock::PbVnodeWatermark; @@ -344,6 +345,34 @@ pub trait StateStoreWrite: StaticSendSync { pub trait SyncFuture = Future> + Send + 'static; +#[derive(Clone)] +pub struct TryWaitEpochOptions { + pub table_id: TableId, +} + +impl TryWaitEpochOptions { + #[cfg(any(test, feature = "test"))] + pub fn for_test(table_id: TableId) -> Self { + Self { table_id } + } +} + +impl From for TryWaitEpochOptions { + fn from(value: TracedTryWaitEpochOptions) -> Self { + Self { + table_id: value.table_id.into(), + } + } +} + +impl From for TracedTryWaitEpochOptions { + fn from(value: TryWaitEpochOptions) -> Self { + Self { + table_id: value.table_id.into(), + } + } +} + pub trait StateStore: StateStoreRead + StaticSendSync + Clone { type Local: LocalStateStore; @@ -352,6 +381,7 @@ pub trait StateStore: StateStoreRead + StaticSendSync + Clone { fn try_wait_epoch( &self, epoch: HummockReadEpoch, + options: TryWaitEpochOptions, ) -> impl Future> + Send + '_; fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture; diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index d9261d67e24bc..cfb0bfb3c8085 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -570,8 +570,9 @@ pub mod verify { fn try_wait_epoch( &self, epoch: HummockReadEpoch, + options: TryWaitEpochOptions, ) -> impl Future> + Send + '_ { - self.actual.try_wait_epoch(epoch) + self.actual.try_wait_epoch(epoch, options) } fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture { @@ -1154,7 +1155,11 @@ pub mod boxed_state_store { #[async_trait::async_trait] pub trait DynamicDispatchedStateStoreExt: StaticSendSync { - async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()>; + async fn try_wait_epoch( + &self, + epoch: HummockReadEpoch, + options: TryWaitEpochOptions, + ) -> StorageResult<()>; fn sync( &self, @@ -1167,8 +1172,12 @@ pub mod boxed_state_store { #[async_trait::async_trait] impl DynamicDispatchedStateStoreExt for S { - async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()> { - self.try_wait_epoch(epoch).await + async fn try_wait_epoch( + &self, + epoch: HummockReadEpoch, + options: TryWaitEpochOptions, + ) -> StorageResult<()> { + self.try_wait_epoch(epoch, options).await } fn sync( @@ -1253,8 +1262,9 @@ pub mod boxed_state_store { fn try_wait_epoch( &self, epoch: HummockReadEpoch, + options: TryWaitEpochOptions, ) -> impl Future> + Send + '_ { - self.deref().try_wait_epoch(epoch) + self.deref().try_wait_epoch(epoch, options) } fn sync( diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 254e8e73095b1..d368542db4a06 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -49,6 +49,7 @@ use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew}; use crate::row_serde::{find_columns_by_ids, ColumnMapping}; use crate::store::{ PrefetchOptions, ReadLogOptions, ReadOptions, StateStoreIter, StateStoreIterExt, + TryWaitEpochOptions, }; use crate::table::merge_sort::merge_sort; use crate::table::{ChangeLogRow, KeyedRow, TableDistribution, TableIter}; @@ -365,7 +366,14 @@ impl StorageTableInner { wait_epoch, HummockReadEpoch::TimeTravel(_) | HummockReadEpoch::Committed(_) ); - self.store.try_wait_epoch(wait_epoch).await?; + self.store + .try_wait_epoch( + wait_epoch, + TryWaitEpochOptions { + table_id: self.table_id, + }, + ) + .await?; let serialized_pk = serialize_pk_with_vnode( &pk, &self.pk_serializer, @@ -872,7 +880,14 @@ impl StorageTableInnerIterInner { epoch: HummockReadEpoch, ) -> StorageResult { let raw_epoch = epoch.get_epoch(); - store.try_wait_epoch(epoch).await?; + store + .try_wait_epoch( + epoch, + TryWaitEpochOptions { + table_id: read_options.table_id, + }, + ) + .await?; let iter = store.iter(table_key_range, raw_epoch, read_options).await?; let iter = Self { iter, @@ -975,7 +990,12 @@ impl StorageTableInnerIterLogInner { end_epoch: u64, ) -> StorageResult { store - .try_wait_epoch(HummockReadEpoch::Committed(end_epoch)) + .try_wait_epoch( + HummockReadEpoch::Committed(end_epoch), + TryWaitEpochOptions { + table_id: read_options.table_id, + }, + ) .await?; let iter = store .iter_log((start_epoch, end_epoch), table_key_range, read_options) diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 440c7188d2fa1..3da42d67f2a0b 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -455,6 +455,7 @@ mod tests { use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_hummock_test::test_utils::prepare_hummock_test_env; use risingwave_storage::hummock::HummockStorage; + use risingwave_storage::store::TryWaitEpochOptions; use risingwave_storage::StateStore; use crate::common::log_store_impl::kv_log_store::reader::KvLogStoreReader; @@ -685,7 +686,10 @@ mod tests { .unwrap(); test_env .storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch2)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch2), + TryWaitEpochOptions::for_test(table.id.into()), + ) .await .unwrap(); @@ -901,7 +905,10 @@ mod tests { test_env.commit_epoch(epoch2).await; test_env .storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch2)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch2), + TryWaitEpochOptions::for_test(table.id.into()), + ) .await .unwrap(); @@ -1137,7 +1144,10 @@ mod tests { test_env.commit_epoch(epoch2).await; test_env .storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch2)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch2), + TryWaitEpochOptions::for_test(table.id.into()), + ) .await .unwrap(); @@ -1401,7 +1411,10 @@ mod tests { test_env .storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch3)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch3), + TryWaitEpochOptions::for_test(table.id.into()), + ) .await .unwrap(); diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 3f2cd83aca286..5efaaf5c3764d 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -32,6 +32,7 @@ use risingwave_connector::source::{ SplitMetaData, }; use risingwave_hummock_sdk::HummockReadEpoch; +use risingwave_storage::store::TryWaitEpochOptions; use serde::{Deserialize, Serialize}; use thiserror_ext::AsReport; @@ -417,6 +418,7 @@ impl SourceBackfillExecutorInner { } let state_store = self.backfill_state_store.state_store.state_store().clone(); + let table_id = self.backfill_state_store.state_store.table_id().into(); static STATE_TABLE_INITIALIZED: Once = Once::new(); tokio::spawn(async move { // This is for self.backfill_finished() to be safe. @@ -424,7 +426,10 @@ impl SourceBackfillExecutorInner { let epoch = barrier.epoch.curr; tracing::info!("waiting for epoch: {}", epoch); state_store - .try_wait_epoch(HummockReadEpoch::Committed(epoch)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch), + TryWaitEpochOptions { table_id }, + ) .await .expect("failed to wait epoch"); STATE_TABLE_INITIALIZED.call_once(|| ()); diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index dd93ac85d1f1c..d4a02ce462441 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -20,6 +20,7 @@ use either::Either; use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::array::ArrayRef; +use risingwave_common::catalog::TableId; use risingwave_common::metrics::{LabelGuardedIntCounter, GLOBAL_ERROR_METRICS}; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; @@ -32,6 +33,7 @@ use risingwave_connector::source::{ SplitMetaData, WaitCheckpointTask, }; use risingwave_hummock_sdk::HummockReadEpoch; +use risingwave_storage::store::TryWaitEpochOptions; use thiserror_ext::AsReport; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::{mpsc, oneshot}; @@ -104,6 +106,7 @@ impl SourceExecutor { let wait_checkpoint_worker = WaitCheckpointWorker { wait_checkpoint_rx, state_store: core.split_state_store.state_table.state_store().clone(), + table_id: core.split_state_store.state_table.table_id().into(), }; tokio::spawn(wait_checkpoint_worker.run()); Ok(Some(WaitCheckpointTaskBuilder { @@ -820,6 +823,7 @@ impl WaitCheckpointTaskBuilder { struct WaitCheckpointWorker { wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>, state_store: S, + table_id: TableId, } impl WaitCheckpointWorker { @@ -832,7 +836,12 @@ impl WaitCheckpointWorker { tracing::debug!("start to wait epoch {}", epoch.0); let ret = self .state_store - .try_wait_epoch(HummockReadEpoch::Committed(epoch.0)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch.0), + TryWaitEpochOptions { + table_id: self.table_id, + }, + ) .await; match ret {