diff --git a/proto/common.proto b/proto/common.proto
index 05d938cc26523..1030d07d7c343 100644
--- a/proto/common.proto
+++ b/proto/common.proto
@@ -105,9 +105,14 @@ message WorkerSlotMapping {
   repeated uint64 data = 2;
 }
 
+message BatchQueryCommittedEpoch {
+  uint64 epoch = 1;
+  uint64 hummock_version_id = 2;
+}
+
 message BatchQueryEpoch {
   oneof epoch {
-    uint64 committed = 1;
+    BatchQueryCommittedEpoch committed = 1;
     uint64 current = 2;
     uint64 backup = 3;
     uint64 time_travel = 4;
diff --git a/src/batch/src/executor/log_row_seq_scan.rs b/src/batch/src/executor/log_row_seq_scan.rs
index a36440f1e010d..b71772e18768e 100644
--- a/src/batch/src/executor/log_row_seq_scan.rs
+++ b/src/batch/src/executor/log_row_seq_scan.rs
@@ -25,6 +25,7 @@ use risingwave_common::catalog::{ColumnId, Field, Schema};
 use risingwave_common::hash::VirtualNode;
 use risingwave_common::row::{Row, RowExt};
 use risingwave_common::types::ScalarImpl;
+use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
 use risingwave_pb::plan_common::StorageTableDesc;
@@ -50,6 +51,7 @@ pub struct LogRowSeqScanExecutor {
     table: StorageTable<S>,
     old_epoch: u64,
     new_epoch: u64,
+    version_id: HummockVersionId,
 }
 
 impl<S: StateStore> LogRowSeqScanExecutor<S> {
@@ -57,6 +59,7 @@ impl LogRowSeqScanExecutor {
         table: StorageTable<S>,
         old_epoch: u64,
         new_epoch: u64,
+        version_id: HummockVersionId,
         chunk_size: usize,
         identity: String,
         metrics: Option,
@@ -74,6 +77,7 @@ impl LogRowSeqScanExecutor {
             table,
             old_epoch,
             new_epoch,
+            version_id,
         }
     }
 }
@@ -128,12 +132,18 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
             unreachable!("invalid new epoch: {:?}", log_store_seq_scan_node.new_epoch)
         };
 
+        assert_eq!(old_epoch.hummock_version_id, new_epoch.hummock_version_id);
+        let version_id = old_epoch.hummock_version_id;
+        let old_epoch = old_epoch.epoch;
+        let new_epoch = new_epoch.epoch;
+
         dispatch_state_store!(source.context().state_store(), state_store, {
             let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
             Ok(Box::new(LogRowSeqScanExecutor::new(
                 table,
-                *old_epoch,
-                *new_epoch,
+                old_epoch,
+                new_epoch,
+                HummockVersionId::new(version_id),
                 chunk_size as usize,
                 source.plan_node().get_identity().clone(),
                 metrics,
@@ -164,6 +174,7 @@ impl LogRowSeqScanExecutor {
             table,
             old_epoch,
             new_epoch,
+            version_id,
             schema,
             ..
         } = *self;
@@ -180,6 +191,7 @@ impl LogRowSeqScanExecutor {
                 table.clone(),
                 old_epoch,
                 new_epoch,
+                version_id,
                 chunk_size,
                 histogram,
                 Arc::new(schema.clone()),
@@ -196,13 +208,17 @@ impl LogRowSeqScanExecutor {
         table: Arc<StorageTable<S>>,
         old_epoch: u64,
         new_epoch: u64,
+        version_id: HummockVersionId,
         chunk_size: usize,
         histogram: Option>,
         schema: Arc<Schema>,
     ) {
         // Range Scan.
         let iter = table
-            .batch_iter_log_with_pk_bounds(old_epoch, new_epoch)
+            .batch_iter_log_with_pk_bounds(
+                old_epoch,
+                HummockReadEpoch::BatchQueryCommitted(new_epoch, version_id),
+            )
             .await?
             .flat_map(|r| {
                 futures::stream::iter(std::iter::from_coroutine(
diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs
index 07be18ca72988..53dabccaf260f 100644
--- a/src/batch/src/executor/mod.rs
+++ b/src/batch/src/executor/mod.rs
@@ -260,7 +260,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
 
 #[cfg(test)]
 mod tests {
-    use risingwave_hummock_sdk::to_committed_batch_query_epoch;
+    use risingwave_hummock_sdk::test_batch_query_epoch;
     use risingwave_pb::batch_plan::PlanNode;
 
     use crate::executor::ExecutorBuilder;
@@ -278,7 +278,7 @@ mod tests {
             &plan_node,
             task_id,
             ComputeNodeContext::for_test(),
-            to_committed_batch_query_epoch(u64::MAX),
+            test_batch_query_epoch(),
             ShutdownToken::empty(),
         );
         let child_plan = &PlanNode::default();
diff --git a/src/batch/src/task/task_manager.rs b/src/batch/src/task/task_manager.rs
index f25ce88379905..4db15df2dbe85 100644
--- a/src/batch/src/task/task_manager.rs
+++ b/src/batch/src/task/task_manager.rs
@@ -137,12 +137,12 @@ impl BatchManager {
         tid: &PbTaskId,
         plan: PlanFragment,
     ) -> Result<()> {
-        use risingwave_hummock_sdk::to_committed_batch_query_epoch;
+        use risingwave_hummock_sdk::test_batch_query_epoch;
 
         self.fire_task(
             tid,
             plan,
-            to_committed_batch_query_epoch(0),
+            test_batch_query_epoch(),
             ComputeNodeContext::for_test(),
             StateReporter::new_with_test(),
             TracingContext::none(),
diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs
index 8fea0f48fa82d..fa92e4eefc3a9 100644
--- a/src/compute/tests/cdc_tests.rs
+++ b/src/compute/tests/cdc_tests.rs
@@ -39,7 +39,7 @@ use risingwave_connector::source::cdc::external::{
 };
 use risingwave_connector::source::cdc::DebeziumCdcSplit;
 use risingwave_connector::source::SplitImpl;
-use risingwave_hummock_sdk::to_committed_batch_query_epoch;
+use risingwave_hummock_sdk::test_batch_query_epoch;
 use risingwave_storage::memory::MemoryStateStore;
 use risingwave_storage::table::batch_table::storage_table::StorageTable;
 use risingwave_stream::common::table::state_table::StateTable;
@@ -384,7 +384,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
         table.clone(),
         vec![ScanRange::full()],
         true,
-        to_committed_batch_query_epoch(u64::MAX),
+        test_batch_query_epoch(),
         1024,
         "RowSeqExecutor2".to_string(),
         None,
diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs
index 13a76c6989b48..f36cd6cf8164f 100644
--- a/src/compute/tests/integration_tests.rs
+++ b/src/compute/tests/integration_tests.rs
@@ -41,7 +41,7 @@ use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
 use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
 use risingwave_dml::dml_manager::DmlManager;
-use risingwave_hummock_sdk::to_committed_batch_query_epoch;
+use risingwave_hummock_sdk::test_batch_query_epoch;
 use risingwave_pb::catalog::StreamSourceInfo;
 use risingwave_pb::plan_common::PbRowFormatType;
 use risingwave_storage::memory::MemoryStateStore;
@@ -263,7 +263,7 @@ async fn test_table_materialize() -> StreamResult<()> {
         table.clone(),
         vec![ScanRange::full()],
         true,
-        to_committed_batch_query_epoch(u64::MAX),
+        test_batch_query_epoch(),
         1024,
         "RowSeqExecutor2".to_string(),
         None,
@@ -334,7 +334,7 @@ async fn test_table_materialize() -> StreamResult<()> {
         table.clone(),
         vec![ScanRange::full()],
         true,
-        to_committed_batch_query_epoch(u64::MAX),
+        test_batch_query_epoch(),
         1024,
         "RowSeqScanExecutor2".to_string(),
         None,
@@ -414,7 +414,7 @@ async fn test_table_materialize() -> StreamResult<()> {
         table,
         vec![ScanRange::full()],
         true,
-        to_committed_batch_query_epoch(u64::MAX),
+        test_batch_query_epoch(),
         1024,
         "RowSeqScanExecutor2".to_string(),
         None,
@@ -490,7 +490,7 @@ async fn test_row_seq_scan() -> StreamResult<()> {
         table,
         vec![ScanRange::full()],
         true,
-        to_committed_batch_query_epoch(u64::MAX),
+        test_batch_query_epoch(),
         1,
         "RowSeqScanExecutor2".to_string(),
         None,
diff --git a/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs
index 93132ce65e51c..de8cadda4502d 100644
--- a/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs
@@ -15,7 +15,7 @@
 use pretty_xmlish::{Pretty, XmlNode};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::batch_plan::LogRowSeqScanNode;
-use risingwave_pb::common::BatchQueryEpoch;
+use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch};
 
 use super::batch::prelude::*;
 use super::utils::{childless_record, Distill};
@@ -112,12 +112,18 @@ impl TryToBatchPb for BatchLogSeqScan {
             vnode_bitmap: None,
             old_epoch: Some(BatchQueryEpoch {
                 epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
-                    self.core.old_epoch,
+                    BatchQueryCommittedEpoch {
+                        epoch: self.core.old_epoch,
+                        hummock_version_id: 0,
+                    },
                 )),
             }),
             new_epoch: Some(BatchQueryEpoch {
                 epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::Committed(
-                    self.core.new_epoch,
+                    BatchQueryCommittedEpoch {
+                        epoch: self.core.new_epoch,
+                        hummock_version_id: 0,
+                    },
                 )),
             }),
         }))
diff --git a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs
index 9a1e53aeab758..793cbaea4f205 100644
--- a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs
@@ -20,6 +20,7 @@ use pretty_xmlish::Pretty;
 use risingwave_common::catalog::{Field, Schema, TableDesc};
 use risingwave_common::types::DataType;
 use risingwave_common::util::sort_util::ColumnOrder;
+use risingwave_hummock_sdk::HummockVersionId;
 
 use crate::catalog::ColumnId;
 use crate::optimizer::optimizer_context::OptimizerContextRef;
@@ -44,6 +45,7 @@ pub struct LogScan {
 
     pub old_epoch: u64,
     pub new_epoch: u64,
+    pub version_id: HummockVersionId,
 }
 
 impl LogScan {
@@ -101,6 +103,7 @@ impl LogScan {
         ctx: OptimizerContextRef,
         old_epoch: u64,
         new_epoch: u64,
+        version_id: HummockVersionId,
     ) -> Self {
         Self {
             table_name,
@@ -110,6 +113,7 @@ impl LogScan {
             ctx,
             old_epoch,
             new_epoch,
+            version_id,
         }
     }
 
diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs
index 49667b2aa143b..bb1d98aa5f8f7 100644
--- a/src/frontend/src/scheduler/snapshot.rs
+++ b/src/frontend/src/scheduler/snapshot.rs
@@ -22,7 +22,7 @@ use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
 use risingwave_hummock_sdk::{
     FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID,
 };
-use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
+use risingwave_pb::common::{batch_query_epoch, BatchQueryCommittedEpoch, BatchQueryEpoch};
 use risingwave_pb::hummock::{HummockVersionDeltas, StateTableInfoDelta};
 use tokio::sync::watch;
 
@@ -55,7 +55,10 @@ impl ReadSnapshot {
         Ok(match self {
             ReadSnapshot::FrontendPinned { snapshot } => BatchQueryEpoch {
                 epoch: Some(batch_query_epoch::Epoch::Committed(
-                    snapshot.batch_query_epoch(read_storage_tables)?.0,
+                    BatchQueryCommittedEpoch {
+                        epoch: snapshot.batch_query_epoch(read_storage_tables)?.0,
+                        hummock_version_id: snapshot.value.id.to_u64(),
+                    },
                 )),
             },
             ReadSnapshot::ReadUncommitted => BatchQueryEpoch {
diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs
index aa798a236b5c8..5270aced1f13f 100644
--- a/src/frontend/src/session/cursor_manager.rs
+++ b/src/frontend/src/session/cursor_manager.rs
@@ -30,6 +30,7 @@ use risingwave_common::catalog::Field;
 use risingwave_common::error::BoxedError;
 use risingwave_common::session_config::QueryMode;
 use risingwave_common::types::DataType;
+use risingwave_hummock_sdk::HummockVersionId;
 use risingwave_sqlparser::ast::{Ident, ObjectName, Statement};
 
 use super::SessionImpl;
@@ -523,6 +524,18 @@ impl SubscriptionCursor {
         let init_query_timer = Instant::now();
         let (chunk_stream, fields) = if let Some(rw_timestamp) = rw_timestamp {
             let context = OptimizerContext::from_handler_args(handle_args);
+            let version_id = {
+                let version = session.env.hummock_snapshot_manager.acquire();
+                let version = version.version();
+                if !version
+                    .state_table_info
+                    .info()
+                    .contains_key(dependent_table_id)
+                {
+                    return Err(anyhow!("table id {dependent_table_id} has been dropped").into());
+                }
+                version.id
+            };
             let plan_fragmenter_result = gen_batch_plan_fragmenter(
                 &session,
                 Self::create_batch_plan_for_cursor(
@@ -531,6 +544,7 @@ impl SubscriptionCursor {
                     context.into(),
                     rw_timestamp,
                     rw_timestamp,
+                    version_id,
                 )?,
             )?;
             create_chunk_stream_for_cursor(session, plan_fragmenter_result).await?
@@ -606,6 +620,7 @@ impl SubscriptionCursor {
         context: OptimizerContextRef,
         old_epoch: u64,
         new_epoch: u64,
+        version_id: HummockVersionId,
     ) -> Result {
         let out_col_idx = table_catalog
             .columns
@@ -621,6 +636,7 @@ impl SubscriptionCursor {
             context,
             old_epoch,
             new_epoch,
+            version_id,
         );
         let batch_log_seq_scan = BatchLogSeqScan::new(core);
         let schema = batch_log_seq_scan
diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs
index 9eed880f8a5be..f1fee55fb1fca 100644
--- a/src/storage/hummock_sdk/src/lib.rs
+++ b/src/storage/hummock_sdk/src/lib.rs
@@ -208,8 +208,10 @@ impl PartialEq for LocalSstableInfo {
 /// Package read epoch of hummock, it be used for `wait_epoch`
 #[derive(Debug, Clone, Copy)]
 pub enum HummockReadEpoch {
-    /// We need to wait the `max_committed_epoch`
+    /// We need to wait the `committed_epoch` of the read table
    Committed(HummockEpoch),
+    /// We need to wait the `committed_epoch` of the read table, and the hummock version to reach at least the given version id
+    BatchQueryCommitted(HummockEpoch, HummockVersionId),
     /// We don't need to wait epoch, we usually do stream reading with it.
     NoWait(HummockEpoch),
     /// We don't need to wait epoch.
@@ -220,7 +222,10 @@ pub enum HummockReadEpoch {
 impl From<BatchQueryEpoch> for HummockReadEpoch {
     fn from(e: BatchQueryEpoch) -> Self {
         match e.epoch.unwrap() {
-            batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::Committed(epoch),
+            batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::BatchQueryCommitted(
+                epoch.epoch,
+                HummockVersionId::new(epoch.hummock_version_id),
+            ),
             batch_query_epoch::Epoch::Current(epoch) => {
                 if epoch != HummockEpoch::MAX {
                     warn!(
@@ -236,19 +241,20 @@ impl From<BatchQueryEpoch> for HummockReadEpoch {
     }
 }
 
-pub fn to_committed_batch_query_epoch(epoch: u64) -> BatchQueryEpoch {
+pub fn test_batch_query_epoch() -> BatchQueryEpoch {
     BatchQueryEpoch {
-        epoch: Some(batch_query_epoch::Epoch::Committed(epoch)),
+        epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
     }
 }
 
 impl HummockReadEpoch {
     pub fn get_epoch(&self) -> HummockEpoch {
         *match self {
-            HummockReadEpoch::Committed(epoch) => epoch,
-            HummockReadEpoch::NoWait(epoch) => epoch,
-            HummockReadEpoch::Backup(epoch) => epoch,
-            HummockReadEpoch::TimeTravel(epoch) => epoch,
+            HummockReadEpoch::Committed(epoch)
+            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
+            | HummockReadEpoch::NoWait(epoch)
+            | HummockReadEpoch::Backup(epoch)
+            | HummockReadEpoch::TimeTravel(epoch) => epoch,
         }
     }
 }
diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs
index e337152289bcc..480f0315eb396 100644
--- a/src/storage/hummock_trace/src/opts.rs
+++ b/src/storage/hummock_trace/src/opts.rs
@@ -18,7 +18,7 @@ use risingwave_common::bitmap::Bitmap;
 use risingwave_common::cache::CachePriority;
 use risingwave_common::catalog::{TableId, TableOption};
 use risingwave_common::util::epoch::EpochPair;
-use risingwave_hummock_sdk::HummockReadEpoch;
+use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
 use risingwave_pb::common::PbBuffer;
 
 use crate::TracedBytes;
@@ -197,6 +197,7 @@ pub type TracedHummockEpoch = u64;
 #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
 pub enum TracedHummockReadEpoch {
     Committed(TracedHummockEpoch),
+    BatchQueryReadCommitted(TracedHummockEpoch, u64),
     NoWait(TracedHummockEpoch),
     Backup(TracedHummockEpoch),
     TimeTravel(TracedHummockEpoch),
@@ -206,6 +207,9 @@ impl From<HummockReadEpoch> for TracedHummockReadEpoch {
     fn from(value: HummockReadEpoch) -> Self {
         match value {
             HummockReadEpoch::Committed(epoch) => Self::Committed(epoch),
+            HummockReadEpoch::BatchQueryCommitted(epoch, version_id) => {
+                Self::BatchQueryReadCommitted(epoch, version_id.to_u64())
+            }
             HummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch),
             HummockReadEpoch::Backup(epoch) => Self::Backup(epoch),
             HummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch),
@@ -217,6 +221,9 @@ impl From<TracedHummockReadEpoch> for HummockReadEpoch {
     fn from(value: TracedHummockReadEpoch) -> Self {
         match value {
             TracedHummockReadEpoch::Committed(epoch) => Self::Committed(epoch),
+            TracedHummockReadEpoch::BatchQueryReadCommitted(epoch, version_id) => {
+                Self::BatchQueryCommitted(epoch, HummockVersionId::new(version_id))
+            }
             TracedHummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch),
             TracedHummockReadEpoch::Backup(epoch) => Self::Backup(epoch),
             TracedHummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch),
diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs
index f665bf3edc15f..434a4ba989d52 100644
--- a/src/storage/src/hummock/store/hummock_storage.rs
+++ b/src/storage/src/hummock/store/hummock_storage.rs
@@ -53,7 +53,7 @@ use crate::hummock::local_version::pinned_version::{start_pinned_version_worker,
 use crate::hummock::local_version::recent_versions::RecentVersions;
 use crate::hummock::observer_manager::HummockObserverNode;
 use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache;
-use crate::hummock::utils::wait_for_epoch;
+use crate::hummock::utils::wait_for_update;
 use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
 use crate::hummock::{
     HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
@@ -633,14 +633,54 @@ impl StateStore for HummockStorage {
         wait_epoch: HummockReadEpoch,
         options: TryWaitEpochOptions,
     ) -> StorageResult<()> {
-        let wait_epoch = match wait_epoch {
-            HummockReadEpoch::Committed(epoch) => {
-                assert!(!is_max_epoch(epoch), "epoch should not be MAX EPOCH");
-                epoch
-            }
+        let (wait_epoch, wait_version) = match wait_epoch {
+            HummockReadEpoch::Committed(epoch) => (epoch, None),
+            HummockReadEpoch::BatchQueryCommitted(epoch, version_id) => (epoch, Some(version_id)),
             _ => return Ok(()),
         };
-        wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, options).await
+        assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
+        let is_valid_version_id = |id| match wait_version {
+            None => true,
+            Some(version_id) => id >= version_id,
+        };
+        // fast path by checking recent_versions
+        {
+            let recent_versions = self.recent_versions.load();
+            let latest_version = recent_versions.latest_version().version();
+            if is_valid_version_id(latest_version.id)
+                && let Some(committed_epoch) =
+                    latest_version.table_committed_epoch(options.table_id)
+                && committed_epoch >= wait_epoch
+            {
+                return Ok(());
+            }
+        }
+        wait_for_update(
+            &self.version_update_notifier_tx,
+            |version| {
+                if !is_valid_version_id(version.version().id) {
+                    return Ok(false);
+                }
+                let committed_epoch = version
+                    .version()
+                    .table_committed_epoch(options.table_id)
+                    .ok_or_else(|| {
+                        HummockError::wait_epoch(format!(
+                            "table id {} has been dropped",
+                            options.table_id
+                        ))
+                    })?;
+                Ok(committed_epoch >= wait_epoch)
+            },
+            || {
+                format!(
+                    "try_wait_epoch: epoch: {}, version_id: {:?}",
+                    wait_epoch, wait_version
+                )
+            },
+        )
+        .await?;
+        Ok(())
     }
 
     fn sync(&self, epoch: u64, table_ids: HashSet<TableId>) -> impl SyncFuture {
diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs
index ae3815ca551a5..95cee5899cdbf 100644
--- a/src/storage/src/hummock/store/local_hummock_storage.rs
+++ b/src/storage/src/hummock/store/local_hummock_storage.rs
@@ -45,7 +45,7 @@ use crate::hummock::shared_buffer::shared_buffer_batch::{
 use crate::hummock::store::version::{read_filter_for_version, HummockVersionReader};
 use crate::hummock::utils::{
     do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
-    wait_for_epoch,
+    wait_for_update,
 };
 use crate::hummock::write_limiter::WriteLimiterRef;
 use crate::hummock::{
@@ -136,14 +136,43 @@ impl LocalHummockStorage {
     }
 
     async fn wait_for_epoch(&self, wait_epoch: u64) -> StorageResult<()> {
-        wait_for_epoch(
+        let mut prev_committed_epoch = None;
+        let prev_committed_epoch = &mut prev_committed_epoch;
+        wait_for_update(
             &self.version_update_notifier_tx,
-            wait_epoch,
-            TryWaitEpochOptions {
-                table_id: self.table_id,
+            |version| {
+                let committed_epoch = version.version().table_committed_epoch(self.table_id);
+                let ret = if let Some(committed_epoch) = committed_epoch {
+                    if committed_epoch >= wait_epoch {
+                        Ok(true)
+                    } else {
+                        Ok(false)
+                    }
+                } else if prev_committed_epoch.is_none() {
+                    warn!(
+                        table_id = self.table_id.table_id,
+                        version_id = version.id().to_u64(),
+                        "table id not exist yet, wait for registering"
+                    );
+                    Ok(false)
+                } else {
+                    Err(HummockError::wait_epoch(format!(
+                        "table {} has been dropped",
+                        self.table_id
+                    )))
+                };
+                *prev_committed_epoch = committed_epoch;
+                ret
+            },
+            || {
+                format!(
+                    "wait_for_epoch: epoch: {}, table_id: {}",
+                    wait_epoch, self.table_id
+                )
             },
         )
-        .await
+        .await?;
+        Ok(())
     }
 
     pub async fn iter_flushed(
diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs
index 235edc884ae5b..d9a006cdafede 100644
--- a/src/storage/src/hummock/utils.rs
+++ b/src/storage/src/hummock/utils.rs
@@ -33,13 +33,13 @@ use risingwave_hummock_sdk::key::{
 use risingwave_hummock_sdk::sstable_info::SstableInfo;
 use tokio::sync::oneshot::{channel, Receiver, Sender};
 
-use super::{HummockError, SstableStoreRef};
+use super::{HummockError, HummockResult, SstableStoreRef};
 use crate::error::StorageResult;
 use crate::hummock::local_version::pinned_version::PinnedVersion;
 use crate::hummock::CachePolicy;
 use crate::mem_table::{KeyOp, MemTableError};
 use crate::monitor::MemoryCollector;
-use crate::store::{OpConsistencyLevel, ReadOptions, StateStoreRead, TryWaitEpochOptions};
+use crate::store::{OpConsistencyLevel, ReadOptions, StateStoreRead};
 
 pub fn range_overlap(
     search_key_range: &R,
@@ -575,25 +575,15 @@ pub(crate) fn filter_with_delete_range<'a>(
     })
 }
 
-pub(crate) async fn wait_for_epoch(
+pub(crate) async fn wait_for_update(
     notifier: &tokio::sync::watch::Sender<PinnedVersion>,
-    wait_epoch: u64,
-    options: TryWaitEpochOptions,
-) -> StorageResult<()> {
+    mut inspect_fn: impl FnMut(&PinnedVersion) -> HummockResult<bool>,
+    mut periodic_debug_info: impl FnMut() -> String,
+) -> HummockResult<()> {
     let mut receiver = notifier.subscribe();
-    let mut committed_epoch = {
-        // avoid unnecessary check in the loop if the value does not change
-        let committed_epoch = receiver
-            .borrow_and_update()
-            .version()
-            .table_committed_epoch(options.table_id);
-        if let Some(committed_epoch) = committed_epoch
-            && committed_epoch >= wait_epoch
-        {
-            return Ok(());
-        }
-        committed_epoch
-    };
+    if inspect_fn(&receiver.borrow_and_update())? {
+        return Ok(());
+    }
     let start_time = Instant::now();
     loop {
         match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await {
@@ -608,29 +598,19 @@ pub(crate) async fn wait_for_epoch(
                 // CN with the same distribution as the upstream MV.
                 // See #3845 for more details.
                 tracing::warn!(
-                    epoch = wait_epoch,
-                    ?committed_epoch,
-                    table_id = options.table_id.table_id,
+                    info = periodic_debug_info(),
                     elapsed = ?start_time.elapsed(),
-                    "wait_epoch timeout when waiting for version update",
+                    "timeout when waiting for version update",
                 );
                 continue;
             }
             Ok(Err(_)) => {
-                return Err(HummockError::wait_epoch("tx dropped").into());
+                return Err(HummockError::wait_epoch("tx dropped"));
             }
             Ok(Ok(_)) => {
-                // TODO: should handle the corner case of drop table
-                let new_committed_epoch = receiver
-                    .borrow()
-                    .version()
-                    .table_committed_epoch(options.table_id);
-                if let Some(committed_epoch) = new_committed_epoch
-                    && committed_epoch >= wait_epoch
-                {
+                if inspect_fn(&receiver.borrow())? {
                     return Ok(());
                 }
-                committed_epoch = new_committed_epoch;
             }
         }
     }
diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs
index d368542db4a06..f7a8ac745cca6 100644
--- a/src/storage/src/table/batch_table/storage_table.rs
+++ b/src/storage/src/table/batch_table/storage_table.rs
@@ -760,7 +760,7 @@ impl StorageTableInner {
     pub async fn batch_iter_log_with_pk_bounds(
         &self,
         start_epoch: u64,
-        end_epoch: u64,
+        end_epoch: HummockReadEpoch,
     ) -> StorageResult> + Send + 'static> {
         let pk_prefix = OwnedRow::default();
         let start_key = self.serialize_pk_bound(&pk_prefix, Unbounded, true);
@@ -987,18 +987,22 @@ impl StorageTableInnerIterLogInner {
         table_key_range: TableKeyRange,
         read_options: ReadLogOptions,
         start_epoch: u64,
-        end_epoch: u64,
+        end_epoch: HummockReadEpoch,
     ) -> StorageResult<Self> {
         store
             .try_wait_epoch(
-                HummockReadEpoch::Committed(end_epoch),
+                end_epoch,
                 TryWaitEpochOptions {
                    table_id: read_options.table_id,
                 },
             )
            .await?;
         let iter = store
-            .iter_log((start_epoch, end_epoch), table_key_range, read_options)
+            .iter_log(
+                (start_epoch, end_epoch.get_epoch()),
+                table_key_range,
+                read_options,
+            )
             .await?;
         let iter = Self {
             iter,
diff --git a/src/stream/src/executor/backfill/snapshot_backfill.rs b/src/stream/src/executor/backfill/snapshot_backfill.rs
index a3cba25a0572a..89801a3cf4133 100644
--- a/src/stream/src/executor/backfill/snapshot_backfill.rs
+++ b/src/stream/src/executor/backfill/snapshot_backfill.rs
@@ -218,13 +218,12 @@ impl SnapshotBackfillExecutor {
                 // use `upstream_buffer.run_future` to poll upstream concurrently so that we won't have back-pressure
                 // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed,
                 // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock.
-                let stream =
-                    upstream_buffer
-                        .run_future(self.upstream_table.batch_iter_log_with_pk_bounds(
-                            barrier_epoch.prev,
-                            barrier_epoch.prev,
-                        ))
-                        .await?;
+                let stream = upstream_buffer
+                    .run_future(self.upstream_table.batch_iter_log_with_pk_bounds(
+                        barrier_epoch.prev,
+                        HummockReadEpoch::Committed(barrier_epoch.prev),
+                    ))
+                    .await?;
                 let data_types = self.upstream_table.schema().data_types();
                 let builder = create_builder(None, self.chunk_size, data_types);
                 let stream = read_change_log(stream, builder);
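For reference, a minimal sketch (not part of the patch) of how the new proto message is expected to flow into the SDK enum, using only APIs visible in this diff (`BatchQueryCommittedEpoch`, `HummockVersionId::new`/`to_u64`, the new `From<BatchQueryEpoch>` impl, and `get_epoch`); the epoch and version-id values are made up for illustration:

```rust
use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
use risingwave_pb::common::{batch_query_epoch, BatchQueryCommittedEpoch, BatchQueryEpoch};

fn main() {
    // Frontend side: the committed variant now carries both the committed epoch
    // and the id of the pinned hummock version.
    let pb = BatchQueryEpoch {
        epoch: Some(batch_query_epoch::Epoch::Committed(BatchQueryCommittedEpoch {
            epoch: 65536,
            hummock_version_id: 42,
        })),
    };

    // Storage side: the `From` impl added in this diff maps it to the new enum
    // variant, while `get_epoch()` still exposes only the epoch for iterator bounds.
    let read_epoch = HummockReadEpoch::from(pb);
    match read_epoch {
        HummockReadEpoch::BatchQueryCommitted(epoch, version_id) => {
            assert_eq!(epoch, 65536);
            assert_eq!(version_id.to_u64(), 42);
        }
        _ => unreachable!(),
    }
    assert_eq!(read_epoch.get_epoch(), 65536);
}
```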