diff --git a/proto/hummock.proto b/proto/hummock.proto index 947904df00868..e19faee10c43e 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -571,6 +571,7 @@ message VacuumTask { // Scan object store to get candidate orphan SSTs. message FullScanTask { uint64 sst_retention_time_sec = 1; + optional string prefix = 2; } // Cancel compact task @@ -614,6 +615,7 @@ message ReportFullScanTaskResponse { message TriggerFullGCRequest { uint64 sst_retention_time_sec = 1; + optional string prefix = 2; } message TriggerFullGCResponse { diff --git a/src/ctl/src/cmd_impl/hummock/sst_dump.rs b/src/ctl/src/cmd_impl/hummock/sst_dump.rs index 3a71fbd007214..4d75912e0d4d5 100644 --- a/src/ctl/src/cmd_impl/hummock/sst_dump.rs +++ b/src/ctl/src/cmd_impl/hummock/sst_dump.rs @@ -137,7 +137,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result .await?; } else { let mut metadata_iter = sstable_store - .list_object_metadata_from_object_store() + .list_object_metadata_from_object_store(None) .await?; while let Some(obj) = metadata_iter.try_next().await? { print_object(&obj); diff --git a/src/ctl/src/cmd_impl/hummock/trigger_full_gc.rs b/src/ctl/src/cmd_impl/hummock/trigger_full_gc.rs index f5e90eba48d88..bded39fa2adb8 100644 --- a/src/ctl/src/cmd_impl/hummock/trigger_full_gc.rs +++ b/src/ctl/src/cmd_impl/hummock/trigger_full_gc.rs @@ -19,9 +19,12 @@ use crate::CtlContext; pub async fn trigger_full_gc( context: &CtlContext, sst_retention_time_sec: u64, + prefix: Option, ) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; - let result = meta_client.trigger_full_gc(sst_retention_time_sec).await; + let result = meta_client + .trigger_full_gc(sst_retention_time_sec, prefix) + .await; println!("{:#?}", result); Ok(()) } diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index bbd25a8612a23..efbd681f8cae5 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -210,14 +210,16 @@ enum HummockCommands { #[clap(short, long = "level", default_value_t = 1)] level: u32, - #[clap(short, long = "sst-ids")] + #[clap(short, long = "sst-ids", value_delimiter = ',')] sst_ids: Vec, }, - /// trigger a full GC for SSTs that is not in version and with timestamp <= now - - /// `sst_retention_time_sec`. + /// Trigger a full GC for SSTs that is not pinned, with timestamp <= now - + /// `sst_retention_time_sec`, and with `prefix` in path. TriggerFullGc { #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)] sst_retention_time_sec: u64, + #[clap(short, long = "prefix", required = false)] + prefix: Option, }, /// List pinned versions of each worker. ListPinnedVersions {}, @@ -227,7 +229,7 @@ enum HummockCommands { ListCompactionGroup, /// Update compaction config for compaction groups. UpdateCompactionConfig { - #[clap(long)] + #[clap(long, value_delimiter = ',')] compaction_group_ids: Vec, #[clap(long)] max_bytes_for_level_base: Option, @@ -270,7 +272,7 @@ enum HummockCommands { SplitCompactionGroup { #[clap(long)] compaction_group_id: u64, - #[clap(long)] + #[clap(long, value_delimiter = ',')] table_ids: Vec, }, /// Pause version checkpoint, which subsequently pauses GC of delta log and SST object. @@ -447,7 +449,10 @@ enum MetaCommands { opts: RestoreOpts, }, /// delete meta snapshots - DeleteMetaSnapshots { snapshot_ids: Vec }, + DeleteMetaSnapshots { + #[clap(long, value_delimiter = ',')] + snapshot_ids: Vec, + }, /// List all existing connections in the catalog ListConnections, @@ -626,7 +631,8 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { } Commands::Hummock(HummockCommands::TriggerFullGc { sst_retention_time_sec, - }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec).await?, + prefix, + }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?, Commands::Hummock(HummockCommands::ListPinnedVersions {}) => { list_pinned_versions(context).await? } diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index ab69d6da4ea2e..3e5a26d2a7771 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -320,9 +320,9 @@ impl HummockManagerService for HummockServiceImpl { &self, request: Request, ) -> Result, Status> { - self.hummock_manager.start_full_gc(Duration::from_secs( - request.into_inner().sst_retention_time_sec, - ))?; + let req = request.into_inner(); + self.hummock_manager + .start_full_gc(Duration::from_secs(req.sst_retention_time_sec), req.prefix)?; Ok(Response::new(TriggerFullGcResponse { status: None })) } diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 27c3bfc0ade71..f1e4a4ba426da 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -177,7 +177,11 @@ impl HummockManager { /// 3. Meta node decides which SSTs to delete. See `HummockManager::complete_full_gc`. /// /// Returns Ok(false) if there is no worker available. - pub fn start_full_gc(&self, sst_retention_time: Duration) -> Result { + pub fn start_full_gc( + &self, + sst_retention_time: Duration, + prefix: Option, + ) -> Result { self.metrics.full_gc_trigger_count.inc(); // Set a minimum sst_retention_time to avoid deleting SSTs of on-going write op. let sst_retention_time = cmp::max( @@ -185,8 +189,9 @@ impl HummockManager { Duration::from_secs(self.env.opts.min_sst_retention_time_sec), ); tracing::info!( - "run full GC with sst_retention_time = {} secs", - sst_retention_time.as_secs() + retention_sec = sst_retention_time.as_secs(), + prefix = prefix.as_ref().unwrap_or(&String::from("")), + "run full GC" ); let compactor = match self.compactor_manager.next_compactor() { None => { @@ -198,6 +203,7 @@ impl HummockManager { compactor .send_event(ResponseEvent::FullScanTask(FullScanTask { sst_retention_time_sec: sst_retention_time.as_secs(), + prefix, })) .map_err(|_| Error::CompactorUnreachable(compactor.context_id()))?; Ok(true) @@ -344,17 +350,19 @@ mod tests { // No task scheduled because no available worker. assert!(!hummock_manager - .start_full_gc(Duration::from_secs( - hummock_manager.env.opts.min_sst_retention_time_sec - 1 - )) + .start_full_gc( + Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec - 1,), + None + ) .unwrap()); let mut receiver = compactor_manager.add_compactor(context_id); assert!(hummock_manager - .start_full_gc(Duration::from_secs( - hummock_manager.env.opts.min_sst_retention_time_sec - 1 - )) + .start_full_gc( + Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec - 1), + None + ) .unwrap()); let full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() { ResponseEvent::FullScanTask(task) => task, @@ -369,9 +377,10 @@ mod tests { ); assert!(hummock_manager - .start_full_gc(Duration::from_secs( - hummock_manager.env.opts.min_sst_retention_time_sec + 1 - )) + .start_full_gc( + Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec + 1), + None + ) .unwrap()); let full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() { ResponseEvent::FullScanTask(task) => task, diff --git a/src/meta/src/hummock/manager/timer_task.rs b/src/meta/src/hummock/manager/timer_task.rs index bb4a9fa86b06c..b1884edb417a1 100644 --- a/src/meta/src/hummock/manager/timer_task.rs +++ b/src/meta/src/hummock/manager/timer_task.rs @@ -333,7 +333,7 @@ impl HummockManager { HummockTimerEvent::FullGc => { if hummock_manager - .start_full_gc(Duration::from_secs(3600)) + .start_full_gc(Duration::from_secs(3600), None) .is_ok() { tracing::info!("Start full GC from meta node."); diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index f61049980af4f..95e5ad9a368cb 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -224,7 +224,11 @@ impl HummockMetaClient for MockHummockMetaClient { unimplemented!() } - async fn trigger_full_gc(&self, _sst_retention_time_sec: u64) -> Result<()> { + async fn trigger_full_gc( + &self, + _sst_retention_time_sec: u64, + _prefix: Option, + ) -> Result<()> { unimplemented!() } diff --git a/src/rpc_client/src/hummock_meta_client.rs b/src/rpc_client/src/hummock_meta_client.rs index e037114d843c9..5c25a59afa7f8 100644 --- a/src/rpc_client/src/hummock_meta_client.rs +++ b/src/rpc_client/src/hummock_meta_client.rs @@ -54,7 +54,11 @@ pub trait HummockMetaClient: Send + Sync + 'static { total_object_count: u64, total_object_size: u64, ) -> Result<()>; - async fn trigger_full_gc(&self, sst_retention_time_sec: u64) -> Result<()>; + async fn trigger_full_gc( + &self, + sst_retention_time_sec: u64, + prefix: Option, + ) -> Result<()>; async fn subscribe_compaction_event( &self, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 130f5ef694ab8..7982c7b209991 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1506,10 +1506,15 @@ impl HummockMetaClient for MetaClient { Ok(()) } - async fn trigger_full_gc(&self, sst_retention_time_sec: u64) -> Result<()> { + async fn trigger_full_gc( + &self, + sst_retention_time_sec: u64, + prefix: Option, + ) -> Result<()> { self.inner .trigger_full_gc(TriggerFullGcRequest { sst_retention_time_sec, + prefix, }) .await?; Ok(()) diff --git a/src/storage/hummock_test/src/vacuum_tests.rs b/src/storage/hummock_test/src/vacuum_tests.rs index 5c6dbdc556d5a..5c6dbd20587cc 100644 --- a/src/storage/hummock_test/src/vacuum_tests.rs +++ b/src/storage/hummock_test/src/vacuum_tests.rs @@ -92,6 +92,7 @@ async fn test_full_scan() { let task = FullScanTask { sst_retention_time_sec: 10000, + prefix: None, }; let (scan_result, _, _) = Vacuum::full_scan_inner(task, object_metadata_iter.clone()) .await @@ -100,6 +101,7 @@ async fn test_full_scan() { let task = FullScanTask { sst_retention_time_sec: 6000, + prefix: None, }; let (scan_result, _, _) = Vacuum::full_scan_inner(task, object_metadata_iter.clone()) .await @@ -108,6 +110,7 @@ async fn test_full_scan() { let task = FullScanTask { sst_retention_time_sec: 2000, + prefix: None, }; let (scan_result, _, _) = Vacuum::full_scan_inner(task, object_metadata_iter) .await diff --git a/src/storage/src/hummock/hummock_meta_client.rs b/src/storage/src/hummock/hummock_meta_client.rs index f30240b1264ce..9663a7787c474 100644 --- a/src/storage/src/hummock/hummock_meta_client.rs +++ b/src/storage/src/hummock/hummock_meta_client.rs @@ -111,9 +111,13 @@ impl HummockMetaClient for MonitoredHummockMetaClient { .await } - async fn trigger_full_gc(&self, sst_retention_time_sec: u64) -> Result<()> { + async fn trigger_full_gc( + &self, + sst_retention_time_sec: u64, + prefix: Option, + ) -> Result<()> { self.meta_client - .trigger_full_gc(sst_retention_time_sec) + .trigger_full_gc(sst_retention_time_sec, prefix) .await } diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index c654e12bb7d69..8843ddff7719a 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -602,8 +602,10 @@ impl SstableStore { pub async fn list_object_metadata_from_object_store( &self, + prefix: Option, ) -> HummockResult { - let raw_iter = self.store.list(&format!("{}/", self.path)).await?; + let list_path = format!("{}/{}", self.path, prefix.unwrap_or("".into())); + let raw_iter = self.store.list(&list_path).await?; let iter = raw_iter.filter(|r| match r { Ok(i) => future::ready(i.key.ends_with(&format!(".{}", OBJECT_SUFFIX))), Err(_) => future::ready(true), diff --git a/src/storage/src/hummock/vacuum.rs b/src/storage/src/hummock/vacuum.rs index 5242a6eae0784..fb4c9d782215b 100644 --- a/src/storage/src/hummock/vacuum.rs +++ b/src/storage/src/hummock/vacuum.rs @@ -99,11 +99,12 @@ impl Vacuum { sstable_store: SstableStoreRef, ) -> HummockResult<(Vec, u64, u64)> { tracing::info!( - "Try to full scan SSTs with timestamp {}", - full_scan_task.sst_retention_time_sec + timestamp = full_scan_task.sst_retention_time_sec, + prefix = full_scan_task.prefix.as_ref().unwrap_or(&String::from("")), + "Try to full scan SSTs" ); let metadata_iter = sstable_store - .list_object_metadata_from_object_store() + .list_object_metadata_from_object_store(full_scan_task.prefix.clone()) .await?; Vacuum::full_scan_inner(full_scan_task, metadata_iter).await }