From 8f0d5decac74f806046b06f197e879664793cd20 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Fri, 27 Sep 2024 13:38:21 +0800 Subject: [PATCH] refactor(storage): support pagination for LIST during full GC (#18737) --- proto/hummock.proto | 3 ++ src/common/src/config.rs | 8 +++ src/config/docs.md | 1 + src/config/example.toml | 1 + src/ctl/src/cmd_impl/hummock/sst_dump.rs | 2 +- src/meta/node/src/lib.rs | 1 + src/meta/service/src/hummock_service.rs | 5 +- src/meta/src/hummock/manager/gc.rs | 48 +++++++++++++++-- src/meta/src/hummock/manager/mod.rs | 6 ++- src/meta/src/hummock/manager/timer_task.rs | 4 +- .../src/hummock/mock_hummock_meta_client.rs | 1 + src/meta/src/manager/env.rs | 3 ++ src/object_store/src/object/mem.rs | 15 +++++- src/object_store/src/object/mod.rs | 25 +++++++-- .../opendal_engine/opendal_object_store.rs | 20 ++++--- src/object_store/src/object/s3.rs | 39 +++++++++++--- src/object_store/src/object/sim/mod.rs | 13 ++++- src/rpc_client/src/hummock_meta_client.rs | 1 + src/rpc_client/src/meta_client.rs | 2 + src/storage/hummock_test/src/vacuum_tests.rs | 12 +++-- src/storage/src/hummock/compactor/mod.rs | 6 ++- .../src/hummock/hummock_meta_client.rs | 8 ++- src/storage/src/hummock/sstable_store.rs | 4 +- src/storage/src/hummock/vacuum.rs | 52 +++++++++++++------ 24 files changed, 226 insertions(+), 54 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index 92c1494707fb..546e7edc0837 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -545,6 +545,8 @@ message VacuumTask { message FullScanTask { uint64 sst_retention_time_sec = 1; optional string prefix = 2; + optional string start_after = 3; + optional uint64 limit = 4; } // Cancel compact task @@ -580,6 +582,7 @@ message ReportFullScanTaskRequest { uint64 total_object_count = 2; // Total size of objects before filtered by conditions specified by FullScanTask. uint64 total_object_size = 3; + optional string next_start_after = 4; } message ReportFullScanTaskResponse { diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 7ab8b5d84c69..d36299bfea37 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -192,6 +192,10 @@ pub struct MetaConfig { #[serde(default = "default::meta::full_gc_interval_sec")] pub full_gc_interval_sec: u64, + /// Max number of object per full GC job can fetch. + #[serde(default = "default::meta::full_gc_object_limit")] + pub full_gc_object_limit: u64, + /// The spin interval when collecting global GC watermark in hummock. #[serde(default = "default::meta::collect_gc_watermark_spin_interval_sec")] pub collect_gc_watermark_spin_interval_sec: u64, @@ -1346,6 +1350,10 @@ pub mod default { 86400 } + pub fn full_gc_object_limit() -> u64 { + 100_000 + } + pub fn collect_gc_watermark_spin_interval_sec() -> u64 { 5 } diff --git a/src/config/docs.md b/src/config/docs.md index edf32965c492..3e66bc56535d 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -39,6 +39,7 @@ This page is automatically generated by `./risedev generate-example-config` | event_log_channel_max_size | Keeps the latest N events per channel. | 10 | | event_log_enabled | | true | | full_gc_interval_sec | Interval of automatic hummock full GC. | 86400 | +| full_gc_object_limit | Max number of object per full GC job can fetch. | 100000 | | hummock_time_travel_snapshot_interval | The interval at which a Hummock version snapshot is taken for time travel. Larger value indicates less storage overhead but worse query performance. | 100 | | hummock_version_checkpoint_interval_sec | Interval of hummock version checkpoint. | 30 | | hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `vnode_count / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 | diff --git a/src/config/example.toml b/src/config/example.toml index 7d56b1a11c63..1c172b77efe5 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -16,6 +16,7 @@ dir = "./" [meta] min_sst_retention_time_sec = 86400 full_gc_interval_sec = 86400 +full_gc_object_limit = 100000 collect_gc_watermark_spin_interval_sec = 5 periodic_compaction_interval_sec = 60 vacuum_interval_sec = 30 diff --git a/src/ctl/src/cmd_impl/hummock/sst_dump.rs b/src/ctl/src/cmd_impl/hummock/sst_dump.rs index 51b776ad1b2c..dc03d4fb8ca9 100644 --- a/src/ctl/src/cmd_impl/hummock/sst_dump.rs +++ b/src/ctl/src/cmd_impl/hummock/sst_dump.rs @@ -138,7 +138,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result .await?; } else { let mut metadata_iter = sstable_store - .list_object_metadata_from_object_store(None) + .list_object_metadata_from_object_store(None, None, None) .await?; while let Some(obj) = metadata_iter.try_next().await? { print_object(&obj); diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 439a4a6eded5..20d239efc103 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -386,6 +386,7 @@ pub fn start( .min_delta_log_num_for_hummock_version_checkpoint, min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec, full_gc_interval_sec: config.meta.full_gc_interval_sec, + full_gc_object_limit: config.meta.full_gc_object_limit, collect_gc_watermark_spin_interval_sec: config .meta .collect_gc_watermark_spin_interval_sec, diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 5830951d3e5f..17b87e3a6b37 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -277,7 +277,10 @@ impl HummockManagerService for HummockServiceImpl { // The following operation takes some time, so we do it in dedicated task and responds the // RPC immediately. tokio::spawn(async move { - match hummock_manager.complete_full_gc(req.object_ids).await { + match hummock_manager + .complete_full_gc(req.object_ids, req.next_start_after) + .await + { Ok(number) => { tracing::info!("Full GC results {} SSTs to delete", number); } diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 596c36857907..a548025781e1 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -188,14 +188,19 @@ impl HummockManager { sst_retention_time, Duration::from_secs(self.env.opts.min_sst_retention_time_sec), ); + let start_after = self.full_gc_state.next_start_after(); + let limit = self.full_gc_state.limit; tracing::info!( retention_sec = sst_retention_time.as_secs(), prefix = prefix.as_ref().unwrap_or(&String::from("")), + start_after, + limit, "run full GC" ); + let compactor = match self.compactor_manager.next_compactor() { None => { - tracing::warn!("Try full GC but no available idle worker."); + tracing::warn!("full GC attempt but no available idle worker"); return Ok(false); } Some(compactor) => compactor, @@ -204,6 +209,8 @@ impl HummockManager { .send_event(ResponseEvent::FullScanTask(FullScanTask { sst_retention_time_sec: sst_retention_time.as_secs(), prefix, + start_after, + limit, })) .map_err(|_| Error::CompactorUnreachable(compactor.context_id()))?; Ok(true) @@ -211,7 +218,12 @@ impl HummockManager { /// Given candidate SSTs to GC, filter out false positive. /// Returns number of SSTs to GC. - pub async fn complete_full_gc(&self, object_ids: Vec) -> Result { + pub async fn complete_full_gc( + &self, + object_ids: Vec, + next_start_after: Option, + ) -> Result { + self.full_gc_state.set_next_start_after(next_start_after); if object_ids.is_empty() { tracing::info!("SST full scan returns no SSTs."); return Ok(0); @@ -252,6 +264,28 @@ impl HummockManager { } } +pub struct FullGcState { + next_start_after: Mutex>, + limit: Option, +} + +impl FullGcState { + pub fn new(limit: Option) -> Self { + Self { + next_start_after: Mutex::new(None), + limit, + } + } + + pub fn set_next_start_after(&self, next_start_after: Option) { + *self.next_start_after.lock() = next_start_after; + } + + pub fn next_start_after(&self) -> Option { + self.next_start_after.lock().clone() + } +} + /// Collects SST GC watermark from related cluster nodes and calculates a global one. /// /// It must wait enough heartbeats first. This precondition is checked at `spin_interval`. @@ -403,7 +437,10 @@ mod tests { ); // Empty input results immediate return, without waiting heartbeat. - hummock_manager.complete_full_gc(vec![]).await.unwrap(); + hummock_manager + .complete_full_gc(vec![], None) + .await + .unwrap(); // mimic CN heartbeat use risingwave_pb::meta::heartbeat_request::extra_info::Info; @@ -428,7 +465,7 @@ mod tests { assert_eq!( 3, hummock_manager - .complete_full_gc(vec![1, 2, 3]) + .complete_full_gc(vec![1, 2, 3], None) .await .unwrap() ); @@ -452,7 +489,8 @@ mod tests { 1, hummock_manager .complete_full_gc( - [committed_object_ids, vec![max_committed_object_id + 1]].concat() + [committed_object_ids, vec![max_committed_object_id + 1]].concat(), + None ) .await .unwrap() diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 4cf29ca060e1..dea2314df75a 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -44,7 +44,7 @@ use crate::hummock::compaction::CompactStatus; use crate::hummock::error::Result; use crate::hummock::manager::checkpoint::HummockVersionCheckpoint; use crate::hummock::manager::context::ContextInfo; -use crate::hummock::manager::gc::DeleteObjectTracker; +use crate::hummock::manager::gc::{DeleteObjectTracker, FullGcState}; use crate::hummock::CompactorManagerRef; use crate::manager::{MetaSrvEnv, MetaStoreImpl, MetadataManager}; use crate::model::{ClusterId, MetadataModel, MetadataModelError}; @@ -113,6 +113,7 @@ pub struct HummockManager { // `compaction_state` will record the types of compact tasks that can be triggered in `hummock` // and suggest types with a certain priority. pub compaction_state: CompactionState, + full_gc_state: FullGcState, } pub type HummockManagerRef = Arc; @@ -246,7 +247,7 @@ impl HummockManager { let version_checkpoint_path = version_checkpoint_path(state_store_dir); let version_archive_dir = version_archive_dir(state_store_dir); let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - + let full_gc_object_limit = env.opts.full_gc_object_limit; let instance = HummockManager { env, versioning: MonitoredRwLock::new( @@ -285,6 +286,7 @@ impl HummockManager { history_table_throughput: parking_lot::RwLock::new(HashMap::default()), compactor_streams_change_tx, compaction_state: CompactionState::new(), + full_gc_state: FullGcState::new(Some(full_gc_object_limit)), }; let instance = Arc::new(instance); instance.init_time_travel_state().await?; diff --git a/src/meta/src/hummock/manager/timer_task.rs b/src/meta/src/hummock/manager/timer_task.rs index f8fd51346f49..eb0b2655d004 100644 --- a/src/meta/src/hummock/manager/timer_task.rs +++ b/src/meta/src/hummock/manager/timer_task.rs @@ -340,8 +340,10 @@ impl HummockManager { } HummockTimerEvent::FullGc => { + let retention_sec = + hummock_manager.env.opts.min_sst_retention_time_sec; if hummock_manager - .start_full_gc(Duration::from_secs(3600), None) + .start_full_gc(Duration::from_secs(retention_sec), None) .is_ok() { tracing::info!("Start full GC from meta node."); diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index c59fb512090f..0b2fafe39b1c 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -246,6 +246,7 @@ impl HummockMetaClient for MockHummockMetaClient { _filtered_object_ids: Vec, _total_object_count: u64, _total_object_size: u64, + _next_start_after: Option, ) -> Result<()> { unimplemented!() } diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index bf47b5fd1e54..409a05e9d59a 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -186,6 +186,8 @@ pub struct MetaOpts { pub min_sst_retention_time_sec: u64, /// Interval of automatic hummock full GC. pub full_gc_interval_sec: u64, + /// Max number of object per full GC job can fetch. + pub full_gc_object_limit: u64, /// The spin interval when collecting global GC watermark in hummock pub collect_gc_watermark_spin_interval_sec: u64, /// Enable sanity check when SSTs are committed @@ -318,6 +320,7 @@ impl MetaOpts { min_delta_log_num_for_hummock_version_checkpoint: 1, min_sst_retention_time_sec: 3600 * 24 * 7, full_gc_interval_sec: 3600 * 24 * 7, + full_gc_object_limit: 100_000, collect_gc_watermark_spin_interval_sec: 5, enable_committed_sst_sanity_check: false, periodic_compaction_interval_sec: 60, diff --git a/src/object_store/src/object/mem.rs b/src/object_store/src/object/mem.rs index 270cac3719d6..7d622984450f 100644 --- a/src/object_store/src/object/mem.rs +++ b/src/object_store/src/object/mem.rs @@ -182,19 +182,30 @@ impl ObjectStore for InMemObjectStore { Ok(()) } - async fn list(&self, prefix: &str) -> ObjectResult { + async fn list( + &self, + prefix: &str, + start_after: Option, + limit: Option, + ) -> ObjectResult { let list_result = self .objects .lock() .await .iter() .filter_map(|(path, (metadata, _))| { + if let Some(ref start_after) = start_after + && metadata.key.le(start_after) + { + return None; + } if path.starts_with(prefix) { return Some(metadata.clone()); } None }) .sorted_by(|a, b| Ord::cmp(&a.key, &b.key)) + .take(limit.unwrap_or(usize::MAX)) .collect_vec(); Ok(Box::pin(InMemObjectIter::new(list_result))) } @@ -376,7 +387,7 @@ mod tests { async fn list_all(prefix: &str, store: &InMemObjectStore) -> Vec { store - .list(prefix) + .list(prefix, None, None) .await .unwrap() .try_collect::>() diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index aff197263f8f..a304266d4641 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -121,7 +121,12 @@ pub trait ObjectStore: Send + Sync { MonitoredObjectStore::new(self, metrics, config) } - async fn list(&self, prefix: &str) -> ObjectResult; + async fn list( + &self, + prefix: &str, + start_after: Option, + limit: Option, + ) -> ObjectResult; fn store_media_type(&self) -> &'static str; @@ -326,8 +331,13 @@ impl ObjectStoreImpl { object_store_impl_method_body!(self, delete_objects(paths).await) } - pub async fn list(&self, prefix: &str) -> ObjectResult { - object_store_impl_method_body!(self, list(prefix).await) + pub async fn list( + &self, + prefix: &str, + start_after: Option, + limit: Option, + ) -> ObjectResult { + object_store_impl_method_body!(self, list(prefix, start_after, limit).await) } pub fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String { @@ -814,7 +824,12 @@ impl MonitoredObjectStore { res } - pub async fn list(&self, prefix: &str) -> ObjectResult { + pub async fn list( + &self, + prefix: &str, + start_after: Option, + limit: Option, + ) -> ObjectResult { let operation_type = OperationType::List; let operation_type_str = operation_type.as_str(); let media_type = self.media_type(); @@ -827,7 +842,7 @@ impl MonitoredObjectStore { let builder = || async { self.inner - .list(prefix) + .list(prefix, start_after.clone(), limit) .verbose_instrument_await(operation_type_str) .await }; diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index 6855ae951956..8f629027c18b 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -231,13 +231,21 @@ impl ObjectStore for OpendalObjectStore { Ok(()) } - async fn list(&self, prefix: &str) -> ObjectResult { - let object_lister = self + async fn list( + &self, + prefix: &str, + start_after: Option, + limit: Option, + ) -> ObjectResult { + let mut object_lister = self .op .lister_with(prefix) .recursive(true) - .metakey(Metakey::ContentLength) - .await?; + .metakey(Metakey::ContentLength); + if let Some(start_after) = start_after { + object_lister = object_lister.start_after(&start_after); + } + let object_lister = object_lister.await?; let stream = stream::unfold(object_lister, |mut object_lister| async move { match object_lister.next().await { @@ -261,7 +269,7 @@ impl ObjectStore for OpendalObjectStore { } }); - Ok(stream.boxed()) + Ok(stream.take(limit.unwrap_or(usize::MAX)).boxed()) } fn store_media_type(&self) -> &'static str { @@ -428,7 +436,7 @@ mod tests { async fn list_all(prefix: &str, store: &OpendalObjectStore) -> Vec { store - .list(prefix) + .list(prefix, None, None) .await .unwrap() .try_collect::>() diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 7ea687559ab3..3117f6e55bc4 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -619,13 +619,22 @@ impl ObjectStore for S3ObjectStore { Ok(()) } - async fn list(&self, prefix: &str) -> ObjectResult { - Ok(Box::pin(S3ObjectIter::new( - self.client.clone(), - self.bucket.clone(), - prefix.to_string(), - self.config.clone(), - ))) + async fn list( + &self, + prefix: &str, + start_after: Option, + limit: Option, + ) -> ObjectResult { + Ok(Box::pin( + S3ObjectIter::new( + self.client.clone(), + self.bucket.clone(), + prefix.to_string(), + self.config.clone(), + start_after, + ) + .take(limit.unwrap_or(usize::MAX)), + )) } fn store_media_type(&self) -> &'static str { @@ -947,10 +956,17 @@ struct S3ObjectIter { >, config: Arc, + start_after: Option, } impl S3ObjectIter { - fn new(client: Client, bucket: String, prefix: String, config: Arc) -> Self { + fn new( + client: Client, + bucket: String, + prefix: String, + config: Arc, + start_after: Option, + ) -> Self { Self { buffer: VecDeque::default(), client, @@ -960,6 +976,7 @@ impl S3ObjectIter { is_truncated: Some(true), send_future: None, config, + start_after, } } } @@ -978,6 +995,8 @@ impl Stream for S3ObjectIter { self.is_truncated = is_truncated; self.buffer.extend(more); self.send_future = None; + // only the first request may set start_after + self.start_after = None; self.poll_next(cx) } Err(e) => { @@ -994,6 +1013,10 @@ impl Stream for S3ObjectIter { .list_objects_v2() .bucket(&self.bucket) .prefix(&self.prefix); + #[cfg(not(madsim))] + if let Some(start_after) = self.start_after.as_ref() { + request = request.start_after(start_after); + } if let Some(continuation_token) = self.next_continuation_token.as_ref() { request = request.continuation_token(continuation_token); } diff --git a/src/object_store/src/object/sim/mod.rs b/src/object_store/src/object/sim/mod.rs index 2f06b9183909..f9ec4b85c5a4 100644 --- a/src/object_store/src/object/sim/mod.rs +++ b/src/object_store/src/object/sim/mod.rs @@ -221,7 +221,18 @@ impl ObjectStore for SimObjectStore { } } - async fn list(&self, path: &str) -> ObjectResult { + async fn list( + &self, + path: &str, + start_after: Option, + limit: Option, + ) -> ObjectResult { + if let Some(start_after) = start_after { + tracing::warn!(start_after, "start_after is ignored by SimObjectStore"); + } + if let Some(limit) = limit { + tracing::warn!(limit, "limit is ignored by SimObjectStore"); + } let path = path.to_string(); let resp = self .client diff --git a/src/rpc_client/src/hummock_meta_client.rs b/src/rpc_client/src/hummock_meta_client.rs index 5239d000353c..2006b16e8d5f 100644 --- a/src/rpc_client/src/hummock_meta_client.rs +++ b/src/rpc_client/src/hummock_meta_client.rs @@ -54,6 +54,7 @@ pub trait HummockMetaClient: Send + Sync + 'static { filtered_object_ids: Vec, total_object_count: u64, total_object_size: u64, + next_start_after: Option, ) -> Result<()>; async fn trigger_full_gc( &self, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index bdbba36d17a8..8e7dc5dee1b4 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1546,11 +1546,13 @@ impl HummockMetaClient for MetaClient { filtered_object_ids: Vec, total_object_count: u64, total_object_size: u64, + next_start_after: Option, ) -> Result<()> { let req = ReportFullScanTaskRequest { object_ids: filtered_object_ids, total_object_count, total_object_size, + next_start_after, }; self.inner.report_full_scan_task(req).await?; Ok(()) diff --git a/src/storage/hummock_test/src/vacuum_tests.rs b/src/storage/hummock_test/src/vacuum_tests.rs index 5c6dbd20587c..e68a2de4b00a 100644 --- a/src/storage/hummock_test/src/vacuum_tests.rs +++ b/src/storage/hummock_test/src/vacuum_tests.rs @@ -93,8 +93,10 @@ async fn test_full_scan() { let task = FullScanTask { sst_retention_time_sec: 10000, prefix: None, + start_after: None, + limit: None, }; - let (scan_result, _, _) = Vacuum::full_scan_inner(task, object_metadata_iter.clone()) + let (scan_result, ..) = Vacuum::full_scan_inner(task, object_metadata_iter.clone()) .await .unwrap(); assert!(scan_result.is_empty()); @@ -102,8 +104,10 @@ async fn test_full_scan() { let task = FullScanTask { sst_retention_time_sec: 6000, prefix: None, + start_after: None, + limit: None, }; - let (scan_result, _, _) = Vacuum::full_scan_inner(task, object_metadata_iter.clone()) + let (scan_result, ..) = Vacuum::full_scan_inner(task, object_metadata_iter.clone()) .await .unwrap(); assert_eq!(scan_result.into_iter().sorted().collect_vec(), vec![1]); @@ -111,8 +115,10 @@ async fn test_full_scan() { let task = FullScanTask { sst_retention_time_sec: 2000, prefix: None, + start_after: None, + limit: None, }; - let (scan_result, _, _) = Vacuum::full_scan_inner(task, object_metadata_iter) + let (scan_result, ..) = Vacuum::full_scan_inner(task, object_metadata_iter) .await .unwrap(); assert_eq!(scan_result.into_iter().sorted().collect_vec(), vec![1, 2]); diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 12e009bb60cb..b535bd3945cd 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -594,11 +594,12 @@ pub fn start_compactor( ) .await { - Ok((object_ids, total_object_count, total_object_size)) => { + Ok((object_ids, total_object_count, total_object_size, next_start_after)) => { Vacuum::report_full_scan_task( object_ids, total_object_count, total_object_size, + next_start_after, meta_client, ) .await; @@ -804,11 +805,12 @@ pub fn start_shared_compactor( match Vacuum::handle_full_scan_task(full_scan_task, context.sstable_store.clone()) .await { - Ok((object_ids, total_object_count, total_object_size)) => { + Ok((object_ids, total_object_count, total_object_size, next_start_after)) => { let report_full_scan_task_request = ReportFullScanTaskRequest { object_ids, total_object_count, total_object_size, + next_start_after, }; match cloned_grpc_proxy_client .report_full_scan_task(report_full_scan_task_request) diff --git a/src/storage/src/hummock/hummock_meta_client.rs b/src/storage/src/hummock/hummock_meta_client.rs index 53f15ebc1fb3..7e07a6ed9c89 100644 --- a/src/storage/src/hummock/hummock_meta_client.rs +++ b/src/storage/src/hummock/hummock_meta_client.rs @@ -98,9 +98,15 @@ impl HummockMetaClient for MonitoredHummockMetaClient { filtered_object_ids: Vec, total_object_count: u64, total_object_size: u64, + next_start_after: Option, ) -> Result<()> { self.meta_client - .report_full_scan_task(filtered_object_ids, total_object_count, total_object_size) + .report_full_scan_task( + filtered_object_ids, + total_object_count, + total_object_size, + next_start_after, + ) .await } diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index d1367b92a9ce..11f8c2c32740 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -614,9 +614,11 @@ impl SstableStore { pub async fn list_object_metadata_from_object_store( &self, prefix: Option, + start_after: Option, + limit: Option, ) -> HummockResult { let list_path = format!("{}/{}", self.path, prefix.unwrap_or("".into())); - let raw_iter = self.store.list(&list_path).await?; + let raw_iter = self.store.list(&list_path, start_after, limit).await?; let iter = raw_iter.filter(|r| match r { Ok(i) => future::ready(i.key.ends_with(&format!(".{}", OBJECT_SUFFIX))), Err(_) => future::ready(true), diff --git a/src/storage/src/hummock/vacuum.rs b/src/storage/src/hummock/vacuum.rs index fb4c9d782215..17095e994e28 100644 --- a/src/storage/src/hummock/vacuum.rs +++ b/src/storage/src/hummock/vacuum.rs @@ -35,7 +35,7 @@ impl Vacuum { sstable_store: SstableStoreRef, sstable_object_ids: &[u64], ) -> HummockResult<()> { - tracing::info!("Try to vacuum SSTs {:?}", sstable_object_ids); + tracing::info!("try to vacuum SSTs {:?}", sstable_object_ids); sstable_store.delete_list(sstable_object_ids).await?; Ok(()) } @@ -46,10 +46,10 @@ impl Vacuum { ) -> bool { match hummock_meta_client.report_vacuum_task(vacuum_task).await { Ok(_) => { - tracing::info!("Finished vacuuming SSTs"); + tracing::info!("vacuuming SSTs succeeded"); } Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to report vacuum task"); + tracing::warn!(error = %e.as_report(), "failed to report vacuum task"); return false; } } @@ -59,7 +59,7 @@ impl Vacuum { pub async fn full_scan_inner( full_scan_task: FullScanTask, metadata_iter: ObjectMetadataIter, - ) -> HummockResult<(Vec, u64, u64)> { + ) -> HummockResult<(Vec, u64, u64, Option)> { let timestamp_watermark = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() @@ -68,12 +68,21 @@ impl Vacuum { let mut total_object_count = 0; let mut total_object_size = 0; + let mut next_start_after: Option = None; let filtered = metadata_iter .filter_map(|r| { let result = match r { Ok(o) => { total_object_count += 1; total_object_size += o.total_size; + // Determine if the LIST has been truncated. + // A false positives would at most cost one additional LIST later. + if let Some(limit) = full_scan_task.limit + && limit == total_object_count + { + next_start_after = Some(o.key.clone()); + tracing::debug!(next_start_after, "set next start after"); + } if o.last_modified < timestamp_watermark { Some(Ok(SstableStore::get_object_id_from_path(&o.key))) } else { @@ -88,8 +97,9 @@ impl Vacuum { .await?; Ok(( filtered, - total_object_count as u64, + total_object_count, total_object_size as u64, + next_start_after, )) } @@ -97,14 +107,20 @@ impl Vacuum { pub async fn handle_full_scan_task( full_scan_task: FullScanTask, sstable_store: SstableStoreRef, - ) -> HummockResult<(Vec, u64, u64)> { + ) -> HummockResult<(Vec, u64, u64, Option)> { tracing::info!( timestamp = full_scan_task.sst_retention_time_sec, prefix = full_scan_task.prefix.as_ref().unwrap_or(&String::from("")), - "Try to full scan SSTs" + start_after = full_scan_task.start_after, + limit = full_scan_task.limit, + "try to full scan SSTs" ); let metadata_iter = sstable_store - .list_object_metadata_from_object_store(full_scan_task.prefix.clone()) + .list_object_metadata_from_object_store( + full_scan_task.prefix.clone(), + full_scan_task.start_after.clone(), + full_scan_task.limit.map(|i| i as usize), + ) .await?; Vacuum::full_scan_inner(full_scan_task, metadata_iter).await } @@ -113,24 +129,30 @@ impl Vacuum { filtered_object_ids: Vec, unfiltered_count: u64, unfiltered_size: u64, + next_start_after: Option, hummock_meta_client: Arc, ) -> bool { - tracing::info!("Try to report full scan task",); tracing::info!( - "filtered_object_ids length = {}, unfiltered_count = {}, unfiltered_size = {}", - filtered_object_ids.len(), + filtered_object_ids_len = filtered_object_ids.len(), unfiltered_count, - unfiltered_size + unfiltered_size, + next_start_after, + "try to report full scan task" ); match hummock_meta_client - .report_full_scan_task(filtered_object_ids, unfiltered_count, unfiltered_size) + .report_full_scan_task( + filtered_object_ids, + unfiltered_count, + unfiltered_size, + next_start_after, + ) .await { Ok(_) => { - tracing::info!("Finished full scan SSTs"); + tracing::info!("full scan SSTs succeeded"); } Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to report full scan task"); + tracing::warn!(error = %e.as_report(), "failed to report full scan task"); return false; } }