diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 73c07ccf8259..626ac32a8171 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -589,7 +589,7 @@ pub async fn start_service_as_election_leader( // sub_tasks executed concurrently. Can be shutdown via shutdown_all sub_tasks.extend(hummock::start_hummock_workers( hummock_manager.clone(), - // compaction_scheduler, + backup_manager.clone(), &env.opts, )); sub_tasks.push(start_worker_info_monitor( diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index 394933109844..9558866edd43 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -12,14 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::ops::Bound::{Excluded, Included}; use std::ops::{Deref, DerefMut}; use std::sync::atomic::Ordering; +use itertools::Itertools; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map; use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion}; -use risingwave_hummock_sdk::HummockVersionId; +use risingwave_hummock_sdk::{HummockSstableObjectId, HummockVersionId}; use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects}; use risingwave_pb::hummock::{ PbHummockVersion, PbHummockVersionArchive, PbHummockVersionCheckpoint, @@ -122,7 +123,10 @@ impl HummockManager { /// Returns the diff between new and old checkpoint id. /// Note that this method must not be called concurrently, because internally it doesn't hold /// lock throughout the method. - pub async fn create_version_checkpoint(&self, min_delta_log_num: u64) -> Result { + pub async fn create_version_checkpoint( + &self, + min_delta_log_num: u64, + ) -> Result<(u64, HashSet)> { let timer = self.metrics.version_checkpoint_latency.start_timer(); // 1. hold read lock and create new checkpoint let versioning_guard = self.versioning.read().await; @@ -132,7 +136,7 @@ impl HummockManager { let new_checkpoint_id = current_version.id; let old_checkpoint_id = old_checkpoint.version.id; if new_checkpoint_id < old_checkpoint_id + min_delta_log_num { - return Ok(0); + return Ok((0, HashSet::default())); } if cfg!(test) && new_checkpoint_id == old_checkpoint_id { drop(versioning_guard); @@ -140,7 +144,7 @@ impl HummockManager { let context_info = self.context_info.read().await; let min_pinned_version_id = context_info.min_pinned_version_id(); trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id); - return Ok(0); + return Ok((0, HashSet::default())); } assert!(new_checkpoint_id > old_checkpoint_id); let mut archive: Option = None; @@ -203,13 +207,21 @@ impl HummockManager { }) }) .sum::(); - stale_objects.insert( - current_version.id, - StaleObjects { + stale_objects + .entry(current_version.id) + .and_modify(|s| { + s.id = + s.id.iter() + .chain(removed_object_ids.iter()) + .unique() + .copied() + .collect(); + s.total_file_size += total_file_size; + }) + .or_insert(StaleObjects { id: removed_object_ids.into_iter().collect(), total_file_size, - }, - ); + }); if self.env.opts.enable_hummock_data_archive { archive = Some(PbHummockVersionArchive { version: Some(PbHummockVersion::from(&old_checkpoint.version)), @@ -220,8 +232,17 @@ impl HummockManager { .collect(), }); } - // We can directly discard reference to stale objects that will no longer be used. let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id(); + let may_delete_object = stale_objects + .iter() + .filter_map(|(version_id, object_ids)| { + if *version_id >= min_pinned_version_id { + return None; + } + Some(object_ids.id.clone()) + }) + .flatten() + .collect(); stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id); let new_checkpoint = HummockVersionCheckpoint { version: current_version.clone(), @@ -253,7 +274,7 @@ impl HummockManager { .checkpoint_version_id .set(new_checkpoint_id.to_u64() as i64); - Ok(new_checkpoint_id - old_checkpoint_id) + Ok((new_checkpoint_id - old_checkpoint_id, may_delete_object)) } pub fn pause_version_checkpoint(&self) { diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 12619cae1977..785a88def87e 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -506,6 +506,26 @@ impl HummockManager { } Ok(total) } + + /// Minor GC attempts to delete objects that were part of Hummock version but are no longer in use. + pub async fn start_minor_gc( + &self, + backup_manager: BackupManagerRef, + object_ids: impl Iterator, + ) -> Result<()> { + // Objects pinned by either meta backup or time travel should be filtered out. + let pinned_objects: HashSet<_> = backup_manager + .list_pinned_ssts() + .into_iter() + .chain(self.all_object_ids_in_time_travel().await?) + .collect(); + let object_ids = object_ids + .filter(|s| !pinned_objects.contains(s)) + .collect_vec(); + // Retry is not necessary. Full GC will handle these objects eventually. + self.delete_objects(object_ids).await?; + Ok(()) + } } async fn collect_min_uncommitted_sst_id( diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index d0eedbfdd8a7..0bd39c4b1f12 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -575,7 +575,11 @@ async fn test_hummock_manager_basic() { (0, 0) ); assert_eq!( - hummock_manager.create_version_checkpoint(1).await.unwrap(), + hummock_manager + .create_version_checkpoint(1) + .await + .unwrap() + .0, commit_log_count + register_log_count ); assert_eq!( @@ -1198,7 +1202,11 @@ async fn test_extend_objects_to_delete() { // Checkpoint assert_eq!( - hummock_manager.create_version_checkpoint(1).await.unwrap(), + hummock_manager + .create_version_checkpoint(1) + .await + .unwrap() + .0, 6 ); // since version1 is still pinned, the sst removed in compaction can not be reclaimed. @@ -2143,7 +2151,11 @@ async fn test_gc_stats() { }; assert_eq_gc_stats(0, 0, 0, 0, 0, 0); assert_eq!( - hummock_manager.create_version_checkpoint(0).await.unwrap(), + hummock_manager + .create_version_checkpoint(0) + .await + .unwrap() + .0, 0 ); @@ -2157,7 +2169,11 @@ async fn test_gc_stats() { .await; assert_eq_gc_stats(0, 0, 0, 0, 0, 0); assert_ne!( - hummock_manager.create_version_checkpoint(0).await.unwrap(), + hummock_manager + .create_version_checkpoint(0) + .await + .unwrap() + .0, 0 ); @@ -2169,7 +2185,11 @@ async fn test_gc_stats() { assert_eq_gc_stats(0, 0, 6, 3, 2, 4); assert_eq!( - hummock_manager.create_version_checkpoint(0).await.unwrap(), + hummock_manager + .create_version_checkpoint(0) + .await + .unwrap() + .0, 0 ); } diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index f9d2534fb8c6..b836ae316de6 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -32,6 +32,7 @@ use risingwave_meta_model::{ hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta, hummock_time_travel_version, }; +use risingwave_pb::hummock::hummock_version_checkpoint::PbStaleObjects; use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta}; use sea_orm::sea_query::OnConflict; use sea_orm::ActiveValue::Set; @@ -95,7 +96,7 @@ impl HummockManager { "delete {} rows from hummock_epoch_to_version", res.rows_affected ); - let earliest_valid_version = hummock_time_travel_version::Entity::find() + let latest_valid_version = hummock_time_travel_version::Entity::find() .filter( hummock_time_travel_version::Column::VersionId.lte(version_watermark.version_id), ) @@ -103,23 +104,29 @@ impl HummockManager { .one(&txn) .await? .map(|m| HummockVersion::from_persisted_protobuf(&m.version.to_protobuf())); - let Some(earliest_valid_version) = earliest_valid_version else { + let Some(latest_valid_version) = latest_valid_version else { txn.commit().await?; return Ok(()); }; - let (earliest_valid_version_id, earliest_valid_version_sst_ids) = { + let ( + latest_valid_version_id, + latest_valid_version_sst_ids, + latest_valid_version_object_ids, + ) = { ( - earliest_valid_version.id, - earliest_valid_version.get_sst_ids(), + latest_valid_version.id, + latest_valid_version.get_sst_ids(), + latest_valid_version.get_object_ids(), ) }; + let mut object_ids_to_delete: HashSet<_> = HashSet::default(); let version_ids_to_delete: Vec = hummock_time_travel_version::Entity::find() .select_only() .column(hummock_time_travel_version::Column::VersionId) .filter( hummock_time_travel_version::Column::VersionId - .lt(earliest_valid_version_id.to_u64()), + .lt(latest_valid_version_id.to_u64()), ) .order_by_desc(hummock_time_travel_version::Column::VersionId) .into_tuple() @@ -131,7 +138,7 @@ impl HummockManager { .column(hummock_time_travel_delta::Column::VersionId) .filter( hummock_time_travel_delta::Column::VersionId - .lt(earliest_valid_version_id.to_u64()), + .lt(latest_valid_version_id.to_u64()), ) .into_tuple() .all(&txn) @@ -146,25 +153,27 @@ impl HummockManager { delta_id_to_delete ))) })?; - let new_sst_ids = HummockVersionDelta::from_persisted_protobuf( + let delta_to_delete = HummockVersionDelta::from_persisted_protobuf( &delta_to_delete.version_delta.to_protobuf(), - ) - .newly_added_sst_ids(); + ); + let new_sst_ids = delta_to_delete.newly_added_sst_ids(); // The SST ids added and then deleted by compaction between the 2 versions. - let sst_ids_to_delete = &new_sst_ids - &earliest_valid_version_sst_ids; + let sst_ids_to_delete = &new_sst_ids - &latest_valid_version_sst_ids; let res = hummock_sstable_info::Entity::delete_many() .filter(hummock_sstable_info::Column::SstId.is_in(sst_ids_to_delete)) .exec(&txn) .await?; + let new_object_ids = delta_to_delete.newly_added_object_ids(); + object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids); tracing::debug!( - delta_id = delta_to_delete.version_id, + delta_id = delta_to_delete.id.to_u64(), "delete {} rows from hummock_sstable_info", res.rows_affected ); } - let mut next_version_sst_ids = earliest_valid_version_sst_ids; + let mut next_version_sst_ids = latest_valid_version_sst_ids; for prev_version_id in version_ids_to_delete { - let sst_ids = { + let prev_version = { let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id) .one(&txn) .await? @@ -175,14 +184,16 @@ impl HummockManager { ))) })?; HummockVersion::from_persisted_protobuf(&prev_version.version.to_protobuf()) - .get_sst_ids() }; + let sst_ids = prev_version.get_sst_ids(); // The SST ids deleted by compaction between the 2 versions. let sst_ids_to_delete = &sst_ids - &next_version_sst_ids; let res = hummock_sstable_info::Entity::delete_many() .filter(hummock_sstable_info::Column::SstId.is_in(sst_ids_to_delete)) .exec(&txn) .await?; + let new_object_ids = prev_version.get_object_ids(); + object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids); tracing::debug!( prev_version_id, "delete {} rows from hummock_sstable_info", @@ -190,30 +201,49 @@ impl HummockManager { ); next_version_sst_ids = sst_ids; } + { + let mut guard = self.versioning.write().await; + guard + .checkpoint + .stale_objects + .entry(latest_valid_version_id) + .and_modify(|s| { + s.id = + s.id.iter() + .chain(object_ids_to_delete.iter()) + .unique() + .copied() + .collect(); + }) + .or_insert(PbStaleObjects { + id: object_ids_to_delete.into_iter().collect(), + // To avoid unnecessary computations, disregard size in this context. + total_file_size: 0, + }); + } let res = hummock_time_travel_version::Entity::delete_many() .filter( - hummock_time_travel_version::Column::VersionId - .lt(earliest_valid_version_id.to_u64()), + hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id.to_u64()), ) .exec(&txn) .await?; tracing::debug!( epoch_watermark_version_id = ?version_watermark.version_id, - ?earliest_valid_version_id, + ?latest_valid_version_id, "delete {} rows from hummock_time_travel_version", res.rows_affected ); let res = hummock_time_travel_delta::Entity::delete_many() .filter( - hummock_time_travel_delta::Column::VersionId.lt(earliest_valid_version_id.to_u64()), + hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id.to_u64()), ) .exec(&txn) .await?; tracing::debug!( epoch_watermark_version_id = ?version_watermark.version_id, - ?earliest_valid_version_id, + ?latest_valid_version_id, "delete {} rows from hummock_time_travel_delta", res.rows_affected ); diff --git a/src/meta/src/hummock/mod.rs b/src/meta/src/hummock/mod.rs index 7b8af79ae960..162d46b4cae1 100644 --- a/src/meta/src/hummock/mod.rs +++ b/src/meta/src/hummock/mod.rs @@ -15,6 +15,9 @@ pub mod compaction; pub mod compactor_manager; pub mod error; mod manager; + +use std::collections::HashSet; + pub use manager::*; use thiserror_ext::AsReport; @@ -33,11 +36,13 @@ pub use mock_hummock_meta_client::MockHummockMetaClient; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; +use crate::backup_restore::BackupManagerRef; use crate::MetaOpts; /// Start hummock's asynchronous tasks. pub fn start_hummock_workers( hummock_manager: HummockManagerRef, + backup_manager: BackupManagerRef, meta_opts: &MetaOpts, ) -> Vec<(JoinHandle<()>, Sender<()>)> { // These critical tasks are put in their own timer loop deliberately, to avoid long-running ones @@ -45,6 +50,7 @@ pub fn start_hummock_workers( let workers = vec![ start_checkpoint_loop( hummock_manager.clone(), + backup_manager, Duration::from_secs(meta_opts.hummock_version_checkpoint_interval_sec), meta_opts.min_delta_log_num_for_hummock_version_checkpoint, ), @@ -85,6 +91,7 @@ pub fn start_vacuum_metadata_loop( pub fn start_checkpoint_loop( hummock_manager: HummockManagerRef, + backup_manager: BackupManagerRef, interval: Duration, min_delta_log_num: u64, ) -> (JoinHandle<()>, Sender<()>) { @@ -92,6 +99,7 @@ pub fn start_checkpoint_loop( let join_handle = tokio::spawn(async move { let mut min_trigger_interval = tokio::time::interval(interval); min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + let mut accumulated_may_delete_object_ids = HashSet::new(); loop { tokio::select! { // Wait for interval @@ -107,11 +115,28 @@ pub fn start_checkpoint_loop( { continue; } - if let Err(err) = hummock_manager + match hummock_manager .create_version_checkpoint(min_delta_log_num) .await { - tracing::warn!(error = %err.as_report(), "Hummock version checkpoint error"); + Ok((_, may_delete_objects)) => { + accumulated_may_delete_object_ids.extend(may_delete_objects); + const MIN_MINOR_GC_OBJECT_COUNT: usize = 1000; + if accumulated_may_delete_object_ids.len() >= MIN_MINOR_GC_OBJECT_COUNT { + let _ = hummock_manager + .start_minor_gc( + backup_manager.clone(), + accumulated_may_delete_object_ids.drain(), + ) + .await + .inspect_err(|err| { + tracing::warn!(error = %err.as_report(), "Hummock minor GC error."); + }); + }; + } + Err(err) => { + tracing::warn!(error = %err.as_report(), "Hummock version checkpoint error.") + } } } });