From d338210554a2e2ffa48a549c4050f7f77fbb80a9 Mon Sep 17 00:00:00 2001
From: zwang28 <84491488@qq.com>
Date: Fri, 27 Sep 2024 15:07:14 +0800
Subject: [PATCH 1/5] refactor(storage): add SST sanity check during commit
 epoch

---
 proto/hummock.proto                          |  2 +-
 proto/stream_service.proto                   |  5 +++--
 src/meta/src/barrier/mod.rs                  |  1 +
 src/meta/src/hummock/manager/context.rs      | 10 ++++++++++
 src/meta/src/hummock/manager/gc.rs           | 15 ++++++++++++++-
 src/meta/src/hummock/manager/mod.rs          |  3 +++
 src/storage/hummock_sdk/src/lib.rs           |  5 ++++-
 src/storage/hummock_test/src/vacuum_tests.rs |  6 +++---
 src/storage/src/hummock/sstable/builder.rs   | 15 ++++++++++++++-
 src/storage/src/hummock/vacuum.rs            | 12 ++----------
 src/stream/src/task/barrier_manager.rs       |  6 ++++--
 11 files changed, 59 insertions(+), 21 deletions(-)

diff --git a/proto/hummock.proto b/proto/hummock.proto
index 546e7edc08370..cbb5a7dc74a3e 100644
--- a/proto/hummock.proto
+++ b/proto/hummock.proto
@@ -543,7 +543,7 @@ message VacuumTask {
 
 // Scan object store to get candidate orphan SSTs.
 message FullScanTask {
-  uint64 sst_retention_time_sec = 1;
+  uint64 sst_retention_watermark = 1;
   optional string prefix = 2;
   optional string start_after = 3;
   optional uint64 limit = 4;
diff --git a/proto/stream_service.proto b/proto/stream_service.proto
index ab56c9f7e4050..45703554c2367 100644
--- a/proto/stream_service.proto
+++ b/proto/stream_service.proto
@@ -32,13 +32,14 @@ message BarrierCompleteResponse {
   string request_id = 1;
   common.Status status = 2;
   repeated CreateMviewProgress create_mview_progress = 3;
-  message GroupedSstableInfo {
+  message LocalSstableInfo {
     reserved 1;
     reserved "compaction_group_id";
     hummock.SstableInfo sst = 2;
     map<uint32, hummock.TableStats> table_stats_map = 3;
+    uint64 created_at = 4;
   }
-  repeated GroupedSstableInfo synced_sstables = 4;
+  repeated LocalSstableInfo synced_sstables = 4;
   uint32 worker_id = 5;
   map<uint32, hummock.TableWatermarks> table_watermarks = 6;
   repeated hummock.SstableInfo old_value_sstables = 7;
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 1b00c9ddb7339..875bee7659730 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -1678,6 +1678,7 @@ fn collect_resp_info(
             LocalSstableInfo::new(
                 sst_info.into(),
                 from_prost_table_stats_map(grouped.table_stats_map),
+                grouped.created_at,
             )
         });
         synced_ssts.extend(ssts_iter);
diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs
index 8f1a510dc07eb..2049a7736a953 100644
--- a/src/meta/src/hummock/manager/context.rs
+++ b/src/meta/src/hummock/manager/context.rs
@@ -241,6 +241,16 @@ impl HummockManager {
             }
         }
 
+        // TODO: add sanity check for serverless compaction
+        // sanity check to ensure SSTs to commit have not been full GCed yet.
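+        // Illustrative arithmetic (example numbers, not part of this change):
+        // with min_sst_retention_time_sec = 3600 and now = 10_000, the
+        // watermark is 6_400; an SST stamped created_at < 6_400 may already
+        // have been deleted by a concurrent full GC, so committing it could
+        // produce a version that references a missing object.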
+        let now = self.now();
+        let sst_retention_watermark = now.saturating_sub(self.env.opts.min_sst_retention_time_sec);
+        for sst in sstables {
+            if sst.created_at < sst_retention_watermark {
+                return Err(anyhow::anyhow!("SST may have been GCed: SST timestamp {}, meta node timestamp {}, retention_sec {}, watermark {}", sst.created_at, now, self.env.opts.min_sst_retention_time_sec, sst_retention_watermark).into());
+            }
+        }
+
         async {
             if !self.env.opts.enable_committed_sst_sanity_check {
                 return;
diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs
index a548025781e17..2f77ab387a650 100644
--- a/src/meta/src/hummock/manager/gc.rs
+++ b/src/meta/src/hummock/manager/gc.rs
@@ -205,9 +205,10 @@ impl HummockManager {
             }
             Some(compactor) => compactor,
         };
+        let sst_retention_watermark = self.now().saturating_sub(sst_retention_time.as_secs());
         compactor
             .send_event(ResponseEvent::FullScanTask(FullScanTask {
-                sst_retention_time_sec: sst_retention_time.as_secs(),
+                sst_retention_watermark,
                 prefix,
                 start_after,
                 limit,
@@ -262,6 +263,18 @@ impl HummockManager {
         tracing::info!("GC watermark is {watermark}. Object full scan returns {candidate_object_number} objects. {after_watermark} remains after filtered by GC watermark. {after_time_travel} remains after filtered by time travel archives. {selected_object_number} remains after filtered by hummock version.");
         Ok(selected_object_number)
     }
+
+    pub fn now(&self) -> u64 {
+        // TODO: persist now to maintain non-decreasing even after a meta node reboot.
+        let mut guard = self.now.lock();
+        let new_now = chrono::Utc::now().timestamp().try_into().unwrap();
+        if new_now < *guard {
+            tracing::warn!(old = *guard, new = new_now, "unexpected decreasing now");
+            return *guard;
+        }
+        *guard = new_now;
+        new_now
+    }
 }
 
 pub struct FullGcState {
diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs
index dea2314df75aa..cd15e655dbd64 100644
--- a/src/meta/src/hummock/manager/mod.rs
+++ b/src/meta/src/hummock/manager/mod.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
 use arc_swap::ArcSwap;
 use bytes::Bytes;
 use itertools::Itertools;
+use parking_lot::Mutex;
 use risingwave_common::monitor::MonitoredRwLock;
 use risingwave_common::system_param::reader::SystemParamsRead;
 use risingwave_common::util::epoch::INVALID_EPOCH;
@@ -114,6 +115,7 @@ pub struct HummockManager {
     // and suggest types with a certain priority.
    pub compaction_state: CompactionState,
     full_gc_state: FullGcState,
+    now: Mutex<u64>,
 }
 
 pub type HummockManagerRef = Arc<HummockManager>;
@@ -287,6 +289,7 @@ impl HummockManager {
             compactor_streams_change_tx,
             compaction_state: CompactionState::new(),
             full_gc_state: FullGcState::new(Some(full_gc_object_limit)),
+            now: Mutex::new(0),
         };
         let instance = Arc::new(instance);
         instance.init_time_travel_state().await?;
diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs
index 6058d3a16d5e8..ec0c390fb010e 100644
--- a/src/storage/hummock_sdk/src/lib.rs
+++ b/src/storage/hummock_sdk/src/lib.rs
@@ -176,13 +176,15 @@ pub struct SyncResult {
 pub struct LocalSstableInfo {
     pub sst_info: SstableInfo,
     pub table_stats: TableStatsMap,
+    pub created_at: u64,
 }
 
 impl LocalSstableInfo {
-    pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap) -> Self {
+    pub fn new(sst_info: SstableInfo, table_stats: TableStatsMap, created_at: u64) -> Self {
         Self {
             sst_info,
             table_stats,
+            created_at,
         }
     }
 
@@ -190,6 +192,7 @@ impl LocalSstableInfo {
         Self {
             sst_info,
             table_stats: Default::default(),
+            created_at: u64::MAX,
         }
     }
 
diff --git a/src/storage/hummock_test/src/vacuum_tests.rs b/src/storage/hummock_test/src/vacuum_tests.rs
index e68a2de4b00ac..1de752339841b 100644
--- a/src/storage/hummock_test/src/vacuum_tests.rs
+++ b/src/storage/hummock_test/src/vacuum_tests.rs
@@ -91,7 +91,7 @@ async fn test_full_scan() {
     let object_metadata_iter = Box::pin(stream::iter(object_store_list_result.into_iter().map(Ok)));
 
     let task = FullScanTask {
-        sst_retention_time_sec: 10000,
+        sst_retention_watermark: 0,
         prefix: None,
         start_after: None,
         limit: None,
@@ -102,7 +102,7 @@ async fn test_full_scan() {
     assert!(scan_result.is_empty());
 
     let task = FullScanTask {
-        sst_retention_time_sec: 6000,
+        sst_retention_watermark: now_ts.sub(Duration::from_secs(6000)).as_secs(),
         prefix: None,
         start_after: None,
         limit: None,
@@ -113,7 +113,7 @@ async fn test_full_scan() {
     assert_eq!(scan_result.into_iter().sorted().collect_vec(), vec![1]);
 
     let task = FullScanTask {
-        sst_retention_time_sec: 2000,
+        sst_retention_watermark: u64::MAX,
         prefix: None,
         start_after: None,
         limit: None,
diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs
index 49caf0ba02568..24e7e14e02e0f 100644
--- a/src/storage/src/hummock/sstable/builder.rs
+++ b/src/storage/src/hummock/sstable/builder.rs
@@ -14,6 +14,7 @@
 
 use std::collections::BTreeSet;
 use std::sync::Arc;
+use std::time::SystemTime;
 
 use bytes::{Bytes, BytesMut};
 use risingwave_hummock_sdk::key::{user_key, FullKey, MAX_KEY_LEN};
@@ -540,8 +541,20 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
         }
 
         let writer_output = self.writer.finish(meta).await?;
+        // The timestamp is only used during full GC.
+        //
+        // Ideally object store object's last_modified should be used.
+        // However, it'll incur additional IO overhead since S3 lacks an interface to retrieve the last_modified timestamp after the PUT operation on an object.
+        //
+        // The local timestamp below is expected to precede the last_modified of object store object, given that the object store object is created afterward.
+        // It should help alleviate the clock drift issue.
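+        //
+        // Note: this assumes the compute node's clock does not lag behind the
+        // meta node's by more than min_sst_retention_time_sec; a larger skew
+        // could make the commit-time sanity check reject a live SST.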
+
+        let now = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .expect("Clock may have gone backwards")
+            .as_secs();
         Ok(SstableBuilderOutput::<W::Output> {
-            sst_info: LocalSstableInfo::new(sst_info, self.table_stats),
+            sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
             writer_output,
             stats: SstableBuilderOutputStats {
                 bloom_filter_size,
diff --git a/src/storage/src/hummock/vacuum.rs b/src/storage/src/hummock/vacuum.rs
index 17095e994e287..09c8f512adbeb 100644
--- a/src/storage/src/hummock/vacuum.rs
+++ b/src/storage/src/hummock/vacuum.rs
@@ -12,9 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::ops::Sub;
 use std::sync::Arc;
-use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 use futures::{StreamExt, TryStreamExt};
 use risingwave_hummock_sdk::HummockSstableObjectId;
@@ -60,12 +58,6 @@ impl Vacuum {
         full_scan_task: FullScanTask,
         metadata_iter: ObjectMetadataIter,
     ) -> HummockResult<(Vec<HummockSstableObjectId>, u64, u64, Option<String>)> {
-        let timestamp_watermark = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .unwrap()
-            .sub(Duration::from_secs(full_scan_task.sst_retention_time_sec))
-            .as_secs_f64();
-
         let mut total_object_count = 0;
         let mut total_object_size = 0;
         let mut next_start_after: Option<String> = None;
@@ -83,7 +75,7 @@ impl Vacuum {
                     next_start_after = Some(o.key.clone());
                     tracing::debug!(next_start_after, "set next start after");
                 }
-                if o.last_modified < timestamp_watermark {
+                if o.last_modified < full_scan_task.sst_retention_watermark as f64 {
                     Some(Ok(SstableStore::get_object_id_from_path(&o.key)))
                 } else {
                     None
@@ -109,7 +101,7 @@ impl Vacuum {
         sstable_store: SstableStoreRef,
     ) -> HummockResult<(Vec<HummockSstableObjectId>, u64, u64, Option<String>)> {
         tracing::info!(
-            timestamp = full_scan_task.sst_retention_time_sec,
+            sst_retention_watermark = full_scan_task.sst_retention_watermark,
             prefix = full_scan_task.prefix.as_ref().unwrap_or(&String::from("")),
             start_after = full_scan_task.start_after,
            limit = full_scan_task.limit,
diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs
index 242f21c17272b..139f58a696f42 100644
--- a/src/stream/src/task/barrier_manager.rs
+++ b/src/stream/src/task/barrier_manager.rs
@@ -24,7 +24,7 @@ use futures::StreamExt;
 use itertools::Itertools;
 use risingwave_common::error::tonic::extra::Score;
 use risingwave_pb::stream_service::barrier_complete_response::{
-    GroupedSstableInfo, PbCreateMviewProgress,
+    PbCreateMviewProgress, PbLocalSstableInfo,
 };
 use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
 use thiserror_ext::AsReport;
@@ -494,9 +494,11 @@ impl LocalBarrierWorker {
                     |LocalSstableInfo {
                          sst_info,
                          table_stats,
-                     }| GroupedSstableInfo {
+                         created_at,
+                     }| PbLocalSstableInfo {
                         sst: Some(sst_info.into()),
                         table_stats_map: to_prost_table_stats_map(table_stats),
+                        created_at,
                     },
                 )
                 .collect_vec(),

From 8c51252adfe0879e167320b0b69e50111e9f142a Mon Sep 17 00:00:00 2001
From: zwang28 <84491488@qq.com>
Date: Sun, 29 Sep 2024 15:51:07 +0800
Subject: [PATCH 2/5] fix tests

---
 src/meta/src/hummock/manager/context.rs       |  3 +-
 src/meta/src/hummock/manager/gc.rs            | 16 ++------
 src/meta/src/hummock/manager/tests.rs         | 38 +++++++++++++++++++
 .../hummock_test/src/hummock_storage_tests.rs |  1 +
 4 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs
index 2049a7736a953..164c112cadff0 100644
--- a/src/meta/src/hummock/manager/context.rs
+++ b/src/meta/src/hummock/manager/context.rs
@@ -242,12 +242,13 @@ impl HummockManager {
            }
         }
 
         // TODO: add sanity check for serverless compaction
+
         // sanity check to ensure SSTs to commit have not been full GCed yet.
         let now = self.now();
         let sst_retention_watermark = now.saturating_sub(self.env.opts.min_sst_retention_time_sec);
         for sst in sstables {
             if sst.created_at < sst_retention_watermark {
-                return Err(anyhow::anyhow!("SST may have been GCed: SST timestamp {}, meta node timestamp {}, retention_sec {}, watermark {}", sst.created_at, now, self.env.opts.min_sst_retention_time_sec, sst_retention_watermark).into());
+                return Err(anyhow::anyhow!("SST {} is rejected from being committed since it's below watermark: SST timestamp {}, meta node timestamp {}, retention_sec {}, watermark {}", sst.sst_info.sst_id, sst.created_at, now, self.env.opts.min_sst_retention_time_sec, sst_retention_watermark).into());
             }
         }
diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs
index 2f77ab387a650..bbb0114cf568f 100644
--- a/src/meta/src/hummock/manager/gc.rs
+++ b/src/meta/src/hummock/manager/gc.rs
@@ -183,7 +183,7 @@ impl HummockManager {
         prefix: Option<String>,
     ) -> Result<bool> {
         self.metrics.full_gc_trigger_count.inc();
-        // Set a minimum sst_retention_time to avoid deleting SSTs of on-going write op.
+        // Set a minimum sst_retention_time.
         let sst_retention_time = cmp::max(
             sst_retention_time,
             Duration::from_secs(self.env.opts.min_sst_retention_time_sec),
         );
@@ -419,17 +419,12 @@ mod tests {
             None
         )
         .unwrap());
-        let full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
+        let _full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
             ResponseEvent::FullScanTask(task) => task,
             _ => {
                 panic!()
             }
         };
-        // min_sst_retention_time_sec override user provided value.
-        assert_eq!(
-            hummock_manager.env.opts.min_sst_retention_time_sec,
-            full_scan_task.sst_retention_time_sec
-        );
 
         assert!(hummock_manager
             .start_full_gc(
                 Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec + 1),
                 None
             )
             .unwrap());
-        let full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
+        let _full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
             ResponseEvent::FullScanTask(task) => task,
             _ => {
                 panic!()
             }
         };
-        // min_sst_retention_time_sec doesn't override user provided value.
-        assert_eq!(
-            hummock_manager.env.opts.min_sst_retention_time_sec + 1,
-            full_scan_task.sst_retention_time_sec
-        );
 
         // Empty input results in immediate return, without waiting for heartbeat.
        hummock_manager
diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs
index 9fa9b11a026cb..5a9aa15f7bee0 100644
--- a/src/meta/src/hummock/manager/tests.rs
+++ b/src/meta/src/hummock/manager/tests.rs
@@ -89,6 +89,7 @@ fn gen_local_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> Local
     LocalSstableInfo {
         sst_info: gen_sstable_info(sst_id, table_ids, epoch),
         table_stats: Default::default(),
+        created_at: u64::MAX,
     }
 }
 fn get_compaction_group_object_ids(
@@ -794,6 +795,31 @@ async fn test_invalid_sst_id() {
         );
     }
 
+    // reject because the SST's timestamp is below the watermark
+    let ssts_below_watermark = ssts
+        .iter()
+        .map(|s| LocalSstableInfo {
+            sst_info: s.sst_info.clone(),
+            table_stats: s.table_stats.clone(),
+            created_at: 0,
+        })
+        .collect();
+    let error = hummock_meta_client
+        .commit_epoch(
+            epoch,
+            SyncResult {
+                uncommitted_ssts: ssts_below_watermark,
+                ..Default::default()
+            },
+            false,
+        )
+        .await
+        .unwrap_err();
+    assert!(error
+        .as_report()
+        .to_string()
+        .contains("is rejected from being committed since it's below watermark"));
+
     hummock_meta_client
         .commit_epoch(
             epoch,
@@ -1275,6 +1301,7 @@ async fn test_version_stats() {
             .iter()
             .map(|table_id| (*table_id, table_stats_change.clone()))
             .collect(),
+        created_at: u64::MAX,
     })
     .collect_vec();
     hummock_meta_client
@@ -1394,6 +1421,7 @@ async fn test_split_compaction_group_on_commit() {
             },
         ),
     ]),
+    created_at: u64::MAX,
 };
 hummock_meta_client
     .commit_epoch(
@@ -1489,6 +1517,7 @@ async fn test_split_compaction_group_on_demand_basic() {
             ..Default::default()
         },
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
     let sst_2 = LocalSstableInfo {
         sst_info: SstableInfo {
@@ -1507,6 +1536,7 @@ async fn test_split_compaction_group_on_demand_basic() {
             ..Default::default()
         },
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
     hummock_meta_client
         .commit_epoch(
@@ -1598,6 +1628,7 @@ async fn test_split_compaction_group_on_demand_non_trivial() {
             ..Default::default()
         },
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
     hummock_manager
         .register_table_ids_for_test(&[(100, 2), (101, 2)])
@@ -1694,6 +1725,7 @@ async fn test_split_compaction_group_trivial_expired() {
             ..Default::default()
         },
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
     let sst_2 = LocalSstableInfo {
         sst_info: SstableInfo {
@@ -1712,6 +1744,7 @@ async fn test_split_compaction_group_trivial_expired() {
             ..Default::default()
         },
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
     let mut sst_3 = sst_2.clone();
     let mut sst_4 = sst_1.clone();
@@ -1857,6 +1890,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() {
             ..Default::default()
         },
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
     hummock_meta_client
         .commit_epoch(
@@ -2019,6 +2053,7 @@ async fn test_compaction_task_expiration_due_to_split_group() {
             ..Default::default()
         },
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
     let sst_2 = LocalSstableInfo {
         sst_info: SstableInfo {
@@ -2037,6 +2072,7 @@ async fn test_compaction_task_expiration_due_to_split_group() {
             ..Default::default()
         },
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
 
     hummock_meta_client
@@ -2379,6 +2415,7 @@ async fn test_unregister_moved_table() {
             ..Default::default()
         },
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
     let sst_2 = LocalSstableInfo {
         sst_info: SstableInfo {
@@ -2397,6 +2434,7 @@ async fn test_unregister_moved_table() {
             ..Default::default()
        },
        table_stats: Default::default(),
+        created_at: u64::MAX,
    };
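+    // In these fixtures created_at is u64::MAX, far above any retention
+    // watermark, so the new commit-time sanity check never rejects them.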
 
    hummock_meta_client
diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs
index c477693b114fb..13ee6f68cf76e 100644
--- a/src/storage/hummock_test/src/hummock_storage_tests.rs
+++ b/src/storage/hummock_test/src/hummock_storage_tests.rs
@@ -2607,6 +2607,7 @@ async fn test_commit_multi_epoch() {
                 })
                 .collect(),
                 sst_info: sst,
+                created_at: u64::MAX,
             }],
             new_table_fragment_info,
             change_log_delta: Default::default(),

From 924ad20da5ec6e3b25937f422dd02f4c9d1d68ab Mon Sep 17 00:00:00 2001
From: zwang28 <84491488@qq.com>
Date: Mon, 30 Sep 2024 13:55:25 +0800
Subject: [PATCH 3/5] refactor

---
 src/meta/src/hummock/manager/gc.rs | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs
index bbb0114cf568f..94a969e963f70 100644
--- a/src/meta/src/hummock/manager/gc.rs
+++ b/src/meta/src/hummock/manager/gc.rs
@@ -16,7 +16,7 @@ use std::cmp;
 use std::collections::HashSet;
 use std::ops::Bound::{Excluded, Included};
 use std::ops::DerefMut;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
 
 use anyhow::Context;
 use futures::{stream, StreamExt};
@@ -267,7 +267,10 @@ impl HummockManager {
     pub fn now(&self) -> u64 {
         // TODO: persist now to maintain non-decreasing even after a meta node reboot.
         let mut guard = self.now.lock();
-        let new_now = chrono::Utc::now().timestamp().try_into().unwrap();
+        let new_now = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .expect("Clock may have gone backwards")
+            .as_secs();
         if new_now < *guard {
             tracing::warn!(old = *guard, new = new_now, "unexpected decreasing now");
             return *guard;

From b21d71295b39f54480dfe5e2ca0c5eb38a1977ae Mon Sep 17 00:00:00 2001
From: zwang28 <84491488@qq.com>
Date: Mon, 30 Sep 2024 14:26:55 +0800
Subject: [PATCH 4/5] persist hummock now

---
 src/meta/model_v2/src/hummock_sequence.rs     |  2 +
 src/meta/service/src/hummock_service.rs       |  3 +-
 .../src/hummock/manager/compaction/mod.rs     |  1 +
 src/meta/src/hummock/manager/context.rs       | 18 +++---
 src/meta/src/hummock/manager/gc.rs            | 55 ++++++++++++++++---
 src/meta/src/hummock/manager/mod.rs           |  5 +-
 src/meta/src/hummock/manager/timer_task.rs    |  1 +
 7 files changed, 67 insertions(+), 18 deletions(-)

diff --git a/src/meta/model_v2/src/hummock_sequence.rs b/src/meta/model_v2/src/hummock_sequence.rs
index 58156c33266f7..04f8d6dc5540c 100644
--- a/src/meta/model_v2/src/hummock_sequence.rs
+++ b/src/meta/model_v2/src/hummock_sequence.rs
@@ -18,6 +18,8 @@ pub const COMPACTION_TASK_ID: &str = "compaction_task";
 pub const COMPACTION_GROUP_ID: &str = "compaction_group";
 pub const SSTABLE_OBJECT_ID: &str = "sstable_object";
 pub const META_BACKUP_ID: &str = "meta_backup";
+/// The read & write of now is different from other sequences. It merely reuses the hummock_sequence table.
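+/// It stores the last timestamp handed out by `HummockManager::now` rather than
+/// an allocated ID range, so that "now" stays non-decreasing across meta reboots.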
+pub const HUMMOCK_NOW: &str = "now";
 
 #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default, Serialize, Deserialize)]
 #[sea_orm(table_name = "hummock_sequence")]
diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs
index 17b87e3a6b376..eed105510aa22 100644
--- a/src/meta/service/src/hummock_service.rs
+++ b/src/meta/service/src/hummock_service.rs
@@ -298,7 +298,8 @@ impl HummockManagerService for HummockServiceImpl {
     ) -> Result<Response<TriggerFullGcResponse>, Status> {
         let req = request.into_inner();
         self.hummock_manager
-            .start_full_gc(Duration::from_secs(req.sst_retention_time_sec), req.prefix)?;
+            .start_full_gc(Duration::from_secs(req.sst_retention_time_sec), req.prefix)
+            .await?;
         Ok(Response::new(TriggerFullGcResponse { status: None }))
     }
 
diff --git a/src/meta/src/hummock/manager/compaction/mod.rs b/src/meta/src/hummock/manager/compaction/mod.rs
index be430dd174994..66122ca54232f 100644
--- a/src/meta/src/hummock/manager/compaction/mod.rs
+++ b/src/meta/src/hummock/manager/compaction/mod.rs
@@ -1129,6 +1129,7 @@ impl HummockManager {
     /// or the task is not owned by `context_id` when `context_id` is not None.
     pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
+        // TODO: add sanity check for serverless compaction
         let mut guard = self.compaction.write().await;
         let deterministic_mode = self.env.opts.compaction_deterministic_test;
         let compaction: &mut Compaction = &mut guard;
diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs
index 164c112cadff0..649e4e412589b 100644
--- a/src/meta/src/hummock/manager/context.rs
+++ b/src/meta/src/hummock/manager/context.rs
@@ -241,14 +241,16 @@ impl HummockManager {
             }
         }
 
-        // TODO: add sanity check for serverless compaction
-
-        // sanity check to ensure SSTs to commit have not been full GCed yet.
-        let now = self.now();
-        let sst_retention_watermark = now.saturating_sub(self.env.opts.min_sst_retention_time_sec);
-        for sst in sstables {
-            if sst.created_at < sst_retention_watermark {
-                return Err(anyhow::anyhow!("SST {} is rejected from being committed since it's below watermark: SST timestamp {}, meta node timestamp {}, retention_sec {}, watermark {}", sst.sst_info.sst_id, sst.created_at, now, self.env.opts.min_sst_retention_time_sec, sst_retention_watermark).into());
+        // HummockManager::now requires a write to the meta store. Thus, it should be avoided whenever feasible.
+        if !sstables.is_empty() {
+            // sanity check to ensure SSTs to commit have not been full GCed yet.
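+            // Unlike the earlier in-memory version, now() is fallible here: it
+            // persists the timestamp and returns an error on a decreasing clock.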
+            let now = self.now().await?;
+            let sst_retention_watermark =
+                now.saturating_sub(self.env.opts.min_sst_retention_time_sec);
+            for sst in sstables {
+                if sst.created_at < sst_retention_watermark {
+                    return Err(anyhow::anyhow!("SST {} is rejected from being committed since it's below watermark: SST timestamp {}, meta node timestamp {}, retention_sec {}, watermark {}", sst.sst_info.sst_id, sst.created_at, now, self.env.opts.min_sst_retention_time_sec, sst_retention_watermark).into());
+                }
             }
         }
 
diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs
index 94a969e963f70..119a7706ad8bd 100644
--- a/src/meta/src/hummock/manager/gc.rs
+++ b/src/meta/src/hummock/manager/gc.rs
@@ -23,10 +23,14 @@ use futures::{stream, StreamExt};
 use itertools::Itertools;
 use parking_lot::Mutex;
 use risingwave_hummock_sdk::HummockSstableObjectId;
+use risingwave_meta_model_migration::OnConflict;
+use risingwave_meta_model_v2::hummock_sequence;
+use risingwave_meta_model_v2::hummock_sequence::HUMMOCK_NOW;
 use risingwave_pb::common::worker_node::State::Running;
 use risingwave_pb::common::WorkerType;
 use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
 use risingwave_pb::hummock::FullScanTask;
+use sea_orm::{ActiveValue, EntityTrait};
 
 use crate::hummock::error::{Error, Result};
 use crate::hummock::manager::commit_multi_var;
@@ -177,7 +181,7 @@ impl HummockManager {
     /// 3. Meta node decides which SSTs to delete. See `HummockManager::complete_full_gc`.
     ///
     /// Returns Ok(false) if there is no worker available.
-    pub fn start_full_gc(
+    pub async fn start_full_gc(
         &self,
         sst_retention_time: Duration,
         prefix: Option<String>,
@@ -205,7 +209,10 @@ impl HummockManager {
             }
             Some(compactor) => compactor,
         };
-        let sst_retention_watermark = self.now().saturating_sub(sst_retention_time.as_secs());
+        let sst_retention_watermark = self
+            .now()
+            .await?
+            .saturating_sub(sst_retention_time.as_secs());
         compactor
             .send_event(ResponseEvent::FullScanTask(FullScanTask {
                 sst_retention_watermark,
@@ -264,19 +271,48 @@ impl HummockManager {
         Ok(selected_object_number)
     }
 
-    pub fn now(&self) -> u64 {
-        // TODO: persist now to maintain non-decreasing even after a meta node reboot.
-        let mut guard = self.now.lock();
+    pub async fn now(&self) -> Result<u64> {
+        let mut guard = self.now.lock().await;
         let new_now = SystemTime::now()
             .duration_since(SystemTime::UNIX_EPOCH)
             .expect("Clock may have gone backwards")
             .as_secs();
         if new_now < *guard {
-            tracing::warn!(old = *guard, new = new_now, "unexpected decreasing now");
-            return *guard;
+            return Err(anyhow::anyhow!(format!(
+                "unexpected decreasing now, old={}, new={}",
+                *guard, new_now
+            ))
+            .into());
         }
         *guard = new_now;
-        new_now
+        drop(guard);
+        // Persist now to maintain non-decreasing even after a meta node reboot.
+        if let Some(sql) = self.sql_store() {
+            let m = hummock_sequence::ActiveModel {
+                name: ActiveValue::Set(HUMMOCK_NOW.into()),
+                seq: ActiveValue::Set(new_now.try_into().unwrap()),
+            };
+            hummock_sequence::Entity::insert(m)
+                .on_conflict(
+                    OnConflict::column(hummock_sequence::Column::Name)
+                        .update_column(hummock_sequence::Column::Seq)
+                        .to_owned(),
+                )
+                .exec(&sql.conn)
+                .await?;
+        }
+        Ok(new_now)
+    }
+
+    pub(crate) async fn load_now(&self) -> Result<Option<u64>> {
+        let Some(sql) = self.sql_store() else {
+            return Ok(None);
+        };
+        let now = hummock_sequence::Entity::find_by_id(HUMMOCK_NOW.to_string())
+            .one(&sql.conn)
+            .await?
+            .map(|m| m.seq.try_into().unwrap());
+        Ok(now)
+    }
 }
 
@@ -412,6 +448,7 @@ mod tests {
             Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec - 1,),
             None
         )
+        .await
         .unwrap());
 
         let mut receiver = compactor_manager.add_compactor(context_id);
@@ -421,6 +458,7 @@ mod tests {
             Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec - 1),
             None
         )
+        .await
         .unwrap());
         let _full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
             ResponseEvent::FullScanTask(task) => task,
@@ -434,6 +472,7 @@ mod tests {
             Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec + 1),
             None
         )
+        .await
         .unwrap());
         let _full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() {
             ResponseEvent::FullScanTask(task) => task,
diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs
index cd15e655dbd64..8c8f7e115c095 100644
--- a/src/meta/src/hummock/manager/mod.rs
+++ b/src/meta/src/hummock/manager/mod.rs
@@ -20,7 +20,6 @@ use std::sync::Arc;
 use arc_swap::ArcSwap;
 use bytes::Bytes;
 use itertools::Itertools;
-use parking_lot::Mutex;
 use risingwave_common::monitor::MonitoredRwLock;
 use risingwave_common::system_param::reader::SystemParamsRead;
 use risingwave_common::util::epoch::INVALID_EPOCH;
@@ -39,6 +38,7 @@ use risingwave_pb::hummock::{
 };
 use risingwave_pb::meta::subscribe_response::Operation;
 use tokio::sync::mpsc::UnboundedSender;
+use tokio::sync::Mutex;
 use tonic::Streaming;
 
 use crate::hummock::compaction::CompactStatus;
@@ -307,6 +307,9 @@ impl HummockManager {
 
     /// Load state from meta store.
     async fn load_meta_store_state(&self) -> Result<()> {
+        let now = self.load_now().await?;
+        *self.now.lock().await = now.unwrap_or(0);
+
         let mut compaction_guard = self.compaction.write().await;
         let mut versioning_guard = self.versioning.write().await;
         let mut context_info_guard = self.context_info.write().await;
diff --git a/src/meta/src/hummock/manager/timer_task.rs b/src/meta/src/hummock/manager/timer_task.rs
index eb0b2655d0041..0cefe157d8493 100644
--- a/src/meta/src/hummock/manager/timer_task.rs
+++ b/src/meta/src/hummock/manager/timer_task.rs
@@ -344,6 +344,7 @@ impl HummockManager {
                         hummock_manager.env.opts.min_sst_retention_time_sec;
                     if hummock_manager
                         .start_full_gc(Duration::from_secs(retention_sec), None)
+                        .await
                         .is_ok()
                     {
                         tracing::info!("Start full GC from meta node.");

From d2a48cb5bbe2d670a1d13533a57c6c958118bfaf Mon Sep 17 00:00:00 2001
From: zwang28 <84491488@qq.com>
Date: Mon, 30 Sep 2024 15:50:18 +0800
Subject: [PATCH 5/5] fix tests

---
 src/meta/src/hummock/manager/tests.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs
index bf5fe0cd0f96f..82b5dda90dd69 100644
--- a/src/meta/src/hummock/manager/tests.rs
+++ b/src/meta/src/hummock/manager/tests.rs
@@ -1806,11 +1806,13 @@ async fn test_move_state_tables_to_dedicated_compaction_group_on_demand_bottom_l
     let sst_2 = LocalSstableInfo {
         sst_info: gen_sstable_info(11, vec![101, 102], epoch),
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
 
     let sst_3 = LocalSstableInfo {
         sst_info: gen_sstable_info(12, vec![103], epoch),
         table_stats: Default::default(),
+        created_at: u64::MAX,
     };
 
     hummock_meta_client