diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 2054bd4dbb5d..0f42c1775137 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -30,6 +30,7 @@ use risingwave_hummock_sdk::{ use risingwave_pb::hummock::compact_task::{self}; use risingwave_pb::hummock::HummockSnapshot; use sea_orm::TransactionTrait; +use tracing::warn; use crate::hummock::error::{Error, Result}; use crate::hummock::manager::compaction_group_manager::CompactionGroupManager; @@ -269,19 +270,28 @@ impl HummockManager { // TODO: remove the sanity check when supporting partial checkpoint assert_eq!(1, table_committed_epoch.len()); - assert_eq!( - table_committed_epoch.iter().next().expect("non-empty"), - ( - &epoch, - &version - .latest_version() - .state_table_info - .info() - .keys() - .cloned() - .collect() - ) - ); + { + let (table_committed_epoch, committed_table_ids) = + table_committed_epoch.iter().next().expect("non-empty"); + assert_eq!(*table_committed_epoch, epoch); + let table_ids: HashSet<_> = version + .latest_version() + .state_table_info + .info() + .keys() + .cloned() + .collect(); + assert!(table_ids.is_subset(committed_table_ids), "hummock table ids {table_ids:?} not a subset of table ids to commit{committed_table_ids:?}"); + if cfg!(debug_assertions) { + assert_eq!(&table_ids, committed_table_ids); + } else if table_ids != *committed_table_ids { + let extra_table_ids = committed_table_ids - &table_ids; + warn!( + ?extra_table_ids, + "ignore extra table ids that are not previously registered" + ); + } + } // Apply stats changes. let mut version_stats = HummockVersionStatsTransaction::new( diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 0fe019d9afcd..676472e969aa 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -358,10 +358,6 @@ impl HummockVersion { new_sst_start_id: u64, ) { let mut new_sst_id = new_sst_start_id; - if !self.levels.contains_key(&parent_group_id) { - warn!(parent_group_id, "non-existing parent group id to init from"); - return; - } if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId { if new_sst_start_id != 0 { if cfg!(debug_assertions) { @@ -378,6 +374,10 @@ impl HummockVersion { } return; } + if !self.levels.contains_key(&parent_group_id) { + warn!(parent_group_id, "non-existing parent group id to init from"); + return; + } let [parent_levels, cur_levels] = self .levels .get_many_mut([&parent_group_id, &group_id]) diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index cd177e4cf34f..5b254556575a 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -427,10 +427,18 @@ impl HummockEventHandler { } } if !pending.is_empty() { - warn!( - pending_count = pending.len(), - total_count, "cannot acquire lock for all read version" - ); + if pending.len() * 10 > total_count { + // Only print warn log when failed to acquire more than 10% + warn!( + pending_count = pending.len(), + total_count, "cannot acquire lock for all read version" + ); + } else { + debug!( + pending_count = pending.len(), + total_count, "cannot acquire lock for all read version" + ); + } } const TRY_LOCK_TIMEOUT: Duration = Duration::from_millis(1); diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index deef394d2882..23ada8660614 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -458,7 +458,7 @@ impl HummockStorage { v.vnodes().iter_ones().collect_vec() }) .collect_vec(); - panic!("There are {} read version associated with vnode {}. read_version_vnodes={:?}", read_version_vnodes.len(), vnode.to_index(), read_version_vnodes); + return Err(HummockError::other(format!("There are {} read version associated with vnode {}. read_version_vnodes={:?}", read_version_vnodes.len(), vnode.to_index(), read_version_vnodes)).into()); } read_filter_for_version( epoch, diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index ae0d775219c5..2221b4350ebc 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -48,8 +48,8 @@ use crate::hummock::utils::{ }; use crate::hummock::write_limiter::WriteLimiterRef; use crate::hummock::{ - BackwardSstableIterator, MemoryLimiter, SstableIterator, SstableIteratorReadOptions, - SstableStoreRef, + BackwardSstableIterator, HummockError, MemoryLimiter, SstableIterator, + SstableIteratorReadOptions, SstableStoreRef, }; use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator}; use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic}; @@ -495,7 +495,9 @@ impl LocalStateStore for LocalHummockStorage { instance_id: self.instance_id(), init_epoch: options.epoch.curr, }) - .expect("should succeed"); + .map_err(|_| { + HummockError::other("failed to send InitEpoch. maybe shutting down") + })?; } Ok(()) } @@ -528,14 +530,17 @@ impl LocalStateStore for LocalHummockStorage { }); } } - if !self.is_replicated { - self.event_sender + if !self.is_replicated + && self + .event_sender .send(HummockEvent::LocalSealEpoch { instance_id: self.instance_id(), next_epoch, opts, }) - .expect("should be able to send"); + .is_err() + { + warn!("failed to send LocalSealEpoch. maybe shutting down"); } } @@ -624,7 +629,9 @@ impl LocalHummockStorage { if !self.is_replicated { self.event_sender .send(HummockEvent::ImmToUploader { instance_id, imm }) - .unwrap(); + .map_err(|_| { + HummockError::other("failed to send imm to uploader. maybe shutting down") + })?; } imm_size } else {