Skip to content

Commit

Permalink
refactor(storage): support pagination for LIST during full GC (#18737)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Sep 27, 2024
1 parent e9c2161 commit 8f0d5de
Show file tree
Hide file tree
Showing 24 changed files with 226 additions and 54 deletions.
3 changes: 3 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,8 @@ message VacuumTask {
// Task the meta node sends to a compactor to scan the object store during full GC.
message FullScanTask {
// Retention time in seconds, as computed by the meta node for this scan.
// NOTE(review): presumably objects younger than this are not reported as GC
// candidates; confirm against the compactor-side scan implementation.
uint64 sst_retention_time_sec = 1;
// Optional prefix forwarded by the meta node with the scan request.
optional string prefix = 2;
// Optional pagination cursor. The meta node fills it with the `next_start_after`
// value reported by the previous scan, if any.
optional string start_after = 3;
// Optional cap for a single scan. The meta node fills it from its
// `full_gc_object_limit` option.
optional uint64 limit = 4;
}

// Cancel compact task
Expand Down Expand Up @@ -580,6 +582,7 @@ message ReportFullScanTaskRequest {
uint64 total_object_count = 2;
// Total size of objects before filtered by conditions specified by FullScanTask.
uint64 total_object_size = 3;
optional string next_start_after = 4;
}

message ReportFullScanTaskResponse {
Expand Down
8 changes: 8 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ pub struct MetaConfig {
#[serde(default = "default::meta::full_gc_interval_sec")]
pub full_gc_interval_sec: u64,

/// Max number of objects that each full GC job can fetch.
#[serde(default = "default::meta::full_gc_object_limit")]
pub full_gc_object_limit: u64,

/// The spin interval when collecting global GC watermark in hummock.
#[serde(default = "default::meta::collect_gc_watermark_spin_interval_sec")]
pub collect_gc_watermark_spin_interval_sec: u64,
Expand Down Expand Up @@ -1346,6 +1350,10 @@ pub mod default {
86400
}

/// Serde default for `MetaConfig::full_gc_object_limit`: the maximum number of objects
/// a single full GC job can fetch.
pub fn full_gc_object_limit() -> u64 {
    const DEFAULT_FULL_GC_OBJECT_LIMIT: u64 = 100_000;
    DEFAULT_FULL_GC_OBJECT_LIMIT
}

/// Serde default for `MetaConfig::collect_gc_watermark_spin_interval_sec`: the spin
/// interval, in seconds, used when collecting the global GC watermark in hummock.
pub fn collect_gc_watermark_spin_interval_sec() -> u64 {
    const DEFAULT_SPIN_INTERVAL_SEC: u64 = 5;
    DEFAULT_SPIN_INTERVAL_SEC
}
Expand Down
1 change: 1 addition & 0 deletions src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ This page is automatically generated by `./risedev generate-example-config`
| event_log_channel_max_size | Keeps the latest N events per channel. | 10 |
| event_log_enabled | | true |
| full_gc_interval_sec | Interval of automatic hummock full GC. | 86400 |
| full_gc_object_limit | Max number of objects that each full GC job can fetch. | 100000 |
| hummock_time_travel_snapshot_interval | The interval at which a Hummock version snapshot is taken for time travel. Larger value indicates less storage overhead but worse query performance. | 100 |
| hummock_version_checkpoint_interval_sec | Interval of hummock version checkpoint. | 30 |
| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `vnode_count / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 |
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dir = "./"
[meta]
min_sst_retention_time_sec = 86400
full_gc_interval_sec = 86400
full_gc_object_limit = 100000
collect_gc_watermark_spin_interval_sec = 5
periodic_compaction_interval_sec = 60
vacuum_interval_sec = 30
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
.await?;
} else {
let mut metadata_iter = sstable_store
.list_object_metadata_from_object_store(None)
.list_object_metadata_from_object_store(None, None, None)
.await?;
while let Some(obj) = metadata_iter.try_next().await? {
print_object(&obj);
Expand Down
1 change: 1 addition & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ pub fn start(
.min_delta_log_num_for_hummock_version_checkpoint,
min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec,
full_gc_interval_sec: config.meta.full_gc_interval_sec,
full_gc_object_limit: config.meta.full_gc_object_limit,
collect_gc_watermark_spin_interval_sec: config
.meta
.collect_gc_watermark_spin_interval_sec,
Expand Down
5 changes: 4 additions & 1 deletion src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,10 @@ impl HummockManagerService for HummockServiceImpl {
// The following operation takes some time, so we do it in a dedicated task and respond to the
// RPC immediately.
tokio::spawn(async move {
match hummock_manager.complete_full_gc(req.object_ids).await {
match hummock_manager
.complete_full_gc(req.object_ids, req.next_start_after)
.await
{
Ok(number) => {
tracing::info!("Full GC results {} SSTs to delete", number);
}
Expand Down
48 changes: 43 additions & 5 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,19 @@ impl HummockManager {
sst_retention_time,
Duration::from_secs(self.env.opts.min_sst_retention_time_sec),
);
let start_after = self.full_gc_state.next_start_after();
let limit = self.full_gc_state.limit;
tracing::info!(
retention_sec = sst_retention_time.as_secs(),
prefix = prefix.as_ref().unwrap_or(&String::from("")),
start_after,
limit,
"run full GC"
);

let compactor = match self.compactor_manager.next_compactor() {
None => {
tracing::warn!("Try full GC but no available idle worker.");
tracing::warn!("full GC attempt but no available idle worker");
return Ok(false);
}
Some(compactor) => compactor,
Expand All @@ -204,14 +209,21 @@ impl HummockManager {
.send_event(ResponseEvent::FullScanTask(FullScanTask {
sst_retention_time_sec: sst_retention_time.as_secs(),
prefix,
start_after,
limit,
}))
.map_err(|_| Error::CompactorUnreachable(compactor.context_id()))?;
Ok(true)
}

/// Given candidate SSTs to GC, filters out false positives.
/// Returns the number of SSTs to GC.
pub async fn complete_full_gc(&self, object_ids: Vec<HummockSstableObjectId>) -> Result<usize> {
pub async fn complete_full_gc(
&self,
object_ids: Vec<HummockSstableObjectId>,
next_start_after: Option<String>,
) -> Result<usize> {
self.full_gc_state.set_next_start_after(next_start_after);
if object_ids.is_empty() {
tracing::info!("SST full scan returns no SSTs.");
return Ok(0);
Expand Down Expand Up @@ -252,6 +264,28 @@ impl HummockManager {
}
}

/// Pagination state for full GC, owned by `HummockManager` and shared across scans.
pub struct FullGcState {
/// Cursor most recently stored via `set_next_start_after`; `start_full_gc` sends it as
/// `FullScanTask::start_after`. Starts out as `None`.
next_start_after: Mutex<Option<String>>,
/// Value sent as `FullScanTask::limit` with every full scan task. Fixed at construction.
limit: Option<u64>,
}

impl FullGcState {
    /// Builds a state that has no pagination cursor yet and carries the given `limit`.
    pub fn new(limit: Option<u64>) -> Self {
        let next_start_after = Mutex::new(None);
        Self {
            next_start_after,
            limit,
        }
    }

    /// Overwrites the stored cursor with `next_start_after`; `None` clears it.
    pub fn set_next_start_after(&self, next_start_after: Option<String>) {
        let mut cursor = self.next_start_after.lock();
        *cursor = next_start_after;
    }

    /// Returns a copy of the currently stored cursor, if any.
    pub fn next_start_after(&self) -> Option<String> {
        let cursor = self.next_start_after.lock();
        (*cursor).clone()
    }
}

/// Collects SST GC watermark from related cluster nodes and calculates a global one.
///
/// It must wait enough heartbeats first. This precondition is checked at `spin_interval`.
Expand Down Expand Up @@ -403,7 +437,10 @@ mod tests {
);

// Empty input results in an immediate return, without waiting for a heartbeat.
hummock_manager.complete_full_gc(vec![]).await.unwrap();
hummock_manager
.complete_full_gc(vec![], None)
.await
.unwrap();

// mimic CN heartbeat
use risingwave_pb::meta::heartbeat_request::extra_info::Info;
Expand All @@ -428,7 +465,7 @@ mod tests {
assert_eq!(
3,
hummock_manager
.complete_full_gc(vec![1, 2, 3])
.complete_full_gc(vec![1, 2, 3], None)
.await
.unwrap()
);
Expand All @@ -452,7 +489,8 @@ mod tests {
1,
hummock_manager
.complete_full_gc(
[committed_object_ids, vec![max_committed_object_id + 1]].concat()
[committed_object_ids, vec![max_committed_object_id + 1]].concat(),
None
)
.await
.unwrap()
Expand Down
6 changes: 4 additions & 2 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::hummock::compaction::CompactStatus;
use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::context::ContextInfo;
use crate::hummock::manager::gc::DeleteObjectTracker;
use crate::hummock::manager::gc::{DeleteObjectTracker, FullGcState};
use crate::hummock::CompactorManagerRef;
use crate::manager::{MetaSrvEnv, MetaStoreImpl, MetadataManager};
use crate::model::{ClusterId, MetadataModel, MetadataModelError};
Expand Down Expand Up @@ -113,6 +113,7 @@ pub struct HummockManager {
// `compaction_state` will record the types of compact tasks that can be triggered in `hummock`
// and suggest types with a certain priority.
pub compaction_state: CompactionState,
full_gc_state: FullGcState,
}

pub type HummockManagerRef = Arc<HummockManager>;
Expand Down Expand Up @@ -246,7 +247,7 @@ impl HummockManager {
let version_checkpoint_path = version_checkpoint_path(state_store_dir);
let version_archive_dir = version_archive_dir(state_store_dir);
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

let full_gc_object_limit = env.opts.full_gc_object_limit;
let instance = HummockManager {
env,
versioning: MonitoredRwLock::new(
Expand Down Expand Up @@ -285,6 +286,7 @@ impl HummockManager {
history_table_throughput: parking_lot::RwLock::new(HashMap::default()),
compactor_streams_change_tx,
compaction_state: CompactionState::new(),
full_gc_state: FullGcState::new(Some(full_gc_object_limit)),
};
let instance = Arc::new(instance);
instance.init_time_travel_state().await?;
Expand Down
4 changes: 3 additions & 1 deletion src/meta/src/hummock/manager/timer_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,10 @@ impl HummockManager {
}

HummockTimerEvent::FullGc => {
let retention_sec =
hummock_manager.env.opts.min_sst_retention_time_sec;
if hummock_manager
.start_full_gc(Duration::from_secs(3600), None)
.start_full_gc(Duration::from_secs(retention_sec), None)
.is_ok()
{
tracing::info!("Start full GC from meta node.");
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/mock_hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ impl HummockMetaClient for MockHummockMetaClient {
_filtered_object_ids: Vec<HummockSstableObjectId>,
_total_object_count: u64,
_total_object_size: u64,
_next_start_after: Option<String>,
) -> Result<()> {
unimplemented!()
}
Expand Down
3 changes: 3 additions & 0 deletions src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ pub struct MetaOpts {
pub min_sst_retention_time_sec: u64,
/// Interval of automatic hummock full GC.
pub full_gc_interval_sec: u64,
/// Max number of objects that each full GC job can fetch.
pub full_gc_object_limit: u64,
/// The spin interval when collecting global GC watermark in hummock
pub collect_gc_watermark_spin_interval_sec: u64,
/// Enable sanity check when SSTs are committed
Expand Down Expand Up @@ -318,6 +320,7 @@ impl MetaOpts {
min_delta_log_num_for_hummock_version_checkpoint: 1,
min_sst_retention_time_sec: 3600 * 24 * 7,
full_gc_interval_sec: 3600 * 24 * 7,
full_gc_object_limit: 100_000,
collect_gc_watermark_spin_interval_sec: 5,
enable_committed_sst_sanity_check: false,
periodic_compaction_interval_sec: 60,
Expand Down
15 changes: 13 additions & 2 deletions src/object_store/src/object/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,19 +182,30 @@ impl ObjectStore for InMemObjectStore {
Ok(())
}

async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
/// Lists metadata of the stored objects whose path starts with `prefix`, ordered by
/// ascending key.
///
/// When `start_after` is given, entries whose key is less than or equal to it are skipped.
/// At most `limit` entries are returned; `None` means no cap.
async fn list(
    &self,
    prefix: &str,
    start_after: Option<String>,
    limit: Option<usize>,
) -> ObjectResult<ObjectMetadataIter> {
    let max_entries = limit.unwrap_or(usize::MAX);
    let listed = self
        .objects
        .lock()
        .await
        .iter()
        .filter_map(|(path, (metadata, _))| {
            // Only keys strictly greater than the cursor pass.
            let at_or_before_cursor = start_after
                .as_ref()
                .is_some_and(|cursor| metadata.key.le(cursor));
            (!at_or_before_cursor && path.starts_with(prefix)).then(|| metadata.clone())
        })
        // Sort before truncating so the cap applies to the smallest keys.
        .sorted_by(|lhs, rhs| Ord::cmp(&lhs.key, &rhs.key))
        .take(max_entries)
        .collect_vec();
    Ok(Box::pin(InMemObjectIter::new(listed)))
}
Expand Down Expand Up @@ -376,7 +387,7 @@ mod tests {

async fn list_all(prefix: &str, store: &InMemObjectStore) -> Vec<ObjectMetadata> {
store
.list(prefix)
.list(prefix, None, None)
.await
.unwrap()
.try_collect::<Vec<_>>()
Expand Down
25 changes: 20 additions & 5 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,12 @@ pub trait ObjectStore: Send + Sync {
MonitoredObjectStore::new(self, metrics, config)
}

async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter>;
/// Lists metadata of objects under `prefix`.
///
/// * `start_after` - optional pagination cursor. The in-memory implementation skips every
///   key that is less than or equal to it.
///   NOTE(review): other backends are expected to behave the same way; confirm per backend.
/// * `limit` - optional cap on the number of entries yielded; `None` means no cap.
async fn list(
&self,
prefix: &str,
start_after: Option<String>,
limit: Option<usize>,
) -> ObjectResult<ObjectMetadataIter>;

fn store_media_type(&self) -> &'static str;

Expand Down Expand Up @@ -326,8 +331,13 @@ impl ObjectStoreImpl {
object_store_impl_method_body!(self, delete_objects(paths).await)
}

pub async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
object_store_impl_method_body!(self, list(prefix).await)
/// Forwards `list` to the wrapped object store through `object_store_impl_method_body!`,
/// passing `prefix`, `start_after` and `limit` through unchanged.
pub async fn list(
&self,
prefix: &str,
start_after: Option<String>,
limit: Option<usize>,
) -> ObjectResult<ObjectMetadataIter> {
object_store_impl_method_body!(self, list(prefix, start_after, limit).await)
}

pub fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
Expand Down Expand Up @@ -814,7 +824,12 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
res
}

pub async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
pub async fn list(
&self,
prefix: &str,
start_after: Option<String>,
limit: Option<usize>,
) -> ObjectResult<ObjectMetadataIter> {
let operation_type = OperationType::List;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();
Expand All @@ -827,7 +842,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {

let builder = || async {
self.inner
.list(prefix)
.list(prefix, start_after.clone(), limit)
.verbose_instrument_await(operation_type_str)
.await
};
Expand Down
20 changes: 14 additions & 6 deletions src/object_store/src/object/opendal_engine/opendal_object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,21 @@ impl ObjectStore for OpendalObjectStore {
Ok(())
}

async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
let object_lister = self
async fn list(
&self,
prefix: &str,
start_after: Option<String>,
limit: Option<usize>,
) -> ObjectResult<ObjectMetadataIter> {
let mut object_lister = self
.op
.lister_with(prefix)
.recursive(true)
.metakey(Metakey::ContentLength)
.await?;
.metakey(Metakey::ContentLength);
if let Some(start_after) = start_after {
object_lister = object_lister.start_after(&start_after);
}
let object_lister = object_lister.await?;

let stream = stream::unfold(object_lister, |mut object_lister| async move {
match object_lister.next().await {
Expand All @@ -261,7 +269,7 @@ impl ObjectStore for OpendalObjectStore {
}
});

Ok(stream.boxed())
Ok(stream.take(limit.unwrap_or(usize::MAX)).boxed())
}

fn store_media_type(&self) -> &'static str {
Expand Down Expand Up @@ -428,7 +436,7 @@ mod tests {

async fn list_all(prefix: &str, store: &OpendalObjectStore) -> Vec<ObjectMetadata> {
store
.list(prefix)
.list(prefix, None, None)
.await
.unwrap()
.try_collect::<Vec<_>>()
Expand Down
Loading

0 comments on commit 8f0d5de

Please sign in to comment.