diff --git a/proto/common.proto b/proto/common.proto index 05d938cc26523..1030d07d7c343 100644 --- a/proto/common.proto +++ b/proto/common.proto @@ -105,9 +105,14 @@ message WorkerSlotMapping { repeated uint64 data = 2; } +message BatchQueryCommittedEpoch { + uint64 epoch = 1; + uint64 hummock_version_id = 2; +} + message BatchQueryEpoch { oneof epoch { - uint64 committed = 1; + BatchQueryCommittedEpoch committed = 1; uint64 current = 2; uint64 backup = 3; uint64 time_travel = 4; diff --git a/proto/stream_service.proto b/proto/stream_service.proto index c13ee8875b43f..ab56c9f7e4050 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -49,6 +49,7 @@ message BarrierCompleteResponse { message WaitEpochCommitRequest { uint64 epoch = 1; + uint32 table_id = 2; } message WaitEpochCommitResponse { diff --git a/src/batch/src/executor/log_row_seq_scan.rs b/src/batch/src/executor/log_row_seq_scan.rs index 79cc21b0f02ed..5b88736fb06f6 100644 --- a/src/batch/src/executor/log_row_seq_scan.rs +++ b/src/batch/src/executor/log_row_seq_scan.rs @@ -25,6 +25,7 @@ use risingwave_common::catalog::{ColumnId, Field, Schema}; use risingwave_common::hash::VnodeCountCompat; use risingwave_common::row::{Row, RowExt}; use risingwave_common::types::ScalarImpl; +use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch}; use risingwave_pb::plan_common::StorageTableDesc; @@ -50,6 +51,7 @@ pub struct LogRowSeqScanExecutor { table: StorageTable, old_epoch: u64, new_epoch: u64, + version_id: HummockVersionId, } impl LogRowSeqScanExecutor { @@ -57,6 +59,7 @@ impl LogRowSeqScanExecutor { table: StorageTable, old_epoch: u64, new_epoch: u64, + version_id: HummockVersionId, chunk_size: usize, identity: String, metrics: Option, @@ -74,6 +77,7 @@ impl LogRowSeqScanExecutor { table, old_epoch, new_epoch, + version_id, } } } @@ -127,12 +131,18 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder { unreachable!("invalid new epoch: {:?}", log_store_seq_scan_node.new_epoch) }; + assert_eq!(old_epoch.hummock_version_id, new_epoch.hummock_version_id); + let version_id = old_epoch.hummock_version_id; + let old_epoch = old_epoch.epoch; + let new_epoch = new_epoch.epoch; + dispatch_state_store!(source.context().state_store(), state_store, { let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc); Ok(Box::new(LogRowSeqScanExecutor::new( table, - *old_epoch, - *new_epoch, + old_epoch, + new_epoch, + HummockVersionId::new(version_id), chunk_size as usize, source.plan_node().get_identity().clone(), metrics, @@ -163,6 +173,7 @@ impl LogRowSeqScanExecutor { table, old_epoch, new_epoch, + version_id, schema, .. } = *self; @@ -179,6 +190,7 @@ impl LogRowSeqScanExecutor { table.clone(), old_epoch, new_epoch, + version_id, chunk_size, histogram, Arc::new(schema.clone()), @@ -195,13 +207,17 @@ impl LogRowSeqScanExecutor { table: Arc>, old_epoch: u64, new_epoch: u64, + version_id: HummockVersionId, chunk_size: usize, histogram: Option>, schema: Arc, ) { // Range Scan. let iter = table - .batch_iter_log_with_pk_bounds(old_epoch, new_epoch) + .batch_iter_log_with_pk_bounds( + old_epoch, + HummockReadEpoch::BatchQueryCommitted(new_epoch, version_id), + ) .await? .flat_map(|r| { futures::stream::iter(std::iter::from_coroutine( diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs index 07be18ca72988..53dabccaf260f 100644 --- a/src/batch/src/executor/mod.rs +++ b/src/batch/src/executor/mod.rs @@ -260,7 +260,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> { #[cfg(test)] mod tests { - use risingwave_hummock_sdk::to_committed_batch_query_epoch; + use risingwave_hummock_sdk::test_batch_query_epoch; use risingwave_pb::batch_plan::PlanNode; use crate::executor::ExecutorBuilder; @@ -278,7 +278,7 @@ mod tests { &plan_node, task_id, ComputeNodeContext::for_test(), - to_committed_batch_query_epoch(u64::MAX), + test_batch_query_epoch(), ShutdownToken::empty(), ); let child_plan = &PlanNode::default(); diff --git a/src/batch/src/task/task_manager.rs b/src/batch/src/task/task_manager.rs index f25ce88379905..4db15df2dbe85 100644 --- a/src/batch/src/task/task_manager.rs +++ b/src/batch/src/task/task_manager.rs @@ -137,12 +137,12 @@ impl BatchManager { tid: &PbTaskId, plan: PlanFragment, ) -> Result<()> { - use risingwave_hummock_sdk::to_committed_batch_query_epoch; + use risingwave_hummock_sdk::test_batch_query_epoch; self.fire_task( tid, plan, - to_committed_batch_query_epoch(0), + test_batch_query_epoch(), ComputeNodeContext::for_test(), StateReporter::new_with_test(), TracingContext::none(), diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 6253cfe74c730..e8d403a9693cc 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -17,6 +17,7 @@ use futures::{Stream, StreamExt, TryStreamExt}; use risingwave_pb::stream_service::stream_service_server::StreamService; use risingwave_pb::stream_service::*; use risingwave_storage::dispatch_state_store; +use risingwave_storage::store::TryWaitEpochOptions; use risingwave_stream::error::StreamError; use risingwave_stream::task::{LocalStreamManager, StreamEnvironment}; use tokio::sync::mpsc::unbounded_channel; @@ -45,14 +46,20 @@ impl StreamService for StreamServiceImpl { &self, request: Request, ) -> Result, Status> { - let epoch = request.into_inner().epoch; + let request = request.into_inner(); + let epoch = request.epoch; dispatch_state_store!(self.env.state_store(), store, { use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_storage::StateStore; store - .try_wait_epoch(HummockReadEpoch::Committed(epoch)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch), + TryWaitEpochOptions { + table_id: request.table_id.into(), + }, + ) .instrument_await(format!("wait_epoch_commit (epoch {})", epoch)) .await .map_err(StreamError::from)?; diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index 8fea0f48fa82d..fa92e4eefc3a9 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -39,7 +39,7 @@ use risingwave_connector::source::cdc::external::{ }; use risingwave_connector::source::cdc::DebeziumCdcSplit; use risingwave_connector::source::SplitImpl; -use risingwave_hummock_sdk::to_committed_batch_query_epoch; +use risingwave_hummock_sdk::test_batch_query_epoch; use risingwave_storage::memory::MemoryStateStore; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_stream::common::table::state_table::StateTable; @@ -384,7 +384,7 @@ async fn test_cdc_backfill() -> StreamResult<()> { table.clone(), vec![ScanRange::full()], true, - to_committed_batch_query_epoch(u64::MAX), + test_batch_query_epoch(), 1024, "RowSeqExecutor2".to_string(), None, diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index 13a76c6989b48..f36cd6cf8164f 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -41,7 +41,7 @@ use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder; use risingwave_dml::dml_manager::DmlManager; -use risingwave_hummock_sdk::to_committed_batch_query_epoch; +use risingwave_hummock_sdk::test_batch_query_epoch; use risingwave_pb::catalog::StreamSourceInfo; use risingwave_pb::plan_common::PbRowFormatType; use risingwave_storage::memory::MemoryStateStore; @@ -263,7 +263,7 @@ async fn test_table_materialize() -> StreamResult<()> { table.clone(), vec![ScanRange::full()], true, - to_committed_batch_query_epoch(u64::MAX), + test_batch_query_epoch(), 1024, "RowSeqExecutor2".to_string(), None, @@ -334,7 +334,7 @@ async fn test_table_materialize() -> StreamResult<()> { table.clone(), vec![ScanRange::full()], true, - to_committed_batch_query_epoch(u64::MAX), + test_batch_query_epoch(), 1024, "RowSeqScanExecutor2".to_string(), None, @@ -414,7 +414,7 @@ async fn test_table_materialize() -> StreamResult<()> { table, vec![ScanRange::full()], true, - to_committed_batch_query_epoch(u64::MAX), + test_batch_query_epoch(), 1024, "RowSeqScanExecutor2".to_string(), None, @@ -490,7 +490,7 @@ async fn test_row_seq_scan() -> StreamResult<()> { table, vec![ScanRange::full()], true, - to_committed_batch_query_epoch(u64::MAX), + test_batch_query_epoch(), 1, "RowSeqScanExecutor2".to_string(), None, diff --git a/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs index 93132ce65e51c..de8cadda4502d 100644 --- a/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs @@ -15,7 +15,7 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::LogRowSeqScanNode; -use risingwave_pb::common::BatchQueryEpoch; +use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch}; use super::batch::prelude::*; use super::utils::{childless_record, Distill}; @@ -112,12 +112,18 @@ impl TryToBatchPb for BatchLogSeqScan { vnode_bitmap: None, old_epoch: Some(BatchQueryEpoch { epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed( - self.core.old_epoch, + BatchQueryCommittedEpoch { + epoch: self.core.old_epoch, + hummock_version_id: 0, + }, )), }), new_epoch: Some(BatchQueryEpoch { epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed( - self.core.new_epoch, + BatchQueryCommittedEpoch { + epoch: self.core.new_epoch, + hummock_version_id: 0, + }, )), }), })) diff --git a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs index 9a1e53aeab758..793cbaea4f205 100644 --- a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs @@ -20,6 +20,7 @@ use pretty_xmlish::Pretty; use risingwave_common::catalog::{Field, Schema, TableDesc}; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::ColumnOrder; +use risingwave_hummock_sdk::HummockVersionId; use crate::catalog::ColumnId; use crate::optimizer::optimizer_context::OptimizerContextRef; @@ -44,6 +45,7 @@ pub struct LogScan { pub old_epoch: u64, pub new_epoch: u64, + pub version_id: HummockVersionId, } impl LogScan { @@ -101,6 +103,7 @@ impl LogScan { ctx: OptimizerContextRef, old_epoch: u64, new_epoch: u64, + version_id: HummockVersionId, ) -> Self { Self { table_name, @@ -110,6 +113,7 @@ impl LogScan { ctx, old_epoch, new_epoch, + version_id, } } diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs index 49667b2aa143b..bb1d98aa5f8f7 100644 --- a/src/frontend/src/scheduler/snapshot.rs +++ b/src/frontend/src/scheduler/snapshot.rs @@ -22,7 +22,7 @@ use risingwave_hummock_sdk::version::HummockVersionStateTableInfo; use risingwave_hummock_sdk::{ FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID, }; -use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch}; +use risingwave_pb::common::{batch_query_epoch, BatchQueryCommittedEpoch, BatchQueryEpoch}; use risingwave_pb::hummock::{HummockVersionDeltas, StateTableInfoDelta}; use tokio::sync::watch; @@ -55,7 +55,10 @@ impl ReadSnapshot { Ok(match self { ReadSnapshot::FrontendPinned { snapshot } => BatchQueryEpoch { epoch: Some(batch_query_epoch::Epoch::Committed( - snapshot.batch_query_epoch(read_storage_tables)?.0, + BatchQueryCommittedEpoch { + epoch: snapshot.batch_query_epoch(read_storage_tables)?.0, + hummock_version_id: snapshot.value.id.to_u64(), + }, )), }, ReadSnapshot::ReadUncommitted => BatchQueryEpoch { diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index aa798a236b5c8..5270aced1f13f 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -30,6 +30,7 @@ use risingwave_common::catalog::Field; use risingwave_common::error::BoxedError; use risingwave_common::session_config::QueryMode; use risingwave_common::types::DataType; +use risingwave_hummock_sdk::HummockVersionId; use risingwave_sqlparser::ast::{Ident, ObjectName, Statement}; use super::SessionImpl; @@ -523,6 +524,18 @@ impl SubscriptionCursor { let init_query_timer = Instant::now(); let (chunk_stream, fields) = if let Some(rw_timestamp) = rw_timestamp { let context = OptimizerContext::from_handler_args(handle_args); + let version_id = { + let version = session.env.hummock_snapshot_manager.acquire(); + let version = version.version(); + if !version + .state_table_info + .info() + .contains_key(dependent_table_id) + { + return Err(anyhow!("table id {dependent_table_id} has been dropped").into()); + } + version.id + }; let plan_fragmenter_result = gen_batch_plan_fragmenter( &session, Self::create_batch_plan_for_cursor( @@ -531,6 +544,7 @@ impl SubscriptionCursor { context.into(), rw_timestamp, rw_timestamp, + version_id, )?, )?; create_chunk_stream_for_cursor(session, plan_fragmenter_result).await? @@ -606,6 +620,7 @@ impl SubscriptionCursor { context: OptimizerContextRef, old_epoch: u64, new_epoch: u64, + version_id: HummockVersionId, ) -> Result { let out_col_idx = table_catalog .columns @@ -621,6 +636,7 @@ impl SubscriptionCursor { context, old_epoch, new_epoch, + version_id, ); let batch_log_seq_scan = BatchLogSeqScan::new(core); let schema = batch_log_seq_scan diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index c18ad5d0f2b3b..1eff171d019b2 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -22,7 +22,6 @@ use risingwave_common::hash::ActorMapping; use risingwave_common::types::Timestamptz; use risingwave_common::util::epoch::Epoch; use risingwave_connector::source::SplitImpl; -use risingwave_hummock_sdk::HummockEpoch; use risingwave_pb::catalog::{CreateType, Table}; use risingwave_pb::common::PbWorkerNode; use risingwave_pb::meta::table_fragments::PbActorStatus; @@ -442,6 +441,8 @@ pub struct CommandContext { pub prev_epoch: TracedEpoch, pub curr_epoch: TracedEpoch, + pub table_ids_to_commit: HashSet, + pub current_paused_reason: Option, pub command: Command, @@ -470,12 +471,12 @@ impl std::fmt::Debug for CommandContext { } impl CommandContext { - #[allow(clippy::too_many_arguments)] pub(super) fn new( node_map: HashMap, subscription_info: InflightSubscriptionInfo, prev_epoch: TracedEpoch, curr_epoch: TracedEpoch, + table_ids_to_commit: HashSet, current_paused_reason: Option, command: Command, kind: BarrierKind, @@ -487,6 +488,7 @@ impl CommandContext { subscription_info, prev_epoch, curr_epoch, + table_ids_to_commit, current_paused_reason, command, kind, @@ -945,7 +947,13 @@ impl Command { } impl CommandContext { - pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> { + pub async fn wait_epoch_commit(&self) -> MetaResult<()> { + let table_id = self.table_ids_to_commit.iter().next().cloned(); + // try wait epoch on an existing random table id + let Some(table_id) = table_id else { + // no need to wait epoch when there is no table id + return Ok(()); + }; let futures = self.node_map.values().map(|worker_node| async { let client = self .barrier_manager_context @@ -953,7 +961,10 @@ impl CommandContext { .stream_client_pool() .get(worker_node) .await?; - let request = WaitEpochCommitRequest { epoch }; + let request = WaitEpochCommitRequest { + epoch: self.prev_epoch.value().0, + table_id: table_id.table_id, + }; client.wait_epoch_commit(request).await }); @@ -976,7 +987,7 @@ impl CommandContext { // storage version with this epoch is synced to all compute nodes before the // execution of the next command of `Update`, as some newly created operators // may immediately initialize their states on that barrier. - self.wait_epoch_commit(self.prev_epoch.value().0).await?; + self.wait_epoch_commit().await?; } } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index f47e400dce81a..1b00c9ddb7339 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -1028,11 +1028,14 @@ impl GlobalBarrierManager { }); span.record("epoch", curr_epoch.value().0); + let table_ids_to_commit: HashSet<_> = pre_applied_graph_info.existing_table_ids().collect(); + let command_ctx = Arc::new(CommandContext::new( self.active_streaming_nodes.current().clone(), pre_applied_subscription_info, prev_epoch.clone(), curr_epoch.clone(), + table_ids_to_commit.clone(), self.state.paused_reason(), command, kind, @@ -1042,8 +1045,6 @@ impl GlobalBarrierManager { send_latency_timer.observe_duration(); - let table_ids_to_commit: HashSet<_> = pre_applied_graph_info.existing_table_ids().collect(); - let mut jobs_to_wait = HashSet::new(); for (table_id, creating_job) in &mut self.checkpoint_control.creating_streaming_job_controls diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index 9eed880f8a5be..6058d3a16d5e8 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -208,8 +208,10 @@ impl PartialEq for LocalSstableInfo { /// Package read epoch of hummock, it be used for `wait_epoch` #[derive(Debug, Clone, Copy)] pub enum HummockReadEpoch { - /// We need to wait the `max_committed_epoch` + /// We need to wait the `committed_epoch` of the read table Committed(HummockEpoch), + /// We need to wait the `committed_epoch` of the read table and also the hummock version to the version id + BatchQueryCommitted(HummockEpoch, HummockVersionId), /// We don't need to wait epoch, we usually do stream reading with it. NoWait(HummockEpoch), /// We don't need to wait epoch. @@ -220,7 +222,10 @@ pub enum HummockReadEpoch { impl From for HummockReadEpoch { fn from(e: BatchQueryEpoch) -> Self { match e.epoch.unwrap() { - batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::Committed(epoch), + batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::BatchQueryCommitted( + epoch.epoch, + HummockVersionId::new(epoch.hummock_version_id), + ), batch_query_epoch::Epoch::Current(epoch) => { if epoch != HummockEpoch::MAX { warn!( @@ -236,19 +241,29 @@ impl From for HummockReadEpoch { } } -pub fn to_committed_batch_query_epoch(epoch: u64) -> BatchQueryEpoch { +pub fn test_batch_query_epoch() -> BatchQueryEpoch { BatchQueryEpoch { - epoch: Some(batch_query_epoch::Epoch::Committed(epoch)), + epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)), } } impl HummockReadEpoch { pub fn get_epoch(&self) -> HummockEpoch { *match self { - HummockReadEpoch::Committed(epoch) => epoch, - HummockReadEpoch::NoWait(epoch) => epoch, - HummockReadEpoch::Backup(epoch) => epoch, - HummockReadEpoch::TimeTravel(epoch) => epoch, + HummockReadEpoch::Committed(epoch) + | HummockReadEpoch::BatchQueryCommitted(epoch, _) + | HummockReadEpoch::NoWait(epoch) + | HummockReadEpoch::Backup(epoch) + | HummockReadEpoch::TimeTravel(epoch) => epoch, + } + } + + pub fn is_read_committed(&self) -> bool { + match self { + HummockReadEpoch::Committed(_) + | HummockReadEpoch::TimeTravel(_) + | HummockReadEpoch::BatchQueryCommitted(_, _) => true, + HummockReadEpoch::NoWait(_) | HummockReadEpoch::Backup(_) => false, } } } diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 7be1cbdb834c4..1c627285f9348 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -422,6 +422,13 @@ impl HummockVersion { self.max_committed_epoch } + pub fn table_committed_epoch(&self, table_id: TableId) -> Option { + self.state_table_info + .info() + .get(&table_id) + .map(|info| info.committed_epoch) + } + pub fn visible_table_committed_epoch(&self) -> u64 { self.max_committed_epoch } diff --git a/src/storage/hummock_test/src/bin/replay/replay_impl.rs b/src/storage/hummock_test/src/bin/replay/replay_impl.rs index 282409f394476..6653db94e0543 100644 --- a/src/storage/hummock_test/src/bin/replay/replay_impl.rs +++ b/src/storage/hummock_test/src/bin/replay/replay_impl.rs @@ -25,7 +25,7 @@ use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId, SyncResult}; use risingwave_hummock_trace::{ GlobalReplay, LocalReplay, LocalReplayRead, ReplayItem, ReplayRead, ReplayStateStore, ReplayWrite, Result, TraceError, TracedBytes, TracedInitOptions, TracedNewLocalOptions, - TracedReadOptions, TracedSealCurrentEpochOptions, TracedSubResp, + TracedReadOptions, TracedSealCurrentEpochOptions, TracedSubResp, TracedTryWaitEpochOptions, }; use risingwave_meta::manager::{MessageStatus, MetaSrvEnv, NotificationManagerRef, WorkerKey}; use risingwave_pb::common::WorkerNode; @@ -170,9 +170,13 @@ impl ReplayStateStore for GlobalReplayImpl { Box::new(LocalReplayImpl(local_storage)) } - async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()> { + async fn try_wait_epoch( + &self, + epoch: HummockReadEpoch, + options: TracedTryWaitEpochOptions, + ) -> Result<()> { self.store - .try_wait_epoch(epoch) + .try_wait_epoch(epoch, options.into()) .await .map_err(|_| TraceError::TryWaitEpochFailed)?; Ok(()) diff --git a/src/storage/hummock_test/src/failpoint_tests.rs b/src/storage/hummock_test/src/failpoint_tests.rs index 27072abba08f2..cba7851c8cf88 100644 --- a/src/storage/hummock_test/src/failpoint_tests.rs +++ b/src/storage/hummock_test/src/failpoint_tests.rs @@ -30,7 +30,8 @@ use risingwave_storage::hummock::test_utils::{count_stream, default_opts_for_tes use risingwave_storage::hummock::{CachePolicy, HummockStorage}; use risingwave_storage::storage_value::StorageValue; use risingwave_storage::store::{ - LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, StateStoreRead, WriteOptions, + LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, StateStoreRead, + TryWaitEpochOptions, WriteOptions, }; use risingwave_storage::StateStore; @@ -148,7 +149,10 @@ async fn test_failpoints_state_store_read_upload() { .unwrap(); meta_client.commit_epoch(1, res, false).await.unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(1)) + .try_wait_epoch( + HummockReadEpoch::Committed(1), + TryWaitEpochOptions::for_test(local.table_id()), + ) .await .unwrap(); // clear block cache @@ -225,7 +229,10 @@ async fn test_failpoints_state_store_read_upload() { .unwrap(); meta_client.commit_epoch(3, res, false).await.unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(3)) + .try_wait_epoch( + HummockReadEpoch::Committed(3), + TryWaitEpochOptions::for_test(local.table_id()), + ) .await .unwrap(); diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index c59d33130d586..c477693b114fb 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -568,7 +568,7 @@ async fn test_state_store_sync() { .commit_epoch(epoch1, res, false) .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch1).await; + test_env.wait_sync_committed_version().await; { // after sync 1 epoch let read_version = hummock_storage.read_version(); @@ -614,7 +614,7 @@ async fn test_state_store_sync() { .commit_epoch(epoch2, res, false) .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch2).await; + test_env.wait_sync_committed_version().await; { // after sync all epoch let read_version = hummock_storage.read_version(); @@ -912,7 +912,7 @@ async fn test_delete_get() { .commit_epoch(epoch2, res, false) .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch2).await; + test_env.wait_sync_committed_version().await; assert!(test_env .storage .get( @@ -1112,7 +1112,7 @@ async fn test_multiple_epoch_sync() { .commit_epoch(epoch3, sync_result3, false) .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch3).await; + test_env.wait_sync_committed_version().await; test_get(true).await; } @@ -1286,7 +1286,7 @@ async fn test_iter_with_min_epoch() { .commit_epoch(epoch2, sync_result2, false) .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch2).await; + test_env.wait_sync_committed_version().await; { let iter = test_env @@ -1578,7 +1578,7 @@ async fn test_hummock_version_reader() { .commit_epoch(epoch1, sync_result1, false) .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch1).await; + test_env.wait_sync_committed_version().await; let sync_result2 = test_env .storage @@ -1590,7 +1590,7 @@ async fn test_hummock_version_reader() { .commit_epoch(epoch2, sync_result2, false) .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch2).await; + test_env.wait_sync_committed_version().await; let sync_result3 = test_env .storage @@ -1602,7 +1602,7 @@ async fn test_hummock_version_reader() { .commit_epoch(epoch3, sync_result3, false) .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch3).await; + test_env.wait_sync_committed_version().await; { let (_, read_snapshot) = read_filter_for_version( epoch1, @@ -1996,7 +1996,7 @@ async fn test_get_with_min_epoch() { .await .unwrap(); - test_env.storage.try_wait_epoch_for_test(epoch2).await; + test_env.wait_sync_committed_version().await; let k = gen_key(0); let prefix_hint = { let mut ret = Vec::with_capacity(TABLE_PREFIX_LEN + k.len()); @@ -2433,7 +2433,7 @@ async fn test_table_watermark() { }; test_env.commit_epoch(epoch1).await; - test_env.storage.try_wait_epoch_for_test(epoch1).await; + test_env.wait_sync_committed_version().await; let (local1, local2) = test_after_epoch2(local1, local2).await; @@ -2522,7 +2522,7 @@ async fn test_table_watermark() { let (local1, local2) = test_after_epoch2(local1, local2).await; test_env.commit_epoch(epoch2).await; - test_env.storage.try_wait_epoch_for_test(epoch2).await; + test_env.wait_sync_committed_version().await; test_global_read(test_env.storage.clone(), epoch2).await; @@ -2557,7 +2557,7 @@ async fn test_table_watermark() { let (local1, local2) = test_after_epoch2(local1, local2).await; test_env.commit_epoch(epoch3).await; - test_env.storage.try_wait_epoch_for_test(epoch3).await; + test_env.wait_sync_committed_version().await; check_version_table_watermark(test_env.storage.get_pinned_version()); diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs index 2567ce2276e90..670205850b89d 100644 --- a/src/storage/hummock_test/src/snapshot_tests.rs +++ b/src/storage/hummock_test/src/snapshot_tests.rs @@ -27,7 +27,7 @@ use risingwave_storage::hummock::{CachePolicy, HummockStorage}; use risingwave_storage::storage_value::StorageValue; use risingwave_storage::store::{ LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, SealCurrentEpochOptions, - StateStoreRead, WriteOptions, + StateStoreRead, TryWaitEpochOptions, WriteOptions, }; use risingwave_storage::StateStore; @@ -157,7 +157,10 @@ async fn test_snapshot_inner( .await .unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch1)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch1), + TryWaitEpochOptions::for_test(Default::default()), + ) .await .unwrap(); } @@ -201,7 +204,10 @@ async fn test_snapshot_inner( .await .unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch2)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch2), + TryWaitEpochOptions::for_test(Default::default()), + ) .await .unwrap(); } @@ -251,7 +257,10 @@ async fn test_snapshot_inner( .await .unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch3)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch3), + TryWaitEpochOptions::for_test(Default::default()), + ) .await .unwrap(); } @@ -334,7 +343,10 @@ async fn test_snapshot_range_scan_inner( .await .unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch), + TryWaitEpochOptions::for_test(local.table_id()), + ) .await .unwrap(); } diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index deb0493250f41..a6cbca021ce51 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -31,6 +31,7 @@ use risingwave_hummock_sdk::{ HummockReadEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult, }; use risingwave_meta::hummock::test_utils::setup_compute_env; +use risingwave_meta::hummock::{CommitEpochInfo, NewTableFragmentInfo}; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::hummock::iterator::change_log::test_utils::{ apply_test_log_data, gen_test_data, @@ -385,7 +386,10 @@ async fn test_basic_v2() { .await .unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch1)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch1), + TryWaitEpochOptions::for_test(local.table_id()), + ) .await .unwrap(); let value = hummock_storage @@ -1102,7 +1106,10 @@ async fn test_delete_get_v2() { .await .unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch2)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch2), + TryWaitEpochOptions::for_test(local.table_id()), + ) .await .unwrap(); assert!(hummock_storage @@ -1274,7 +1281,10 @@ async fn test_multiple_epoch_sync_v2() { .await .unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch3)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch3), + TryWaitEpochOptions::for_test(local.table_id()), + ) .await .unwrap(); test_get(true).await; @@ -1376,7 +1386,10 @@ async fn test_gc_watermark_and_clear_shared_buffer() { .await .unwrap(); hummock_storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch1)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch1), + TryWaitEpochOptions::for_test(local_hummock_storage.table_id()), + ) .await .unwrap(); @@ -1408,7 +1421,31 @@ async fn test_gc_watermark_and_clear_shared_buffer() { async fn test_replicated_local_hummock_storage() { const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; - let (hummock_storage, _meta_client) = with_hummock_storage_v2(Default::default()).await; + let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; + + let epoch0 = meta_client + .hummock_manager_ref() + .on_current_version(|version| version.visible_table_committed_epoch()) + .await; + + let epoch0 = epoch0.next_epoch(); + + meta_client + .hummock_manager_ref() + .commit_epoch(CommitEpochInfo { + sstables: vec![], + new_table_watermarks: Default::default(), + sst_to_context: Default::default(), + new_table_fragment_info: NewTableFragmentInfo::NewCompactionGroup { + table_ids: HashSet::from_iter([TEST_TABLE_ID]), + }, + change_log_delta: Default::default(), + committed_epoch: epoch0, + tables_to_commit: Default::default(), + is_visible_table_committed_epoch: true, + }) + .await + .unwrap(); let read_options = ReadOptions { table_id: TableId { @@ -1429,12 +1466,6 @@ async fn test_replicated_local_hummock_storage() { )) .await; - let epoch0 = local_hummock_storage - .read_version() - .read() - .committed() - .max_committed_epoch(); - let epoch1 = epoch0.next_epoch(); local_hummock_storage.init_for_test(epoch1).await.unwrap(); @@ -1484,13 +1515,13 @@ async fn test_replicated_local_hummock_storage() { [ Ok( ( - FullKey { UserKey { 233, TableKey { 000061616161 } }, epoch: 65536, epoch_with_gap: 65536, spill_offset: 0}, + FullKey { UserKey { 233, TableKey { 000061616161 } }, epoch: 131072, epoch_with_gap: 131072, spill_offset: 0}, b"1111", ), ), Ok( ( - FullKey { UserKey { 233, TableKey { 000062626262 } }, epoch: 65536, epoch_with_gap: 65536, spill_offset: 0}, + FullKey { UserKey { 233, TableKey { 000062626262 } }, epoch: 131072, epoch_with_gap: 131072, spill_offset: 0}, b"2222", ), ), @@ -1552,13 +1583,13 @@ async fn test_replicated_local_hummock_storage() { [ Ok( ( - FullKey { UserKey { 233, TableKey { 000063636363 } }, epoch: 131072, epoch_with_gap: 131072, spill_offset: 0}, + FullKey { UserKey { 233, TableKey { 000063636363 } }, epoch: 196608, epoch_with_gap: 196608, spill_offset: 0}, b"3333", ), ), Ok( ( - FullKey { UserKey { 233, TableKey { 000064646464 } }, epoch: 131072, epoch_with_gap: 131072, spill_offset: 0}, + FullKey { UserKey { 233, TableKey { 000064646464 } }, epoch: 196608, epoch_with_gap: 196608, spill_offset: 0}, b"4444", ), ), @@ -1650,8 +1681,12 @@ async fn test_iter_log() { } hummock_storage - .try_wait_epoch_for_test(test_log_data.last().unwrap().0) - .await; + .try_wait_epoch( + HummockReadEpoch::Committed(test_log_data.last().unwrap().0), + TryWaitEpochOptions { table_id }, + ) + .await + .unwrap(); let verify_state_store = VerifyStateStore { actual: hummock_storage, diff --git a/src/storage/hummock_test/src/test_utils.rs b/src/storage/hummock_test/src/test_utils.rs index da861ff92810c..5a613c1d54eff 100644 --- a/src/storage/hummock_test/src/test_utils.rs +++ b/src/storage/hummock_test/src/test_utils.rs @@ -262,7 +262,12 @@ impl HummockTestEnv { .await .unwrap(); - self.storage.try_wait_epoch_for_test(epoch).await; + self.wait_sync_committed_version().await; + } + + pub async fn wait_sync_committed_version(&self) { + let version = self.manager.get_current_version().await; + self.storage.wait_version(version).await; } } diff --git a/src/storage/hummock_trace/src/collector.rs b/src/storage/hummock_trace/src/collector.rs index 979bf067db861..068cbdcee45ed 100644 --- a/src/storage/hummock_trace/src/collector.rs +++ b/src/storage/hummock_trace/src/collector.rs @@ -37,7 +37,7 @@ use crate::write::{TraceWriter, TraceWriterImpl}; use crate::{ ConcurrentIdGenerator, Operation, OperationResult, Record, RecordId, RecordIdGenerator, TracedInitOptions, TracedNewLocalOptions, TracedReadOptions, TracedSealCurrentEpochOptions, - TracedSubResp, UniqueIdGenerator, + TracedSubResp, TracedTryWaitEpochOptions, UniqueIdGenerator, }; // Global collector instance used for trace collection @@ -216,8 +216,14 @@ impl TraceSpan { Self::new_global_op(Operation::SealCurrentEpoch { epoch, opts }, storage_type) } - pub fn new_try_wait_epoch_span(epoch: HummockReadEpoch) -> MayTraceSpan { - Self::new_global_op(Operation::TryWaitEpoch(epoch.into()), StorageType::Global) + pub fn new_try_wait_epoch_span( + epoch: HummockReadEpoch, + options: TracedTryWaitEpochOptions, + ) -> MayTraceSpan { + Self::new_global_op( + Operation::TryWaitEpoch(epoch.into(), options), + StorageType::Global, + ) } pub fn new_get_span( diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index fe4280c264f4d..480f0315eb396 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -18,7 +18,7 @@ use risingwave_common::bitmap::Bitmap; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::util::epoch::EpochPair; -use risingwave_hummock_sdk::HummockReadEpoch; +use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId}; use risingwave_pb::common::PbBuffer; use crate::TracedBytes; @@ -170,6 +170,11 @@ pub struct TracedNewLocalOptions { pub vnodes: TracedBitmap, } +#[derive(Encode, Decode, PartialEq, Debug, Clone)] +pub struct TracedTryWaitEpochOptions { + pub table_id: TracedTableId, +} + #[cfg(test)] impl TracedNewLocalOptions { pub(crate) fn for_test(table_id: u32) -> Self { @@ -192,6 +197,7 @@ pub type TracedHummockEpoch = u64; #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)] pub enum TracedHummockReadEpoch { Committed(TracedHummockEpoch), + BatchQueryReadCommitted(TracedHummockEpoch, u64), NoWait(TracedHummockEpoch), Backup(TracedHummockEpoch), TimeTravel(TracedHummockEpoch), @@ -201,6 +207,9 @@ impl From for TracedHummockReadEpoch { fn from(value: HummockReadEpoch) -> Self { match value { HummockReadEpoch::Committed(epoch) => Self::Committed(epoch), + HummockReadEpoch::BatchQueryCommitted(epoch, version_id) => { + Self::BatchQueryReadCommitted(epoch, version_id.to_u64()) + } HummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch), HummockReadEpoch::Backup(epoch) => Self::Backup(epoch), HummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch), @@ -212,6 +221,9 @@ impl From for HummockReadEpoch { fn from(value: TracedHummockReadEpoch) -> Self { match value { TracedHummockReadEpoch::Committed(epoch) => Self::Committed(epoch), + TracedHummockReadEpoch::BatchQueryReadCommitted(epoch, version_id) => { + Self::BatchQueryCommitted(epoch, HummockVersionId::new(version_id)) + } TracedHummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch), TracedHummockReadEpoch::Backup(epoch) => Self::Backup(epoch), TracedHummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch), diff --git a/src/storage/hummock_trace/src/record.rs b/src/storage/hummock_trace/src/record.rs index 2995b7f13c8a7..a9ae562f02b4c 100644 --- a/src/storage/hummock_trace/src/record.rs +++ b/src/storage/hummock_trace/src/record.rs @@ -23,7 +23,7 @@ use risingwave_pb::meta::SubscribeResponse; use crate::{ LocalStorageId, StorageType, TracedHummockReadEpoch, TracedInitOptions, TracedNewLocalOptions, - TracedReadOptions, TracedSealCurrentEpochOptions, + TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions, }; pub type RecordId = u64; @@ -164,7 +164,7 @@ pub enum Operation { LocalStorageInit(TracedInitOptions), /// Try wait epoch - TryWaitEpoch(TracedHummockReadEpoch), + TryWaitEpoch(TracedHummockReadEpoch, TracedTryWaitEpochOptions), /// Seal current epoch SealCurrentEpoch { diff --git a/src/storage/hummock_trace/src/replay/mod.rs b/src/storage/hummock_trace/src/replay/mod.rs index 91e80cad1c4b6..347ef30704570 100644 --- a/src/storage/hummock_trace/src/replay/mod.rs +++ b/src/storage/hummock_trace/src/replay/mod.rs @@ -33,7 +33,7 @@ use crate::error::Result; use crate::TraceError; use crate::{ LocalStorageId, Record, TracedBytes, TracedInitOptions, TracedNewLocalOptions, - TracedReadOptions, TracedSealCurrentEpochOptions, + TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions, }; pub type ReplayItem = (TracedBytes, TracedBytes); @@ -118,7 +118,11 @@ pub trait ReplayStateStore { async fn sync(&self, id: u64, table_ids: Vec) -> Result; async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result; async fn new_local(&self, opts: TracedNewLocalOptions) -> Box; - async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; + async fn try_wait_epoch( + &self, + epoch: HummockReadEpoch, + options: TracedTryWaitEpochOptions, + ) -> Result<()>; } // define mock trait for replay interfaces @@ -147,7 +151,7 @@ mock! { async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64, ) -> Result; async fn new_local(&self, opts: TracedNewLocalOptions) -> Box; - async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; + async fn try_wait_epoch(&self, epoch: HummockReadEpoch,options: TracedTryWaitEpochOptions) -> Result<()>; } impl GlobalReplay for GlobalReplayInterface{} } diff --git a/src/storage/hummock_trace/src/replay/worker.rs b/src/storage/hummock_trace/src/replay/worker.rs index 8a02c3efde196..08d877cadf3ab 100644 --- a/src/storage/hummock_trace/src/replay/worker.rs +++ b/src/storage/hummock_trace/src/replay/worker.rs @@ -310,11 +310,11 @@ impl ReplayWorker { let local_storage = local_storages.get_mut(&storage_type).unwrap(); local_storage.init(options).await.unwrap(); } - Operation::TryWaitEpoch(epoch) => { + Operation::TryWaitEpoch(epoch, options) => { assert_eq!(storage_type, StorageType::Global); let res = res_rx.recv().await.expect("recv result failed"); if let OperationResult::TryWaitEpoch(expected) = res { - let actual = replay.try_wait_epoch(epoch.into()).await; + let actual = replay.try_wait_epoch(epoch.into(), options).await; assert_eq!(TraceResult::from(actual), expected, "try_wait_epoch wrong"); } else { panic!( diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 61ee49c0b5340..80e16dba1dd4d 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -197,7 +197,7 @@ pub struct HummockEventHandler { /// A copy of `read_version_mapping` but owned by event handler local_read_version_mapping: HashMap, - version_update_notifier_tx: Arc>, + version_update_notifier_tx: Arc>, recent_versions: Arc>, write_conflict_detector: Option>, @@ -316,8 +316,7 @@ impl HummockEventHandler { ) -> Self { let (hummock_event_tx, hummock_event_rx) = event_channel(state_store_metrics.event_handler_pending_event.clone()); - let (version_update_notifier_tx, _) = - tokio::sync::watch::channel(pinned_version.visible_table_committed_epoch()); + let (version_update_notifier_tx, _) = tokio::sync::watch::channel(pinned_version.clone()); let version_update_notifier_tx = Arc::new(version_update_notifier_tx); let read_version_mapping = Arc::new(RwLock::new(HashMap::default())); let buffer_tracker = BufferTracker::from_storage_opts( @@ -371,7 +370,7 @@ impl HummockEventHandler { } } - pub fn version_update_notifier_tx(&self) -> Arc> { + pub fn version_update_notifier_tx(&self) -> Arc> { self.version_update_notifier_tx.clone() } @@ -648,18 +647,16 @@ impl HummockEventHandler { ); } - let prev_max_committed_epoch = pinned_version.visible_table_committed_epoch(); let max_committed_epoch = new_pinned_version.visible_table_committed_epoch(); - // only notify local_version_manager when MCE change self.version_update_notifier_tx.send_if_modified(|state| { - assert_eq!(prev_max_committed_epoch, *state); - if max_committed_epoch > *state { - *state = max_committed_epoch; - true - } else { - false + assert_eq!(pinned_version.id(), state.id()); + if state.id() == new_pinned_version.id() { + return false; } + assert!(new_pinned_version.id() > state.id()); + *state = new_pinned_version.clone(); + true }); if let Some(conflict_detector) = self.write_conflict_detector.as_ref() { diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 0cf066f212056..9331d63340b42 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -1202,10 +1202,7 @@ impl HummockUploader { self.context .pinned_version .version() - .state_table_info - .info() - .get(table_id) - .map(|info| info.committed_epoch), + .table_committed_epoch(*table_id), ) }); table_data.new_epoch(epoch); diff --git a/src/storage/src/hummock/local_version/recent_versions.rs b/src/storage/src/hummock/local_version/recent_versions.rs index 47ddc634a1663..ad711b0053313 100644 --- a/src/storage/src/hummock/local_version/recent_versions.rs +++ b/src/storage/src/hummock/local_version/recent_versions.rs @@ -140,12 +140,7 @@ impl RecentVersions { ); } let result = self.recent_versions.binary_search_by(|version| { - let committed_epoch = version - .version() - .state_table_info - .info() - .get(&table_id) - .map(|info| info.committed_epoch); + let committed_epoch = version.version().table_committed_epoch(table_id); if let Some(committed_epoch) = committed_epoch { committed_epoch.cmp(&epoch) } else { diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index b832a8e865b68..df8ab9ab6a782 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -53,7 +53,7 @@ use crate::hummock::local_version::pinned_version::{start_pinned_version_worker, use crate::hummock::local_version::recent_versions::RecentVersions; use crate::hummock::observer_manager::HummockObserverNode; use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache; -use crate::hummock::utils::wait_for_epoch; +use crate::hummock::utils::{wait_for_epoch, wait_for_update}; use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef}; use crate::hummock::{ HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator, @@ -96,7 +96,7 @@ pub struct HummockStorage { buffer_tracker: BufferTracker, - version_update_notifier_tx: Arc>, + version_update_notifier_tx: Arc>, recent_versions: Arc>, @@ -628,15 +628,67 @@ impl StateStore for HummockStorage { /// Waits until the local hummock version contains the epoch. If `wait_epoch` is `Current`, /// we will only check whether it is le `sealed_epoch` and won't wait. - async fn try_wait_epoch(&self, wait_epoch: HummockReadEpoch) -> StorageResult<()> { - let wait_epoch = match wait_epoch { - HummockReadEpoch::Committed(epoch) => { - assert!(!is_max_epoch(epoch), "epoch should not be MAX EPOCH"); - epoch + async fn try_wait_epoch( + &self, + wait_epoch: HummockReadEpoch, + options: TryWaitEpochOptions, + ) -> StorageResult<()> { + match wait_epoch { + HummockReadEpoch::Committed(wait_epoch) => { + assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH"); + wait_for_epoch( + &self.version_update_notifier_tx, + wait_epoch, + options.table_id, + ) + .await?; + } + HummockReadEpoch::BatchQueryCommitted(wait_epoch, wait_version_id) => { + assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH"); + // fast path by checking recent_versions + { + let recent_versions = self.recent_versions.load(); + let latest_version = recent_versions.latest_version().version(); + if latest_version.id >= wait_version_id + && let Some(committed_epoch) = + latest_version.table_committed_epoch(options.table_id) + && committed_epoch >= wait_epoch + { + return Ok(()); + } + } + wait_for_update( + &self.version_update_notifier_tx, + |version| { + if wait_version_id > version.id() { + return Ok(false); + } + let committed_epoch = version + .version() + .table_committed_epoch(options.table_id) + .ok_or_else(|| { + // In batch query, since we have ensured that the current version must be after the + // `wait_version_id`, when seeing that the table_id not exist in the latest version, + // the table must have been dropped. + HummockError::wait_epoch(format!( + "table id {} has been dropped", + options.table_id + )) + })?; + Ok(committed_epoch >= wait_epoch) + }, + || { + format!( + "try_wait_epoch: epoch: {}, version_id: {:?}", + wait_epoch, wait_version_id + ) + }, + ) + .await?; } - _ => return Ok(()), + _ => {} }; - wait_for_epoch(&self.version_update_notifier_tx, wait_epoch).await + Ok(()) } fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture { @@ -700,13 +752,6 @@ impl HummockStorage { self.buffer_tracker.get_buffer_size() } - pub async fn try_wait_epoch_for_test(&self, wait_epoch: u64) { - let mut rx = self.version_update_notifier_tx.subscribe(); - while *(rx.borrow_and_update()) < wait_epoch { - rx.changed().await.unwrap(); - } - } - /// Creates a [`HummockStorage`] with default stats. Should only be used by tests. pub async fn for_test( options: Arc, diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 2221b4350ebc9..d0082f21b31f9 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -25,7 +25,7 @@ use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::MAX_SPILL_TIMES; use risingwave_hummock_sdk::key::{is_empty_key_range, vnode_range, TableKey, TableKeyRange}; use risingwave_hummock_sdk::sstable_info::SstableInfo; -use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch}; +use risingwave_hummock_sdk::EpochWithGap; use tracing::{warn, Instrument}; use super::version::{StagingData, VersionUpdate}; @@ -37,6 +37,7 @@ use crate::hummock::iterator::{ Backward, BackwardUserIterator, ConcatIteratorInner, Forward, HummockIteratorUnion, IteratorFactory, MergeIterator, UserIterator, }; +use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::shared_buffer::shared_buffer_batch::{ SharedBufferBatch, SharedBufferBatchIterator, SharedBufferBatchOldValues, SharedBufferItem, SharedBufferValue, @@ -96,7 +97,7 @@ pub struct LocalHummockStorage { write_limiter: WriteLimiterRef, - version_update_notifier_tx: Arc>, + version_update_notifier_tx: Arc>, mem_table_spill_threshold: usize, } @@ -135,7 +136,7 @@ impl LocalHummockStorage { } pub async fn wait_for_epoch(&self, wait_epoch: u64) -> StorageResult<()> { - wait_for_epoch(&self.version_update_notifier_tx, wait_epoch).await + wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, self.table_id).await } pub async fn iter_flushed( @@ -658,7 +659,7 @@ impl LocalHummockStorage { memory_limiter: Arc, write_limiter: WriteLimiterRef, option: NewLocalOptions, - version_update_notifier_tx: Arc>, + version_update_notifier_tx: Arc>, mem_table_spill_threshold: usize, ) -> Self { let stats = hummock_version_reader.stats().clone(); diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 7d0dc49847398..23631ba2d81ea 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -428,10 +428,7 @@ impl HummockReadVersion { vnode_watermarks, self.committed .version() - .state_table_info - .info() - .get(&self.table_id) - .map(|info| info.committed_epoch), + .table_committed_epoch(self.table_id), )); } } diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index c2f6cbafed79b..87c40992ea513 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -19,22 +19,23 @@ use std::ops::Bound::{Excluded, Included, Unbounded}; use std::ops::{Bound, RangeBounds}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use bytes::Bytes; use foyer::CacheContext; use parking_lot::Mutex; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::config::StorageMemoryConfig; +use risingwave_hummock_sdk::can_concat; use risingwave_hummock_sdk::key::{ bound_table_key_range, EmptySliceRef, FullKey, TableKey, UserKey, }; use risingwave_hummock_sdk::sstable_info::SstableInfo; -use risingwave_hummock_sdk::{can_concat, HummockEpoch}; use tokio::sync::oneshot::{channel, Receiver, Sender}; -use super::{HummockError, SstableStoreRef}; +use super::{HummockError, HummockResult, SstableStoreRef}; use crate::error::StorageResult; +use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::CachePolicy; use crate::mem_table::{KeyOp, MemTableError}; use crate::monitor::MemoryCollector; @@ -574,16 +575,59 @@ pub(crate) fn filter_with_delete_range<'a>( }) } +/// Wait for the `committed_epoch` of `table_id` to reach `wait_epoch`. +/// +/// When the `table_id` does not exist in the latest version, we assume that +/// the table is not created yet, and will wait until the table is created. pub(crate) async fn wait_for_epoch( - notifier: &tokio::sync::watch::Sender, + notifier: &tokio::sync::watch::Sender, wait_epoch: u64, + table_id: TableId, ) -> StorageResult<()> { + let mut prev_committed_epoch = None; + let prev_committed_epoch = &mut prev_committed_epoch; + wait_for_update( + notifier, + |version| { + let committed_epoch = version.version().table_committed_epoch(table_id); + let ret = if let Some(committed_epoch) = committed_epoch { + if committed_epoch >= wait_epoch { + Ok(true) + } else { + Ok(false) + } + } else if prev_committed_epoch.is_none() { + Ok(false) + } else { + Err(HummockError::wait_epoch(format!( + "table {} has been dropped", + table_id + ))) + }; + *prev_committed_epoch = committed_epoch; + ret + }, + || { + format!( + "wait_for_epoch: epoch: {}, table_id: {}", + wait_epoch, table_id + ) + }, + ) + .await?; + Ok(()) +} + +pub(crate) async fn wait_for_update( + notifier: &tokio::sync::watch::Sender, + mut inspect_fn: impl FnMut(&PinnedVersion) -> HummockResult, + mut periodic_debug_info: impl FnMut() -> String, +) -> HummockResult<()> { let mut receiver = notifier.subscribe(); - // avoid unnecessary check in the loop if the value does not change - let max_committed_epoch = *receiver.borrow_and_update(); - if max_committed_epoch >= wait_epoch { + if inspect_fn(&receiver.borrow_and_update())? { return Ok(()); } + let start_time = Instant::now(); loop { match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await { Err(_) => { @@ -597,17 +641,17 @@ pub(crate) async fn wait_for_epoch( // CN with the same distribution as the upstream MV. // See #3845 for more details. tracing::warn!( - epoch = wait_epoch, - "wait_epoch timeout when waiting for version update", + info = periodic_debug_info(), + elapsed = ?start_time.elapsed(), + "timeout when waiting for version update", ); continue; } Ok(Err(_)) => { - return Err(HummockError::wait_epoch("tx dropped").into()); + return Err(HummockError::wait_epoch("tx dropped")); } Ok(Ok(_)) => { - let max_committed_epoch = *receiver.borrow(); - if max_committed_epoch >= wait_epoch { + if inspect_fn(&receiver.borrow_and_update())? { return Ok(()); } } diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index 7416f54688a5f..9558811a2bdb0 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -735,7 +735,11 @@ impl StateStore for RangeKvStateStore { type Local = MemtableLocalStateStore; #[allow(clippy::unused_async)] - async fn try_wait_epoch(&self, _epoch: HummockReadEpoch) -> StorageResult<()> { + async fn try_wait_epoch( + &self, + _epoch: HummockReadEpoch, + _options: TryWaitEpochOptions, + ) -> StorageResult<()> { // memory backend doesn't need to wait for epoch, so this is a no-op. Ok(()) } diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index ec2785c354229..a7be71307ffec 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -298,9 +298,10 @@ impl StateStore for MonitoredStateStore { fn try_wait_epoch( &self, epoch: HummockReadEpoch, + options: TryWaitEpochOptions, ) -> impl Future> + Send + '_ { self.inner - .try_wait_epoch(epoch) + .try_wait_epoch(epoch, options) .verbose_instrument_await("store_wait_epoch") .inspect_err(|e| error!(error = %e.as_report(), "Failed in wait_epoch")) } diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index dac8712a924d7..8bd8013ba3810 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -242,10 +242,14 @@ impl LocalStateStore for TracedStateStore { impl StateStore for TracedStateStore { type Local = TracedStateStore; - async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()> { - let span = TraceSpan::new_try_wait_epoch_span(epoch); + async fn try_wait_epoch( + &self, + epoch: HummockReadEpoch, + options: TryWaitEpochOptions, + ) -> StorageResult<()> { + let span = TraceSpan::new_try_wait_epoch_span(epoch, options.clone().into()); - let res = self.inner.try_wait_epoch(epoch).await; + let res = self.inner.try_wait_epoch(epoch, options).await; span.may_send_result(OperationResult::TryWaitEpoch( res.as_ref().map(|o| *o).into(), )); diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index a7fb7c3643ed0..03b0471f90446 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -173,7 +173,11 @@ impl StateStore for PanicStateStore { type Local = Self; #[allow(clippy::unused_async)] - async fn try_wait_epoch(&self, _epoch: HummockReadEpoch) -> StorageResult<()> { + async fn try_wait_epoch( + &self, + _epoch: HummockReadEpoch, + _options: TryWaitEpochOptions, + ) -> StorageResult<()> { panic!("should not wait epoch from the panic state store!"); } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 3f87d88617ca4..6eadf5ba8a64a 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -35,7 +35,8 @@ use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; use risingwave_hummock_trace::{ TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions, - TracedReadOptions, TracedSealCurrentEpochOptions, TracedWriteOptions, + TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions, + TracedWriteOptions, }; use risingwave_pb::hummock::PbVnodeWatermark; @@ -344,6 +345,34 @@ pub trait StateStoreWrite: StaticSendSync { pub trait SyncFuture = Future> + Send + 'static; +#[derive(Clone)] +pub struct TryWaitEpochOptions { + pub table_id: TableId, +} + +impl TryWaitEpochOptions { + #[cfg(any(test, feature = "test"))] + pub fn for_test(table_id: TableId) -> Self { + Self { table_id } + } +} + +impl From for TryWaitEpochOptions { + fn from(value: TracedTryWaitEpochOptions) -> Self { + Self { + table_id: value.table_id.into(), + } + } +} + +impl From for TracedTryWaitEpochOptions { + fn from(value: TryWaitEpochOptions) -> Self { + Self { + table_id: value.table_id.into(), + } + } +} + pub trait StateStore: StateStoreRead + StaticSendSync + Clone { type Local: LocalStateStore; @@ -352,6 +381,7 @@ pub trait StateStore: StateStoreRead + StaticSendSync + Clone { fn try_wait_epoch( &self, epoch: HummockReadEpoch, + options: TryWaitEpochOptions, ) -> impl Future> + Send + '_; fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture; diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index c995dc5ac5f6f..a1df9fcd82b99 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -568,8 +568,9 @@ pub mod verify { fn try_wait_epoch( &self, epoch: HummockReadEpoch, + options: TryWaitEpochOptions, ) -> impl Future> + Send + '_ { - self.actual.try_wait_epoch(epoch) + self.actual.try_wait_epoch(epoch, options) } fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture { @@ -1134,7 +1135,11 @@ pub mod boxed_state_store { #[async_trait::async_trait] pub trait DynamicDispatchedStateStoreExt: StaticSendSync { - async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()>; + async fn try_wait_epoch( + &self, + epoch: HummockReadEpoch, + options: TryWaitEpochOptions, + ) -> StorageResult<()>; fn sync( &self, @@ -1147,8 +1152,12 @@ pub mod boxed_state_store { #[async_trait::async_trait] impl DynamicDispatchedStateStoreExt for S { - async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()> { - self.try_wait_epoch(epoch).await + async fn try_wait_epoch( + &self, + epoch: HummockReadEpoch, + options: TryWaitEpochOptions, + ) -> StorageResult<()> { + self.try_wait_epoch(epoch, options).await } fn sync( @@ -1233,8 +1242,9 @@ pub mod boxed_state_store { fn try_wait_epoch( &self, epoch: HummockReadEpoch, + options: TryWaitEpochOptions, ) -> impl Future> + Send + '_ { - self.deref().try_wait_epoch(epoch) + self.deref().try_wait_epoch(epoch, options) } fn sync( diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 254e8e73095b1..b5049c7ba1cda 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -49,6 +49,7 @@ use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew}; use crate::row_serde::{find_columns_by_ids, ColumnMapping}; use crate::store::{ PrefetchOptions, ReadLogOptions, ReadOptions, StateStoreIter, StateStoreIterExt, + TryWaitEpochOptions, }; use crate::table::merge_sort::merge_sort; use crate::table::{ChangeLogRow, KeyedRow, TableDistribution, TableIter}; @@ -361,11 +362,15 @@ impl StorageTableInner { ) -> StorageResult> { let epoch = wait_epoch.get_epoch(); let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_)); - let read_committed = matches!( - wait_epoch, - HummockReadEpoch::TimeTravel(_) | HummockReadEpoch::Committed(_) - ); - self.store.try_wait_epoch(wait_epoch).await?; + let read_committed = wait_epoch.is_read_committed(); + self.store + .try_wait_epoch( + wait_epoch, + TryWaitEpochOptions { + table_id: self.table_id, + }, + ) + .await?; let serialized_pk = serialize_pk_with_vnode( &pk, &self.pk_serializer, @@ -490,10 +495,7 @@ impl StorageTableInner { let iterators: Vec<_> = try_join_all(table_key_ranges.map(|table_key_range| { let prefix_hint = prefix_hint.clone(); let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_)); - let read_committed = matches!( - wait_epoch, - HummockReadEpoch::TimeTravel(_) | HummockReadEpoch::Committed(_) - ); + let read_committed = wait_epoch.is_read_committed(); async move { let read_options = ReadOptions { prefix_hint, @@ -752,7 +754,7 @@ impl StorageTableInner { pub async fn batch_iter_log_with_pk_bounds( &self, start_epoch: u64, - end_epoch: u64, + end_epoch: HummockReadEpoch, ) -> StorageResult> + Send + 'static> { let pk_prefix = OwnedRow::default(); let start_key = self.serialize_pk_bound(&pk_prefix, Unbounded, true); @@ -872,7 +874,14 @@ impl StorageTableInnerIterInner { epoch: HummockReadEpoch, ) -> StorageResult { let raw_epoch = epoch.get_epoch(); - store.try_wait_epoch(epoch).await?; + store + .try_wait_epoch( + epoch, + TryWaitEpochOptions { + table_id: read_options.table_id, + }, + ) + .await?; let iter = store.iter(table_key_range, raw_epoch, read_options).await?; let iter = Self { iter, @@ -972,13 +981,22 @@ impl StorageTableInnerIterLogInner { table_key_range: TableKeyRange, read_options: ReadLogOptions, start_epoch: u64, - end_epoch: u64, + end_epoch: HummockReadEpoch, ) -> StorageResult { store - .try_wait_epoch(HummockReadEpoch::Committed(end_epoch)) + .try_wait_epoch( + end_epoch, + TryWaitEpochOptions { + table_id: read_options.table_id, + }, + ) .await?; let iter = store - .iter_log((start_epoch, end_epoch), table_key_range, read_options) + .iter_log( + (start_epoch, end_epoch.get_epoch()), + table_key_range, + read_options, + ) .await?; let iter = Self { iter, diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 3a9249ae26259..578197bee2092 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -455,6 +455,7 @@ mod tests { use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_hummock_test::test_utils::prepare_hummock_test_env; use risingwave_storage::hummock::HummockStorage; + use risingwave_storage::store::TryWaitEpochOptions; use risingwave_storage::StateStore; use crate::common::log_store_impl::kv_log_store::reader::KvLogStoreReader; @@ -685,7 +686,10 @@ mod tests { .unwrap(); test_env .storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch2)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch2), + TryWaitEpochOptions::for_test(table.id.into()), + ) .await .unwrap(); @@ -901,7 +905,10 @@ mod tests { test_env.commit_epoch(epoch2).await; test_env .storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch2)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch2), + TryWaitEpochOptions::for_test(table.id.into()), + ) .await .unwrap(); @@ -1137,7 +1144,10 @@ mod tests { test_env.commit_epoch(epoch2).await; test_env .storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch2)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch2), + TryWaitEpochOptions::for_test(table.id.into()), + ) .await .unwrap(); @@ -1401,7 +1411,10 @@ mod tests { test_env .storage - .try_wait_epoch(HummockReadEpoch::Committed(epoch3)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch3), + TryWaitEpochOptions::for_test(table.id.into()), + ) .await .unwrap(); diff --git a/src/stream/src/executor/backfill/snapshot_backfill.rs b/src/stream/src/executor/backfill/snapshot_backfill.rs index a3cba25a0572a..89801a3cf4133 100644 --- a/src/stream/src/executor/backfill/snapshot_backfill.rs +++ b/src/stream/src/executor/backfill/snapshot_backfill.rs @@ -218,13 +218,12 @@ impl SnapshotBackfillExecutor { // use `upstream_buffer.run_future` to poll upstream concurrently so that we won't have back-pressure // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed, // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock. - let stream = - upstream_buffer - .run_future(self.upstream_table.batch_iter_log_with_pk_bounds( - barrier_epoch.prev, - barrier_epoch.prev, - )) - .await?; + let stream = upstream_buffer + .run_future(self.upstream_table.batch_iter_log_with_pk_bounds( + barrier_epoch.prev, + HummockReadEpoch::Committed(barrier_epoch.prev), + )) + .await?; let data_types = self.upstream_table.schema().data_types(); let builder = create_builder(None, self.chunk_size, data_types); let stream = read_change_log(stream, builder); diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 77cdc90e546f5..6b9756733f51f 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -32,6 +32,7 @@ use risingwave_connector::source::{ SplitMetaData, }; use risingwave_hummock_sdk::HummockReadEpoch; +use risingwave_storage::store::TryWaitEpochOptions; use serde::{Deserialize, Serialize}; use thiserror_ext::AsReport; @@ -417,6 +418,7 @@ impl SourceBackfillExecutorInner { } let state_store = self.backfill_state_store.state_store.state_store().clone(); + let table_id = self.backfill_state_store.state_store.table_id().into(); static STATE_TABLE_INITIALIZED: Once = Once::new(); tokio::spawn(async move { // This is for self.backfill_finished() to be safe. @@ -424,7 +426,10 @@ impl SourceBackfillExecutorInner { let epoch = barrier.epoch.curr; tracing::info!("waiting for epoch: {}", epoch); state_store - .try_wait_epoch(HummockReadEpoch::Committed(epoch)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch), + TryWaitEpochOptions { table_id }, + ) .await .expect("failed to wait epoch"); STATE_TABLE_INITIALIZED.call_once(|| ()); diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index dd93ac85d1f1c..d4a02ce462441 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -20,6 +20,7 @@ use either::Either; use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::array::ArrayRef; +use risingwave_common::catalog::TableId; use risingwave_common::metrics::{LabelGuardedIntCounter, GLOBAL_ERROR_METRICS}; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; @@ -32,6 +33,7 @@ use risingwave_connector::source::{ SplitMetaData, WaitCheckpointTask, }; use risingwave_hummock_sdk::HummockReadEpoch; +use risingwave_storage::store::TryWaitEpochOptions; use thiserror_ext::AsReport; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::{mpsc, oneshot}; @@ -104,6 +106,7 @@ impl SourceExecutor { let wait_checkpoint_worker = WaitCheckpointWorker { wait_checkpoint_rx, state_store: core.split_state_store.state_table.state_store().clone(), + table_id: core.split_state_store.state_table.table_id().into(), }; tokio::spawn(wait_checkpoint_worker.run()); Ok(Some(WaitCheckpointTaskBuilder { @@ -820,6 +823,7 @@ impl WaitCheckpointTaskBuilder { struct WaitCheckpointWorker { wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>, state_store: S, + table_id: TableId, } impl WaitCheckpointWorker { @@ -832,7 +836,12 @@ impl WaitCheckpointWorker { tracing::debug!("start to wait epoch {}", epoch.0); let ret = self .state_store - .try_wait_epoch(HummockReadEpoch::Committed(epoch.0)) + .try_wait_epoch( + HummockReadEpoch::Committed(epoch.0), + TryWaitEpochOptions { + table_id: self.table_id, + }, + ) .await; match ret {