diff --git a/proto/hummock.proto b/proto/hummock.proto index 0fab515e546e6..f52854b727045 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -306,6 +306,7 @@ message CompactTask { TRACK_SST_OBJECT_ID_FAILED = 12; NO_AVAIL_CPU_RESOURCE_CANCELED = 13; HEARTBEAT_PROGRESS_CANCELED = 14; + RETENTION_TIME_REJECTED = 17; // for serverless compaction SERVERLESS_SEND_FAIL_CANCELED = 15; @@ -460,6 +461,7 @@ message SubscribeCompactionEventRequest { uint64 task_id = 4; CompactTask.TaskStatus task_status = 5; repeated SstableInfo sorted_output_ssts = 6; + map object_timestamps = 7; } // HeartBeat provides the progress status of all tasks on the Compactor. @@ -507,6 +509,7 @@ message ReportCompactionTaskRequest { message ReportTask { CompactTask compact_task = 2; map table_stats_change = 3; + map object_timestamps = 4; } // HeartBeat provides the progress status of all tasks on the Compactor. message HeartBeat { diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 463de3f6febe4..5d2d49196cd35 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -1668,13 +1668,13 @@ fn collect_resp_info( let mut old_value_ssts = Vec::with_capacity(resps.len()); for resp in resps { - let ssts_iter = resp.synced_sstables.into_iter().map(|grouped| { - let sst_info = grouped.sst.expect("field not None"); + let ssts_iter = resp.synced_sstables.into_iter().map(|local_sst| { + let sst_info = local_sst.sst.expect("field not None"); sst_to_worker.insert(sst_info.object_id, resp.worker_id); LocalSstableInfo::new( sst_info.into(), - from_prost_table_stats_map(grouped.table_stats_map), - grouped.created_at, + from_prost_table_stats_map(local_sst.table_stats_map), + local_sst.created_at, ) }); synced_ssts.extend(ssts_iter); diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs index 9eae55e622b0f..bb2a92dfd24c5 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs @@ -245,6 +245,7 @@ impl HummockManager { task_status: TaskStatus::ManualCanceled, table_stats_change: HashMap::default(), sorted_output_ssts: vec![], + object_timestamps: HashMap::default(), }); } }); @@ -556,6 +557,7 @@ impl HummockManager { task_status: TaskStatus::ManualCanceled, table_stats_change: HashMap::default(), sorted_output_ssts: vec![], + object_timestamps: HashMap::default(), }); } } diff --git a/src/meta/src/hummock/manager/compaction/mod.rs b/src/meta/src/hummock/manager/compaction/mod.rs index 98e4216df0327..f94c01efbd7ae 100644 --- a/src/meta/src/hummock/manager/compaction/mod.rs +++ b/src/meta/src/hummock/manager/compaction/mod.rs @@ -51,7 +51,7 @@ use risingwave_hummock_sdk::table_stats::{ use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, IntraLevelDelta}; use risingwave_hummock_sdk::{ compact_task_to_string, statistics_compact_task, CompactionGroupId, HummockCompactionTaskId, - HummockVersionId, + HummockSstableObjectId, HummockVersionId, }; use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType}; use risingwave_pb::hummock::subscribe_compaction_event_request::{ @@ -994,6 +994,7 @@ impl HummockManager { task_status, sorted_output_ssts: vec![], table_stats_change: HashMap::default(), + object_timestamps: HashMap::default(), }) .collect_vec(); let rets = self.report_compact_tasks(tasks).await?; @@ -1108,6 +1109,7 @@ impl HummockManager { task_status: TaskStatus, sorted_output_ssts: Vec, table_stats_change: Option, + object_timestamps: HashMap, ) -> Result { let rets = self .report_compact_tasks(vec![ReportTask { @@ -1115,6 +1117,7 @@ impl HummockManager { task_status, sorted_output_ssts, table_stats_change: table_stats_change.unwrap_or_default(), + object_timestamps, }]) .await?; Ok(rets[0]) @@ -1129,7 +1132,6 @@ impl HummockManager { /// or the task is not owned by `context_id` when `context_id` is not None. pub async fn report_compact_tasks(&self, report_tasks: Vec) -> Result> { - // TODO: add sanity check for serverless compaction let mut guard = self.compaction.write().await; let deterministic_mode = self.env.opts.compaction_deterministic_test; let compaction: &mut Compaction = &mut guard; @@ -1205,10 +1207,19 @@ impl HummockManager { .map(|level| level.level_idx) .collect(); let is_success = if let TaskStatus::Success = compact_task.task_status { - // if member_table_ids changes, the data of sstable may stale. - let is_expired = - Self::is_compact_task_expired(&compact_task, version.latest_version()); - if is_expired { + if let Err(e) = self + .report_compaction_sanity_check(&task.object_timestamps) + .await + { + warn!( + "failed to commit compaction task {} {}", + compact_task.task_id, + e.as_report() + ); + compact_task.task_status = TaskStatus::RetentionTimeRejected; + false + } else if Self::is_compact_task_expired(&compact_task, version.latest_version()) { + // if member_table_ids changes, the data of sstable may stale. compact_task.task_status = TaskStatus::InputOutdatedCanceled; false } else { @@ -1567,6 +1578,7 @@ impl HummockManager { task_status, sorted_output_ssts, table_stats_change: table_stats_change.unwrap_or_default(), + object_timestamps: HashMap::default(), }]) .await?; Ok(()) diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index b42dd9e54d0ed..3ceb9d367f7df 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -233,13 +233,13 @@ impl HummockManager { if !sstables.is_empty() { // sanity check to ensure SSTs to commit have not been full GCed yet. let now = self.now().await?; - let sst_retention_watermark = - now.saturating_sub(self.env.opts.min_sst_retention_time_sec); - for sst in sstables { - if sst.created_at < sst_retention_watermark { - return Err(anyhow::anyhow!("SST {} is rejected from being committed since it's below watermark: SST timestamp {}, meta node timestamp {}, retention_sec {}, watermark {}", sst.sst_info.sst_id, sst.created_at, now, self.env.opts.min_sst_retention_time_sec, sst_retention_watermark).into()); - } - } + check_sst_retention( + now, + self.env.opts.min_sst_retention_time_sec, + sstables + .iter() + .map(|s| (s.sst_info.object_id, s.created_at)), + )?; } async { @@ -278,6 +278,36 @@ impl HummockManager { pub async fn release_meta_context(&self) -> Result<()> { self.release_contexts([META_NODE_ID]).await } + + pub(crate) async fn report_compaction_sanity_check( + &self, + object_timestamps: &HashMap, + ) -> Result<()> { + // HummockManager::now requires a write to the meta store. Thus, it should be avoided whenever feasible. + if object_timestamps.is_empty() { + return Ok(()); + } + let now = self.now().await?; + check_sst_retention( + now, + self.env.opts.min_sst_retention_time_sec, + object_timestamps.iter().map(|(k, v)| (*k, *v)), + ) + } +} + +fn check_sst_retention( + now: u64, + retention_sec: u64, + sst_infos: impl Iterator, +) -> Result<()> { + let sst_retention_watermark = now.saturating_sub(retention_sec); + for (object_id, created_at) in sst_infos { + if created_at < sst_retention_watermark { + return Err(anyhow::anyhow!("object {object_id} is rejected from being committed since it's below watermark: object timestamp {created_at}, meta node timestamp {now}, retention_sec {retention_sec}, watermark {sst_retention_watermark}").into()); + } + } + Ok(()) } // pin and unpin method diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 74668e67a27c2..97cd6f957515a 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -210,7 +210,13 @@ async fn test_hummock_compaction_task() { // Finish the task and succeed. assert!(hummock_manager - .report_compact_task(compact_task.task_id, TaskStatus::Success, vec![], None) + .report_compact_task( + compact_task.task_id, + TaskStatus::Success, + vec![], + None, + HashMap::default() + ) .await .unwrap()); } @@ -1383,6 +1389,7 @@ async fn test_version_stats() { TaskStatus::Success, vec![], Some(to_prost_table_stats_map(compact_table_stats_change)), + HashMap::default(), ) .await .unwrap(); @@ -1770,12 +1777,19 @@ async fn test_move_state_tables_to_dedicated_compaction_group_trivial_expired() ..Default::default() }], None, + HashMap::default(), ) .await .unwrap(); assert!(ret); let ret = hummock_manager - .report_compact_task(task.task_id, TaskStatus::Success, vec![], None) + .report_compact_task( + task.task_id, + TaskStatus::Success, + vec![], + None, + HashMap::default(), + ) .await .unwrap(); // the task has been canceled @@ -1845,7 +1859,6 @@ async fn test_move_state_tables_to_dedicated_compaction_group_on_demand_bottom_l let compaction_group_id = get_compaction_group_id_by_table_id(hummock_manager.clone(), 100).await; - let manual_compcation_option = ManualCompactionOption { level: 0, ..Default::default() @@ -1990,7 +2003,13 @@ async fn test_compaction_task_expiration_due_to_split_group() { let version_1 = hummock_manager.get_current_version().await; assert!(!hummock_manager - .report_compact_task(compaction_task.task_id, TaskStatus::Success, vec![], None) + .report_compact_task( + compaction_task.task_id, + TaskStatus::Success, + vec![], + None, + HashMap::default() + ) .await .unwrap()); let version_2 = hummock_manager.get_current_version().await; @@ -2005,7 +2024,13 @@ async fn test_compaction_task_expiration_due_to_split_group() { get_manual_compact_task(hummock_manager.clone(), compaction_group_id, 0).await; assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 2); hummock_manager - .report_compact_task(compaction_task.task_id, TaskStatus::Success, vec![], None) + .report_compact_task( + compaction_task.task_id, + TaskStatus::Success, + vec![], + None, + HashMap::default(), + ) .await .unwrap(); @@ -2254,7 +2279,13 @@ async fn test_partition_level() { sst.sst_size = sst.file_size; global_sst_id += 1; let ret = hummock_manager - .report_compact_task(task.task_id, TaskStatus::Success, vec![sst], None) + .report_compact_task( + task.task_id, + TaskStatus::Success, + vec![sst], + None, + HashMap::default(), + ) .await .unwrap(); assert!(ret); diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index ba54bc64969ad..dffbcc9bb3310 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -346,6 +346,7 @@ impl HummockMetaClient for MockHummockMetaClient { task_status, sorted_output_ssts, table_stats_change, + object_timestamps, }) = item.event.unwrap() { if let Err(e) = hummock_manager_compact @@ -357,6 +358,7 @@ impl HummockMetaClient for MockHummockMetaClient { .map(SstableInfo::from) .collect_vec(), Some(table_stats_change), + object_timestamps, ) .await { diff --git a/src/storage/hummock_sdk/src/compact_task.rs b/src/storage/hummock_sdk/src/compact_task.rs index ff76e3a70dde8..162895a38ac91 100644 --- a/src/storage/hummock_sdk/src/compact_task.rs +++ b/src/storage/hummock_sdk/src/compact_task.rs @@ -26,6 +26,7 @@ use crate::key_range::KeyRange; use crate::level::InputLevel; use crate::sstable_info::SstableInfo; use crate::table_watermark::TableWatermarks; +use crate::HummockSstableObjectId; #[derive(Clone, PartialEq, Default, Debug)] pub struct CompactTask { @@ -373,6 +374,7 @@ pub struct ReportTask { pub task_id: u64, pub task_status: TaskStatus, pub sorted_output_ssts: Vec, + pub object_timestamps: HashMap, } impl From for ReportTask { @@ -386,6 +388,7 @@ impl From for ReportTask { .into_iter() .map(SstableInfo::from) .collect_vec(), + object_timestamps: value.object_timestamps, } } } @@ -401,6 +404,7 @@ impl From for PbReportTask { .into_iter() .map(|sst| sst.into()) .collect_vec(), + object_timestamps: value.object_timestamps, } } } diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 402d902d27e8c..92ffd77851b70 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -282,7 +282,7 @@ pub(crate) mod tests { { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let ((result_task, task_stats), _) = compact( + let ((result_task, task_stats, object_timestamps), _) = compact( compact_ctx.clone(), compact_task.clone(), rx, @@ -297,6 +297,7 @@ pub(crate) mod tests { result_task.task_status, result_task.sorted_output_ssts, Some(to_prost_table_stats_map(task_stats)), + object_timestamps, ) .await .unwrap(); @@ -623,7 +624,7 @@ pub(crate) mod tests { // 4. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let ((result_task, task_stats), _) = compact( + let ((result_task, task_stats, object_timestamps), _) = compact( compact_ctx, compact_task.clone(), rx, @@ -637,6 +638,7 @@ pub(crate) mod tests { result_task.task_status, result_task.sorted_output_ssts, Some(to_prost_table_stats_map(task_stats)), + object_timestamps, ) .await .unwrap(); @@ -822,7 +824,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let ((result_task, task_stats), _) = compact( + let ((result_task, task_stats, object_timestamps), _) = compact( compact_ctx, compact_task.clone(), rx, @@ -837,6 +839,7 @@ pub(crate) mod tests { result_task.task_status, result_task.sorted_output_ssts, Some(to_prost_table_stats_map(task_stats)), + object_timestamps, ) .await .unwrap(); @@ -1015,7 +1018,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let ((result_task, task_stats), _) = compact( + let ((result_task, task_stats, object_timestamps), _) = compact( compact_ctx, compact_task.clone(), rx, @@ -1030,6 +1033,7 @@ pub(crate) mod tests { result_task.task_status, result_task.sorted_output_ssts, Some(to_prost_table_stats_map(task_stats)), + object_timestamps, ) .await .unwrap(); @@ -1189,7 +1193,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let ((result_task, task_stats), _) = compact( + let ((result_task, task_stats, object_timestamps), _) = compact( compact_ctx, compact_task.clone(), rx, @@ -1204,6 +1208,7 @@ pub(crate) mod tests { result_task.task_status, result_task.sorted_output_ssts, Some(to_prost_table_stats_map(task_stats)), + object_timestamps, ) .await .unwrap(); @@ -1999,7 +2004,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let ((result_task, task_stats), _) = compact( + let ((result_task, task_stats, object_timestamps), _) = compact( compact_ctx, compact_task.clone(), rx, @@ -2014,6 +2019,7 @@ pub(crate) mod tests { result_task.task_status, result_task.sorted_output_ssts, Some(to_prost_table_stats_map(task_stats)), + object_timestamps, ) .await .unwrap(); @@ -2223,7 +2229,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let ((result_task, task_stats), _) = compact( + let ((result_task, task_stats, object_timestamps), _) = compact( compact_ctx.clone(), compact_task.clone(), rx, @@ -2238,6 +2244,7 @@ pub(crate) mod tests { result_task.task_status, result_task.sorted_output_ssts, Some(to_prost_table_stats_map(task_stats)), + object_timestamps, ) .await .unwrap(); diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index f6ad680beba0c..fad988b430d4e 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -298,7 +298,11 @@ pub async fn compact( object_id_getter: Box, filter_key_extractor_manager: FilterKeyExtractorManager, ) -> ( - (CompactTask, HashMap), + ( + CompactTask, + HashMap, + HashMap, + ), Option, ) { let context = compactor_context.clone(); @@ -460,7 +464,7 @@ pub async fn compact( } // After a compaction is done, mutate the compaction task. - let (compact_task, table_stats) = + let (compact_task, table_stats, object_timestamps) = compact_done(compact_task, context.clone(), output_ssts, task_status); let cost_time = timer.stop_and_record() * 1000.0; tracing::info!( @@ -468,7 +472,10 @@ pub async fn compact( cost_time, compact_task_to_string(&compact_task) ); - return ((compact_task, table_stats), memory_detector); + return ( + (compact_task, table_stats, object_timestamps), + memory_detector, + ); } for (split_index, _) in compact_task.splits.iter().enumerate() { let filter = multi_filter.clone(); @@ -555,7 +562,7 @@ pub async fn compact( } // After a compaction is done, mutate the compaction task. - let (compact_task, table_stats) = + let (compact_task, table_stats, object_timestamps) = compact_done(compact_task, context.clone(), output_ssts, task_status); let cost_time = timer.stop_and_record() * 1000.0; tracing::info!( @@ -563,7 +570,10 @@ pub async fn compact( cost_time, compact_task_output_to_string(&compact_task) ); - ((compact_task, table_stats), memory_detector) + ( + (compact_task, table_stats, object_timestamps), + memory_detector, + ) } /// Fills in the compact task and tries to report the task result to meta node. @@ -572,8 +582,13 @@ pub(crate) fn compact_done( context: CompactorContext, output_ssts: Vec, task_status: TaskStatus, -) -> (CompactTask, HashMap) { +) -> ( + CompactTask, + HashMap, + HashMap, +) { let mut table_stats_map = TableStatsMap::default(); + let mut object_timestamps = HashMap::default(); compact_task.task_status = task_status; compact_task .sorted_output_ssts @@ -590,6 +605,7 @@ pub(crate) fn compact_done( add_table_stats_map(&mut table_stats_map, &delta_drop_stat); for sst_info in ssts { compaction_write_bytes += sst_info.file_size(); + object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at); compact_task.sorted_output_ssts.push(sst_info.sst_info); } } @@ -607,7 +623,7 @@ pub(crate) fn compact_done( .with_label_values(&[&group_label, level_label.as_str()]) .inc_by(compact_task.sorted_output_ssts.len() as u64); - (compact_task, table_stats_map) + (compact_task, table_stats_map, object_timestamps) } pub async fn compact_and_build_sst( diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index b535bd3945cdf..c49306d90cf79 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -53,7 +53,9 @@ use futures::{pin_mut, StreamExt}; pub use iterator::{ConcatSstableIterator, SstableStreamIterator}; use more_asserts::assert_ge; use risingwave_hummock_sdk::table_stats::{to_prost_table_stats_map, TableStatsMap}; -use risingwave_hummock_sdk::{compact_task_to_string, HummockCompactionTaskId, LocalSstableInfo}; +use risingwave_hummock_sdk::{ + compact_task_to_string, HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, +}; use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::subscribe_compaction_event_request::{ Event as RequestEvent, HeartBeat, PullTask, ReportTask, @@ -423,6 +425,7 @@ pub fn start_compactor( fn send_report_task_event( compact_task: &CompactTask, table_stats: TableStatsMap, + object_timestamps: HashMap, request_sender: &mpsc::UnboundedSender, ) { if let Err(e) = request_sender.send(SubscribeCompactionEventRequest { @@ -435,6 +438,7 @@ pub fn start_compactor( .map(|sst| sst.into()) .collect(), table_stats_change: to_prost_table_stats_map(table_stats), + object_timestamps, })), create_at: SystemTime::now() .duration_since(std::time::UNIX_EPOCH) @@ -487,16 +491,18 @@ pub fn start_compactor( max_task_parallelism, running_task_parallelism.load(Ordering::Relaxed), ); - let (compact_task, table_stats) = compact_done( - compact_task, - context.clone(), - vec![], - TaskStatus::NoAvailCpuResourceCanceled, - ); + let (compact_task, table_stats, object_timestamps) = + compact_done( + compact_task, + context.clone(), + vec![], + TaskStatus::NoAvailCpuResourceCanceled, + ); send_report_task_event( &compact_task, table_stats, + object_timestamps, &request_sender, ); @@ -509,7 +515,7 @@ pub fn start_compactor( let (tx, rx) = tokio::sync::oneshot::channel(); let task_id = compact_task.task_id; shutdown.lock().unwrap().insert(task_id, tx); - let ((compact_task, table_stats), _memory_tracker) = + let ((compact_task, table_stats, object_timestamps), _memory_tracker) = match sstable_object_id_manager .add_watermark_object_id(None) .await @@ -538,7 +544,7 @@ pub fn start_compactor( tracing::warn!(error = %err.as_report(), "Failed to track pending SST object id"); let mut compact_task = compact_task; compact_task.task_status = TaskStatus::TrackSstObjectIdFailed; - ((compact_task, HashMap::default()), None) + ((compact_task, HashMap::default(), HashMap::default()), None) } }; shutdown.lock().unwrap().remove(&task_id); @@ -547,6 +553,7 @@ pub fn start_compactor( send_report_task_event( &compact_task, table_stats, + object_timestamps, &request_sender, ); @@ -740,7 +747,7 @@ pub fn start_shared_compactor( let task_id = compact_task.task_id; shutdown.lock().unwrap().insert(task_id, tx); - let ((compact_task, table_stats), _memory_tracker)= compactor_runner::compact( + let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact( context.clone(), compact_task, rx, @@ -753,6 +760,7 @@ pub fn start_shared_compactor( event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask { compact_task: Some(PbCompactTask::from(&compact_task)), table_stats_change: to_prost_table_stats_map(table_stats), + object_timestamps, })), };