Skip to content

Commit

Permalink
refactor(storage): support pagination for LIST during full GC
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Sep 26, 2024
1 parent 5e2cfac commit 91fe174
Show file tree
Hide file tree
Showing 21 changed files with 218 additions and 55 deletions.
3 changes: 3 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,8 @@ message VacuumTask {
message FullScanTask {
uint64 sst_retention_time_sec = 1;
optional string prefix = 2;
optional string start_after = 3;
optional uint64 limit = 4;
}

// Cancel compact task
Expand Down Expand Up @@ -580,6 +582,7 @@ message ReportFullScanTaskRequest {
uint64 total_object_count = 2;
// Total size of objects before filtered by conditions specified by FullScanTask.
uint64 total_object_size = 3;
optional string next_start_after = 4;
}

message ReportFullScanTaskResponse {
Expand Down
8 changes: 8 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ pub struct MetaConfig {
#[serde(default = "default::meta::full_gc_interval_sec")]
pub full_gc_interval_sec: u64,

/// Max number of object per full GC job can fetch.
#[serde(default = "default::meta::full_gc_object_limit")]
pub full_gc_object_limit: u64,

/// The spin interval when collecting global GC watermark in hummock.
#[serde(default = "default::meta::collect_gc_watermark_spin_interval_sec")]
pub collect_gc_watermark_spin_interval_sec: u64,
Expand Down Expand Up @@ -1346,6 +1350,10 @@ pub mod default {
86400
}

pub fn full_gc_object_limit() -> u64 {
100_000
}

pub fn collect_gc_watermark_spin_interval_sec() -> u64 {
5
}
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
.await?;
} else {
let mut metadata_iter = sstable_store
.list_object_metadata_from_object_store(None)
.list_object_metadata_from_object_store(None, None, None)
.await?;
while let Some(obj) = metadata_iter.try_next().await? {
print_object(&obj);
Expand Down
1 change: 1 addition & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ pub fn start(
.min_delta_log_num_for_hummock_version_checkpoint,
min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec,
full_gc_interval_sec: config.meta.full_gc_interval_sec,
full_gc_object_limit: config.meta.full_gc_object_limit,
collect_gc_watermark_spin_interval_sec: config
.meta
.collect_gc_watermark_spin_interval_sec,
Expand Down
5 changes: 4 additions & 1 deletion src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,10 @@ impl HummockManagerService for HummockServiceImpl {
// The following operation takes some time, so we do it in dedicated task and responds the
// RPC immediately.
tokio::spawn(async move {
match hummock_manager.complete_full_gc(req.object_ids).await {
match hummock_manager
.complete_full_gc(req.object_ids, req.next_start_after)
.await
{
Ok(number) => {
tracing::info!("Full GC results {} SSTs to delete", number);
}
Expand Down
44 changes: 39 additions & 5 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,10 @@ impl HummockManager {
prefix = prefix.as_ref().unwrap_or(&String::from("")),
"run full GC"
);

let compactor = match self.compactor_manager.next_compactor() {
None => {
tracing::warn!("Try full GC but no available idle worker.");
tracing::warn!("full GC attempt but no available idle worker");
return Ok(false);
}
Some(compactor) => compactor,
Expand All @@ -204,14 +205,21 @@ impl HummockManager {
.send_event(ResponseEvent::FullScanTask(FullScanTask {
sst_retention_time_sec: sst_retention_time.as_secs(),
prefix,
start_after: self.full_gc_state.next_start_after(),
limit: self.full_gc_state.limit,
}))
.map_err(|_| Error::CompactorUnreachable(compactor.context_id()))?;
Ok(true)
}

/// Given candidate SSTs to GC, filter out false positive.
/// Returns number of SSTs to GC.
pub async fn complete_full_gc(&self, object_ids: Vec<HummockSstableObjectId>) -> Result<usize> {
pub async fn complete_full_gc(
&self,
object_ids: Vec<HummockSstableObjectId>,
next_start_after: Option<String>,
) -> Result<usize> {
self.full_gc_state.set_next_start_after(next_start_after);
if object_ids.is_empty() {
tracing::info!("SST full scan returns no SSTs.");
return Ok(0);
Expand Down Expand Up @@ -252,6 +260,28 @@ impl HummockManager {
}
}

pub struct FullGcState {
next_start_after: Mutex<Option<String>>,
limit: Option<u64>,
}

impl FullGcState {
pub fn new(limit: Option<u64>) -> Self {
Self {
next_start_after: Mutex::new(None),
limit,
}
}

pub fn set_next_start_after(&self, next_start_after: Option<String>) {
*self.next_start_after.lock() = next_start_after;
}

pub fn next_start_after(&self) -> Option<String> {
self.next_start_after.lock().clone()
}
}

/// Collects SST GC watermark from related cluster nodes and calculates a global one.
///
/// It must wait enough heartbeats first. This precondition is checked at `spin_interval`.
Expand Down Expand Up @@ -403,7 +433,10 @@ mod tests {
);

// Empty input results immediate return, without waiting heartbeat.
hummock_manager.complete_full_gc(vec![]).await.unwrap();
hummock_manager
.complete_full_gc(vec![], None)
.await
.unwrap();

// mimic CN heartbeat
use risingwave_pb::meta::heartbeat_request::extra_info::Info;
Expand All @@ -428,7 +461,7 @@ mod tests {
assert_eq!(
3,
hummock_manager
.complete_full_gc(vec![1, 2, 3])
.complete_full_gc(vec![1, 2, 3], None)
.await
.unwrap()
);
Expand All @@ -452,7 +485,8 @@ mod tests {
1,
hummock_manager
.complete_full_gc(
[committed_object_ids, vec![max_committed_object_id + 1]].concat()
[committed_object_ids, vec![max_committed_object_id + 1]].concat(),
None
)
.await
.unwrap()
Expand Down
6 changes: 4 additions & 2 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::hummock::compaction::CompactStatus;
use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::context::ContextInfo;
use crate::hummock::manager::gc::DeleteObjectTracker;
use crate::hummock::manager::gc::{DeleteObjectTracker, FullGcState};
use crate::hummock::CompactorManagerRef;
use crate::manager::{MetaSrvEnv, MetaStoreImpl, MetadataManager};
use crate::model::{ClusterId, MetadataModel, MetadataModelError};
Expand Down Expand Up @@ -113,6 +113,7 @@ pub struct HummockManager {
// `compaction_state` will record the types of compact tasks that can be triggered in `hummock`
// and suggest types with a certain priority.
pub compaction_state: CompactionState,
full_gc_state: FullGcState,
}

pub type HummockManagerRef = Arc<HummockManager>;
Expand Down Expand Up @@ -246,7 +247,7 @@ impl HummockManager {
let version_checkpoint_path = version_checkpoint_path(state_store_dir);
let version_archive_dir = version_archive_dir(state_store_dir);
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

let full_gc_object_limit = env.opts.full_gc_object_limit;
let instance = HummockManager {
env,
versioning: MonitoredRwLock::new(
Expand Down Expand Up @@ -285,6 +286,7 @@ impl HummockManager {
history_table_throughput: parking_lot::RwLock::new(HashMap::default()),
compactor_streams_change_tx,
compaction_state: CompactionState::new(),
full_gc_state: FullGcState::new(Some(full_gc_object_limit)),
};
let instance = Arc::new(instance);
instance.init_time_travel_state().await?;
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/mock_hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ impl HummockMetaClient for MockHummockMetaClient {
_filtered_object_ids: Vec<HummockSstableObjectId>,
_total_object_count: u64,
_total_object_size: u64,
_next_start_after: Option<String>,
) -> Result<()> {
unimplemented!()
}
Expand Down
3 changes: 3 additions & 0 deletions src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ pub struct MetaOpts {
pub min_sst_retention_time_sec: u64,
/// Interval of automatic hummock full GC.
pub full_gc_interval_sec: u64,
/// Max number of object per full GC job can fetch.
pub full_gc_object_limit: u64,
/// The spin interval when collecting global GC watermark in hummock
pub collect_gc_watermark_spin_interval_sec: u64,
/// Enable sanity check when SSTs are committed
Expand Down Expand Up @@ -318,6 +320,7 @@ impl MetaOpts {
min_delta_log_num_for_hummock_version_checkpoint: 1,
min_sst_retention_time_sec: 3600 * 24 * 7,
full_gc_interval_sec: 3600 * 24 * 7,
full_gc_object_limit: 100_000,
collect_gc_watermark_spin_interval_sec: 5,
enable_committed_sst_sanity_check: false,
periodic_compaction_interval_sec: 60,
Expand Down
21 changes: 17 additions & 4 deletions src/object_store/src/object/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use futures::Stream;
use itertools::Itertools;
use risingwave_common::range::RangeBoundsExt;
use thiserror::Error;
use tokio::sync::Mutex;

use risingwave_common::range::RangeBoundsExt;

use crate::object::{ObjectDataStream, ObjectMetadataIter};

use super::{
ObjectError, ObjectMetadata, ObjectRangeBounds, ObjectResult, ObjectStore, StreamingUploader,
};
use crate::object::{ObjectDataStream, ObjectMetadataIter};

#[derive(Error, Debug)]
pub enum Error {
Expand Down Expand Up @@ -182,19 +184,30 @@ impl ObjectStore for InMemObjectStore {
Ok(())
}

async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
async fn list(
&self,
prefix: &str,
start_after: Option<String>,
limit: Option<usize>,
) -> ObjectResult<ObjectMetadataIter> {
let list_result = self
.objects
.lock()
.await
.iter()
.filter_map(|(path, (metadata, _))| {
if let Some(ref start_after) = start_after
&& metadata.key.le(start_after)
{
return None;
}
if path.starts_with(prefix) {
return Some(metadata.clone());
}
None
})
.sorted_by(|a, b| Ord::cmp(&a.key, &b.key))
.take(limit.unwrap_or(usize::MAX))
.collect_vec();
Ok(Box::pin(InMemObjectIter::new(list_result)))
}
Expand Down Expand Up @@ -376,7 +389,7 @@ mod tests {

async fn list_all(prefix: &str, store: &InMemObjectStore) -> Vec<ObjectMetadata> {
store
.list(prefix)
.list(prefix, None, None)
.await
.unwrap()
.try_collect::<Vec<_>>()
Expand Down
25 changes: 20 additions & 5 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,12 @@ pub trait ObjectStore: Send + Sync {
MonitoredObjectStore::new(self, metrics, config)
}

async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter>;
async fn list(
&self,
prefix: &str,
start_after: Option<String>,
limit: Option<usize>,
) -> ObjectResult<ObjectMetadataIter>;

fn store_media_type(&self) -> &'static str;

Expand Down Expand Up @@ -326,8 +331,13 @@ impl ObjectStoreImpl {
object_store_impl_method_body!(self, delete_objects(paths).await)
}

pub async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
object_store_impl_method_body!(self, list(prefix).await)
pub async fn list(
&self,
prefix: &str,
start_after: Option<String>,
limit: Option<usize>,
) -> ObjectResult<ObjectMetadataIter> {
object_store_impl_method_body!(self, list(prefix, start_after, limit).await)
}

pub fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
Expand Down Expand Up @@ -814,7 +824,12 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
res
}

pub async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
pub async fn list(
&self,
prefix: &str,
start_after: Option<String>,
limit: Option<usize>,
) -> ObjectResult<ObjectMetadataIter> {
let operation_type = OperationType::List;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();
Expand All @@ -827,7 +842,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {

let builder = || async {
self.inner
.list(prefix)
.list(prefix, start_after.clone(), limit)
.verbose_instrument_await(operation_type_str)
.await
};
Expand Down
20 changes: 14 additions & 6 deletions src/object_store/src/object/opendal_engine/opendal_object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,21 @@ impl ObjectStore for OpendalObjectStore {
Ok(())
}

async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
let object_lister = self
async fn list(
&self,
prefix: &str,
start_after: Option<String>,
limit: Option<usize>,
) -> ObjectResult<ObjectMetadataIter> {
let mut object_lister = self
.op
.lister_with(prefix)
.recursive(true)
.metakey(Metakey::ContentLength)
.await?;
.metakey(Metakey::ContentLength);
if let Some(start_after) = start_after {
object_lister = object_lister.start_after(&start_after);
}
let object_lister = object_lister.await?;

let stream = stream::unfold(object_lister, |mut object_lister| async move {
match object_lister.next().await {
Expand All @@ -261,7 +269,7 @@ impl ObjectStore for OpendalObjectStore {
}
});

Ok(stream.boxed())
Ok(stream.take(limit.unwrap_or(usize::MAX)).boxed())
}

fn store_media_type(&self) -> &'static str {
Expand Down Expand Up @@ -428,7 +436,7 @@ mod tests {

async fn list_all(prefix: &str, store: &OpendalObjectStore) -> Vec<ObjectMetadata> {
store
.list(prefix)
.list(prefix, None, None)
.await
.unwrap()
.try_collect::<Vec<_>>()
Expand Down
Loading

0 comments on commit 91fe174

Please sign in to comment.