From 91fe1740e831adadedab31b89627c221fa085496 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Thu, 26 Sep 2024 15:57:38 +0800 Subject: [PATCH] refactor(storage): support pagination for LIST during full GC --- proto/hummock.proto | 3 ++ src/common/src/config.rs | 8 +++ src/ctl/src/cmd_impl/hummock/sst_dump.rs | 2 +- src/meta/node/src/lib.rs | 1 + src/meta/service/src/hummock_service.rs | 5 +- src/meta/src/hummock/manager/gc.rs | 44 ++++++++++++++-- src/meta/src/hummock/manager/mod.rs | 6 ++- .../src/hummock/mock_hummock_meta_client.rs | 1 + src/meta/src/manager/env.rs | 3 ++ src/object_store/src/object/mem.rs | 21 ++++++-- src/object_store/src/object/mod.rs | 25 +++++++-- .../opendal_engine/opendal_object_store.rs | 20 ++++--- src/object_store/src/object/s3.rs | 36 ++++++++++--- src/object_store/src/object/sim/mod.rs | 13 ++++- src/rpc_client/src/hummock_meta_client.rs | 1 + src/rpc_client/src/meta_client.rs | 2 + src/storage/hummock_test/src/vacuum_tests.rs | 12 +++-- src/storage/src/hummock/compactor/mod.rs | 6 ++- .../src/hummock/hummock_meta_client.rs | 8 ++- src/storage/src/hummock/sstable_store.rs | 4 +- src/storage/src/hummock/vacuum.rs | 52 +++++++++++++------ 21 files changed, 218 insertions(+), 55 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index 92c1494707fbd..546e7edc08370 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -545,6 +545,8 @@ message VacuumTask { message FullScanTask { uint64 sst_retention_time_sec = 1; optional string prefix = 2; + optional string start_after = 3; + optional uint64 limit = 4; } // Cancel compact task @@ -580,6 +582,7 @@ message ReportFullScanTaskRequest { uint64 total_object_count = 2; // Total size of objects before filtered by conditions specified by FullScanTask. 
uint64 total_object_size = 3; + optional string next_start_after = 4; } message ReportFullScanTaskResponse { diff --git a/src/common/src/config.rs b/src/common/src/config.rs index b312304e80799..7b7220866a112 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -192,6 +192,10 @@ pub struct MetaConfig { #[serde(default = "default::meta::full_gc_interval_sec")] pub full_gc_interval_sec: u64, + /// Max number of objects a single full GC job can fetch. + #[serde(default = "default::meta::full_gc_object_limit")] + pub full_gc_object_limit: u64, + /// The spin interval when collecting global GC watermark in hummock. #[serde(default = "default::meta::collect_gc_watermark_spin_interval_sec")] pub collect_gc_watermark_spin_interval_sec: u64, @@ -1346,6 +1350,10 @@ pub mod default { 86400 } + pub fn full_gc_object_limit() -> u64 { + 100_000 + } + pub fn collect_gc_watermark_spin_interval_sec() -> u64 { 5 } diff --git a/src/ctl/src/cmd_impl/hummock/sst_dump.rs b/src/ctl/src/cmd_impl/hummock/sst_dump.rs index 51b776ad1b2c2..dc03d4fb8ca9e 100644 --- a/src/ctl/src/cmd_impl/hummock/sst_dump.rs +++ b/src/ctl/src/cmd_impl/hummock/sst_dump.rs @@ -138,7 +138,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result .await?; } else { let mut metadata_iter = sstable_store - .list_object_metadata_from_object_store(None) + .list_object_metadata_from_object_store(None, None, None) .await?; while let Some(obj) = metadata_iter.try_next().await? 
{ print_object(&obj); diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 439a4a6eded53..20d239efc1030 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -386,6 +386,7 @@ pub fn start( .min_delta_log_num_for_hummock_version_checkpoint, min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec, full_gc_interval_sec: config.meta.full_gc_interval_sec, + full_gc_object_limit: config.meta.full_gc_object_limit, collect_gc_watermark_spin_interval_sec: config .meta .collect_gc_watermark_spin_interval_sec, diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 5830951d3e5f1..17b87e3a6b376 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -277,7 +277,10 @@ impl HummockManagerService for HummockServiceImpl { // The following operation takes some time, so we do it in dedicated task and responds the // RPC immediately. tokio::spawn(async move { - match hummock_manager.complete_full_gc(req.object_ids).await { + match hummock_manager + .complete_full_gc(req.object_ids, req.next_start_after) + .await + { Ok(number) => { tracing::info!("Full GC results {} SSTs to delete", number); } diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 596c36857907f..9fb9a2bf96fc8 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -193,9 +193,10 @@ impl HummockManager { prefix = prefix.as_ref().unwrap_or(&String::from("")), "run full GC" ); + let compactor = match self.compactor_manager.next_compactor() { None => { - tracing::warn!("Try full GC but no available idle worker."); + tracing::warn!("full GC attempt but no available idle worker"); return Ok(false); } Some(compactor) => compactor, @@ -204,6 +205,8 @@ impl HummockManager { .send_event(ResponseEvent::FullScanTask(FullScanTask { sst_retention_time_sec: sst_retention_time.as_secs(), prefix, + start_after: 
self.full_gc_state.next_start_after(), + limit: self.full_gc_state.limit, })) .map_err(|_| Error::CompactorUnreachable(compactor.context_id()))?; Ok(true) @@ -211,7 +214,12 @@ impl HummockManager { /// Given candidate SSTs to GC, filter out false positive. /// Returns number of SSTs to GC. - pub async fn complete_full_gc(&self, object_ids: Vec) -> Result { + pub async fn complete_full_gc( + &self, + object_ids: Vec, + next_start_after: Option, + ) -> Result { + self.full_gc_state.set_next_start_after(next_start_after); if object_ids.is_empty() { tracing::info!("SST full scan returns no SSTs."); return Ok(0); @@ -252,6 +260,28 @@ impl HummockManager { } } +pub struct FullGcState { + next_start_after: Mutex>, + limit: Option, +} + +impl FullGcState { + pub fn new(limit: Option) -> Self { + Self { + next_start_after: Mutex::new(None), + limit, + } + } + + pub fn set_next_start_after(&self, next_start_after: Option) { + *self.next_start_after.lock() = next_start_after; + } + + pub fn next_start_after(&self) -> Option { + self.next_start_after.lock().clone() + } +} + /// Collects SST GC watermark from related cluster nodes and calculates a global one. /// /// It must wait enough heartbeats first. This precondition is checked at `spin_interval`. @@ -403,7 +433,10 @@ mod tests { ); // Empty input results immediate return, without waiting heartbeat. 
- hummock_manager.complete_full_gc(vec![]).await.unwrap(); + hummock_manager + .complete_full_gc(vec![], None) + .await + .unwrap(); // mimic CN heartbeat use risingwave_pb::meta::heartbeat_request::extra_info::Info; @@ -428,7 +461,7 @@ mod tests { assert_eq!( 3, hummock_manager - .complete_full_gc(vec![1, 2, 3]) + .complete_full_gc(vec![1, 2, 3], None) .await .unwrap() ); @@ -452,7 +485,8 @@ mod tests { 1, hummock_manager .complete_full_gc( - [committed_object_ids, vec![max_committed_object_id + 1]].concat() + [committed_object_ids, vec![max_committed_object_id + 1]].concat(), + None ) .await .unwrap() diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 4cf29ca060e1d..dea2314df75aa 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -44,7 +44,7 @@ use crate::hummock::compaction::CompactStatus; use crate::hummock::error::Result; use crate::hummock::manager::checkpoint::HummockVersionCheckpoint; use crate::hummock::manager::context::ContextInfo; -use crate::hummock::manager::gc::DeleteObjectTracker; +use crate::hummock::manager::gc::{DeleteObjectTracker, FullGcState}; use crate::hummock::CompactorManagerRef; use crate::manager::{MetaSrvEnv, MetaStoreImpl, MetadataManager}; use crate::model::{ClusterId, MetadataModel, MetadataModelError}; @@ -113,6 +113,7 @@ pub struct HummockManager { // `compaction_state` will record the types of compact tasks that can be triggered in `hummock` // and suggest types with a certain priority. 
pub compaction_state: CompactionState, + full_gc_state: FullGcState, } pub type HummockManagerRef = Arc; @@ -246,7 +247,7 @@ impl HummockManager { let version_checkpoint_path = version_checkpoint_path(state_store_dir); let version_archive_dir = version_archive_dir(state_store_dir); let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - + let full_gc_object_limit = env.opts.full_gc_object_limit; let instance = HummockManager { env, versioning: MonitoredRwLock::new( @@ -285,6 +286,7 @@ impl HummockManager { history_table_throughput: parking_lot::RwLock::new(HashMap::default()), compactor_streams_change_tx, compaction_state: CompactionState::new(), + full_gc_state: FullGcState::new(Some(full_gc_object_limit)), }; let instance = Arc::new(instance); instance.init_time_travel_state().await?; diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index c59fb512090fa..0b2fafe39b1c7 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -246,6 +246,7 @@ impl HummockMetaClient for MockHummockMetaClient { _filtered_object_ids: Vec, _total_object_count: u64, _total_object_size: u64, + _next_start_after: Option, ) -> Result<()> { unimplemented!() } diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index bf47b5fd1e541..409a05e9d59a1 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -186,6 +186,8 @@ pub struct MetaOpts { pub min_sst_retention_time_sec: u64, /// Interval of automatic hummock full GC. pub full_gc_interval_sec: u64, + /// Max number of objects a single full GC job can fetch. 
+ pub full_gc_object_limit: u64, /// The spin interval when collecting global GC watermark in hummock pub collect_gc_watermark_spin_interval_sec: u64, /// Enable sanity check when SSTs are committed @@ -318,6 +320,7 @@ impl MetaOpts { min_delta_log_num_for_hummock_version_checkpoint: 1, min_sst_retention_time_sec: 3600 * 24 * 7, full_gc_interval_sec: 3600 * 24 * 7, + full_gc_object_limit: 100_000, collect_gc_watermark_spin_interval_sec: 5, enable_committed_sst_sanity_check: false, periodic_compaction_interval_sec: 60, diff --git a/src/object_store/src/object/mem.rs b/src/object_store/src/object/mem.rs index 270cac3719d61..1b05203a90ddf 100644 --- a/src/object_store/src/object/mem.rs +++ b/src/object_store/src/object/mem.rs @@ -23,14 +23,16 @@ use bytes::{BufMut, Bytes, BytesMut}; use fail::fail_point; use futures::Stream; use itertools::Itertools; -use risingwave_common::range::RangeBoundsExt; use thiserror::Error; use tokio::sync::Mutex; +use risingwave_common::range::RangeBoundsExt; + +use crate::object::{ObjectDataStream, ObjectMetadataIter}; + use super::{ ObjectError, ObjectMetadata, ObjectRangeBounds, ObjectResult, ObjectStore, StreamingUploader, }; -use crate::object::{ObjectDataStream, ObjectMetadataIter}; #[derive(Error, Debug)] pub enum Error { @@ -182,19 +184,30 @@ impl ObjectStore for InMemObjectStore { Ok(()) } - async fn list(&self, prefix: &str) -> ObjectResult { + async fn list( + &self, + prefix: &str, + start_after: Option, + limit: Option, + ) -> ObjectResult { let list_result = self .objects .lock() .await .iter() .filter_map(|(path, (metadata, _))| { + if let Some(ref start_after) = start_after + && metadata.key.le(start_after) + { + return None; + } if path.starts_with(prefix) { return Some(metadata.clone()); } None }) .sorted_by(|a, b| Ord::cmp(&a.key, &b.key)) + .take(limit.unwrap_or(usize::MAX)) .collect_vec(); Ok(Box::pin(InMemObjectIter::new(list_result))) } @@ -376,7 +389,7 @@ mod tests { async fn list_all(prefix: &str, store: 
&InMemObjectStore) -> Vec { store - .list(prefix) + .list(prefix, None, None) .await .unwrap() .try_collect::>() diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index aff197263f8fe..a304266d46419 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -121,7 +121,12 @@ pub trait ObjectStore: Send + Sync { MonitoredObjectStore::new(self, metrics, config) } - async fn list(&self, prefix: &str) -> ObjectResult; + async fn list( + &self, + prefix: &str, + start_after: Option, + limit: Option, + ) -> ObjectResult; fn store_media_type(&self) -> &'static str; @@ -326,8 +331,13 @@ impl ObjectStoreImpl { object_store_impl_method_body!(self, delete_objects(paths).await) } - pub async fn list(&self, prefix: &str) -> ObjectResult { - object_store_impl_method_body!(self, list(prefix).await) + pub async fn list( + &self, + prefix: &str, + start_after: Option, + limit: Option, + ) -> ObjectResult { + object_store_impl_method_body!(self, list(prefix, start_after, limit).await) } pub fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String { @@ -814,7 +824,12 @@ impl MonitoredObjectStore { res } - pub async fn list(&self, prefix: &str) -> ObjectResult { + pub async fn list( + &self, + prefix: &str, + start_after: Option, + limit: Option, + ) -> ObjectResult { let operation_type = OperationType::List; let operation_type_str = operation_type.as_str(); let media_type = self.media_type(); @@ -827,7 +842,7 @@ impl MonitoredObjectStore { let builder = || async { self.inner - .list(prefix) + .list(prefix, start_after.clone(), limit) .verbose_instrument_await(operation_type_str) .await }; diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index 6855ae9519566..8f629027c18b5 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ 
b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -231,13 +231,21 @@ impl ObjectStore for OpendalObjectStore { Ok(()) } - async fn list(&self, prefix: &str) -> ObjectResult { - let object_lister = self + async fn list( + &self, + prefix: &str, + start_after: Option, + limit: Option, + ) -> ObjectResult { + let mut object_lister = self .op .lister_with(prefix) .recursive(true) - .metakey(Metakey::ContentLength) - .await?; + .metakey(Metakey::ContentLength); + if let Some(start_after) = start_after { + object_lister = object_lister.start_after(&start_after); + } + let object_lister = object_lister.await?; let stream = stream::unfold(object_lister, |mut object_lister| async move { match object_lister.next().await { @@ -261,7 +269,7 @@ impl ObjectStore for OpendalObjectStore { } }); - Ok(stream.boxed()) + Ok(stream.take(limit.unwrap_or(usize::MAX)).boxed()) } fn store_media_type(&self) -> &'static str { @@ -428,7 +436,7 @@ mod tests { async fn list_all(prefix: &str, store: &OpendalObjectStore) -> Vec { store - .list(prefix) + .list(prefix, None, None) .await .unwrap() .try_collect::>() diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 7ea687559ab3f..a2bd08440eb51 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -619,13 +619,22 @@ impl ObjectStore for S3ObjectStore { Ok(()) } - async fn list(&self, prefix: &str) -> ObjectResult { - Ok(Box::pin(S3ObjectIter::new( - self.client.clone(), - self.bucket.clone(), - prefix.to_string(), - self.config.clone(), - ))) + async fn list( + &self, + prefix: &str, + start_after: Option, + limit: Option, + ) -> ObjectResult { + Ok(Box::pin( + S3ObjectIter::new( + self.client.clone(), + self.bucket.clone(), + prefix.to_string(), + self.config.clone(), + start_after, + ) + .take(limit.unwrap_or(usize::MAX)), + )) } fn store_media_type(&self) -> &'static str { @@ -947,10 +956,17 @@ struct S3ObjectIter { >, config: Arc, + start_after: 
Option, } impl S3ObjectIter { - fn new(client: Client, bucket: String, prefix: String, config: Arc) -> Self { + fn new( + client: Client, + bucket: String, + prefix: String, + config: Arc, + start_after: Option, + ) -> Self { Self { buffer: VecDeque::default(), client, @@ -960,6 +976,7 @@ impl S3ObjectIter { is_truncated: Some(true), send_future: None, config, + start_after, } } } @@ -994,6 +1011,9 @@ impl Stream for S3ObjectIter { .list_objects_v2() .bucket(&self.bucket) .prefix(&self.prefix); + if let Some(start_after) = self.start_after.take() { + request = request.start_after(start_after); + } if let Some(continuation_token) = self.next_continuation_token.as_ref() { request = request.continuation_token(continuation_token); } diff --git a/src/object_store/src/object/sim/mod.rs b/src/object_store/src/object/sim/mod.rs index 2f06b91839090..f9ec4b85c5a40 100644 --- a/src/object_store/src/object/sim/mod.rs +++ b/src/object_store/src/object/sim/mod.rs @@ -221,7 +221,18 @@ impl ObjectStore for SimObjectStore { } } - async fn list(&self, path: &str) -> ObjectResult { + async fn list( + &self, + path: &str, + start_after: Option, + limit: Option, + ) -> ObjectResult { + if let Some(start_after) = start_after { + tracing::warn!(start_after, "start_after is ignored by SimObjectStore"); + } + if let Some(limit) = limit { + tracing::warn!(limit, "limit is ignored by SimObjectStore"); + } let path = path.to_string(); let resp = self .client diff --git a/src/rpc_client/src/hummock_meta_client.rs b/src/rpc_client/src/hummock_meta_client.rs index 5239d000353c4..2006b16e8d5f1 100644 --- a/src/rpc_client/src/hummock_meta_client.rs +++ b/src/rpc_client/src/hummock_meta_client.rs @@ -54,6 +54,7 @@ pub trait HummockMetaClient: Send + Sync + 'static { filtered_object_ids: Vec, total_object_count: u64, total_object_size: u64, + next_start_after: Option, ) -> Result<()>; async fn trigger_full_gc( &self, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs 
index bdbba36d17a82..8e7dc5dee1b45 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1546,11 +1546,13 @@ impl HummockMetaClient for MetaClient { filtered_object_ids: Vec, total_object_count: u64, total_object_size: u64, + next_start_after: Option, ) -> Result<()> { let req = ReportFullScanTaskRequest { object_ids: filtered_object_ids, total_object_count, total_object_size, + next_start_after, }; self.inner.report_full_scan_task(req).await?; Ok(()) diff --git a/src/storage/hummock_test/src/vacuum_tests.rs b/src/storage/hummock_test/src/vacuum_tests.rs index 5c6dbd20587cc..e68a2de4b00ac 100644 --- a/src/storage/hummock_test/src/vacuum_tests.rs +++ b/src/storage/hummock_test/src/vacuum_tests.rs @@ -93,8 +93,10 @@ async fn test_full_scan() { let task = FullScanTask { sst_retention_time_sec: 10000, prefix: None, + start_after: None, + limit: None, }; - let (scan_result, _, _) = Vacuum::full_scan_inner(task, object_metadata_iter.clone()) + let (scan_result, ..) = Vacuum::full_scan_inner(task, object_metadata_iter.clone()) .await .unwrap(); assert!(scan_result.is_empty()); @@ -102,8 +104,10 @@ async fn test_full_scan() { let task = FullScanTask { sst_retention_time_sec: 6000, prefix: None, + start_after: None, + limit: None, }; - let (scan_result, _, _) = Vacuum::full_scan_inner(task, object_metadata_iter.clone()) + let (scan_result, ..) = Vacuum::full_scan_inner(task, object_metadata_iter.clone()) .await .unwrap(); assert_eq!(scan_result.into_iter().sorted().collect_vec(), vec![1]); @@ -111,8 +115,10 @@ async fn test_full_scan() { let task = FullScanTask { sst_retention_time_sec: 2000, prefix: None, + start_after: None, + limit: None, }; - let (scan_result, _, _) = Vacuum::full_scan_inner(task, object_metadata_iter) + let (scan_result, ..) 
= Vacuum::full_scan_inner(task, object_metadata_iter) .await .unwrap(); assert_eq!(scan_result.into_iter().sorted().collect_vec(), vec![1, 2]); diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 12e009bb60cb2..b535bd3945cdf 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -594,11 +594,12 @@ pub fn start_compactor( ) .await { - Ok((object_ids, total_object_count, total_object_size)) => { + Ok((object_ids, total_object_count, total_object_size, next_start_after)) => { Vacuum::report_full_scan_task( object_ids, total_object_count, total_object_size, + next_start_after, meta_client, ) .await; @@ -804,11 +805,12 @@ pub fn start_shared_compactor( match Vacuum::handle_full_scan_task(full_scan_task, context.sstable_store.clone()) .await { - Ok((object_ids, total_object_count, total_object_size)) => { + Ok((object_ids, total_object_count, total_object_size, next_start_after)) => { let report_full_scan_task_request = ReportFullScanTaskRequest { object_ids, total_object_count, total_object_size, + next_start_after, }; match cloned_grpc_proxy_client .report_full_scan_task(report_full_scan_task_request) diff --git a/src/storage/src/hummock/hummock_meta_client.rs b/src/storage/src/hummock/hummock_meta_client.rs index 53f15ebc1fb3f..7e07a6ed9c89d 100644 --- a/src/storage/src/hummock/hummock_meta_client.rs +++ b/src/storage/src/hummock/hummock_meta_client.rs @@ -98,9 +98,15 @@ impl HummockMetaClient for MonitoredHummockMetaClient { filtered_object_ids: Vec, total_object_count: u64, total_object_size: u64, + next_start_after: Option, ) -> Result<()> { self.meta_client - .report_full_scan_task(filtered_object_ids, total_object_count, total_object_size) + .report_full_scan_task( + filtered_object_ids, + total_object_count, + total_object_size, + next_start_after, + ) .await } diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index 
d1367b92a9ce8..11f8c2c327400 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -614,9 +614,11 @@ impl SstableStore { pub async fn list_object_metadata_from_object_store( &self, prefix: Option, + start_after: Option, + limit: Option, ) -> HummockResult { let list_path = format!("{}/{}", self.path, prefix.unwrap_or("".into())); - let raw_iter = self.store.list(&list_path).await?; + let raw_iter = self.store.list(&list_path, start_after, limit).await?; let iter = raw_iter.filter(|r| match r { Ok(i) => future::ready(i.key.ends_with(&format!(".{}", OBJECT_SUFFIX))), Err(_) => future::ready(true), diff --git a/src/storage/src/hummock/vacuum.rs b/src/storage/src/hummock/vacuum.rs index fb4c9d782215b..17095e994e287 100644 --- a/src/storage/src/hummock/vacuum.rs +++ b/src/storage/src/hummock/vacuum.rs @@ -35,7 +35,7 @@ impl Vacuum { sstable_store: SstableStoreRef, sstable_object_ids: &[u64], ) -> HummockResult<()> { - tracing::info!("Try to vacuum SSTs {:?}", sstable_object_ids); + tracing::info!("try to vacuum SSTs {:?}", sstable_object_ids); sstable_store.delete_list(sstable_object_ids).await?; Ok(()) } @@ -46,10 +46,10 @@ impl Vacuum { ) -> bool { match hummock_meta_client.report_vacuum_task(vacuum_task).await { Ok(_) => { - tracing::info!("Finished vacuuming SSTs"); + tracing::info!("vacuuming SSTs succeeded"); } Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to report vacuum task"); + tracing::warn!(error = %e.as_report(), "failed to report vacuum task"); return false; } } @@ -59,7 +59,7 @@ impl Vacuum { pub async fn full_scan_inner( full_scan_task: FullScanTask, metadata_iter: ObjectMetadataIter, - ) -> HummockResult<(Vec, u64, u64)> { + ) -> HummockResult<(Vec, u64, u64, Option)> { let timestamp_watermark = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() @@ -68,12 +68,21 @@ impl Vacuum { let mut total_object_count = 0; let mut total_object_size = 0; + let mut next_start_after: Option = None; 
let filtered = metadata_iter .filter_map(|r| { let result = match r { Ok(o) => { total_object_count += 1; total_object_size += o.total_size; + // Determine if the LIST has been truncated. + // A false positive would cost at most one additional LIST later. + if let Some(limit) = full_scan_task.limit + && limit == total_object_count + { + next_start_after = Some(o.key.clone()); + tracing::debug!(next_start_after, "set next start after"); + } if o.last_modified < timestamp_watermark { Some(Ok(SstableStore::get_object_id_from_path(&o.key))) } else { @@ -88,8 +97,9 @@ impl Vacuum { .await?; Ok(( filtered, - total_object_count as u64, + total_object_count, total_object_size as u64, + next_start_after, )) } @@ -97,14 +107,20 @@ impl Vacuum { pub async fn handle_full_scan_task( full_scan_task: FullScanTask, sstable_store: SstableStoreRef, - ) -> HummockResult<(Vec, u64, u64)> { + ) -> HummockResult<(Vec, u64, u64, Option)> { tracing::info!( timestamp = full_scan_task.sst_retention_time_sec, prefix = full_scan_task.prefix.as_ref().unwrap_or(&String::from("")), - "Try to full scan SSTs" + start_after = full_scan_task.start_after, + limit = full_scan_task.limit, + "try to full scan SSTs" ); let metadata_iter = sstable_store - .list_object_metadata_from_object_store(full_scan_task.prefix.clone()) + .list_object_metadata_from_object_store( + full_scan_task.prefix.clone(), + full_scan_task.start_after.clone(), + full_scan_task.limit.map(|i| i as usize), + ) .await?; Vacuum::full_scan_inner(full_scan_task, metadata_iter).await } @@ -113,24 +129,30 @@ impl Vacuum { filtered_object_ids: Vec, unfiltered_count: u64, unfiltered_size: u64, + next_start_after: Option, hummock_meta_client: Arc, ) -> bool { - tracing::info!("Try to report full scan task",); tracing::info!( - "filtered_object_ids length = {}, unfiltered_count = {}, unfiltered_size = {}", - filtered_object_ids.len(), + filtered_object_ids_len = filtered_object_ids.len(), unfiltered_count, - unfiltered_size + 
unfiltered_size, + next_start_after, + "try to report full scan task" ); match hummock_meta_client - .report_full_scan_task(filtered_object_ids, unfiltered_count, unfiltered_size) + .report_full_scan_task( + filtered_object_ids, + unfiltered_count, + unfiltered_size, + next_start_after, + ) .await { Ok(_) => { - tracing::info!("Finished full scan SSTs"); + tracing::info!("full scan SSTs succeeded"); } Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to report full scan task"); + tracing::warn!(error = %e.as_report(), "failed to report full scan task"); return false; } }