refactor(storage): support pagination for LIST during full GC #18737

Merged 5 commits on Sep 27, 2024
3 changes: 3 additions & 0 deletions proto/hummock.proto
@@ -545,6 +545,8 @@ message VacuumTask {
message FullScanTask {
uint64 sst_retention_time_sec = 1;
optional string prefix = 2;
optional string start_after = 3;
optional uint64 limit = 4;
}

// Cancel compact task
@@ -580,6 +582,7 @@ message ReportFullScanTaskRequest {
uint64 total_object_count = 2;
// Total size of objects before filtered by conditions specified by FullScanTask.
uint64 total_object_size = 3;
optional string next_start_after = 4;
}

message ReportFullScanTaskResponse {
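Together, `start_after` and `limit` on `FullScanTask` and `next_start_after` on the report form a simple cursor protocol: the meta node hands the compactor an exclusive resume key and a page size, and the compactor reports back where the next GC round should resume. A minimal sketch of the cursor semantics, using plain Rust stand-ins rather than the prost-generated types:

```rust
/// Hypothetical stand-in for the pagination fields of `FullScanTask` above.
struct ScanCursor {
    start_after: Option<String>, // exclusive: resume listing after this key
    limit: Option<u64>,          // page size; None means unbounded
}

/// One page of a scan over a sorted key space: returns the page plus the
/// `next_start_after` value to report back to the meta node.
fn scan_page(cursor: &ScanCursor, sorted_keys: &[String]) -> (Vec<String>, Option<String>) {
    let page: Vec<String> = sorted_keys
        .iter()
        .filter(|k| cursor.start_after.as_ref().map_or(true, |c| k.as_str() > c.as_str()))
        .take(cursor.limit.unwrap_or(u64::MAX) as usize)
        .cloned()
        .collect();
    // Last key of this page, if any; the next round resumes strictly after it.
    let next_start_after = page.last().cloned();
    (page, next_start_after)
}
```

For example, scanning `["a", "b", "c", "d", "e"]` with `start_after = Some("b")` and `limit = Some(2)` yields the page `["c", "d"]` and `next_start_after = Some("d")`.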
8 changes: 8 additions & 0 deletions src/common/src/config.rs
@@ -192,6 +192,10 @@ pub struct MetaConfig {
#[serde(default = "default::meta::full_gc_interval_sec")]
pub full_gc_interval_sec: u64,

/// Max number of objects a full GC job can fetch.
#[serde(default = "default::meta::full_gc_object_limit")]
pub full_gc_object_limit: u64,

/// The spin interval when collecting global GC watermark in hummock.
#[serde(default = "default::meta::collect_gc_watermark_spin_interval_sec")]
pub collect_gc_watermark_spin_interval_sec: u64,
@@ -1346,6 +1350,10 @@ pub mod default {
86400
}

pub fn full_gc_object_limit() -> u64 {
100_000
}

pub fn collect_gc_watermark_spin_interval_sec() -> u64 {
5
}
1 change: 1 addition & 0 deletions src/config/docs.md
@@ -39,6 +39,7 @@ This page is automatically generated by `./risedev generate-example-config`
| event_log_channel_max_size | Keeps the latest N events per channel. | 10 |
| event_log_enabled | | true |
| full_gc_interval_sec | Interval of automatic hummock full GC. | 86400 |
| full_gc_object_limit | Max number of objects a full GC job can fetch. | 100000 |
| hummock_time_travel_snapshot_interval | The interval at which a Hummock version snapshot is taken for time travel. Larger value indicates less storage overhead but worse query performance. | 100 |
| hummock_version_checkpoint_interval_sec | Interval of hummock version checkpoint. | 30 |
| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 |
1 change: 1 addition & 0 deletions src/config/example.toml
@@ -16,6 +16,7 @@ dir = "./"
[meta]
min_sst_retention_time_sec = 86400
full_gc_interval_sec = 86400
full_gc_object_limit = 100000
collect_gc_watermark_spin_interval_sec = 5
periodic_compaction_interval_sec = 60
vacuum_interval_sec = 30
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/hummock/sst_dump.rs
@@ -138,7 +138,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
.await?;
} else {
let mut metadata_iter = sstable_store
.list_object_metadata_from_object_store(None)
.list_object_metadata_from_object_store(None, None, None)
.await?;
while let Some(obj) = metadata_iter.try_next().await? {
print_object(&obj);
1 change: 1 addition & 0 deletions src/meta/node/src/lib.rs
@@ -386,6 +386,7 @@ pub fn start(
.min_delta_log_num_for_hummock_version_checkpoint,
min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec,
full_gc_interval_sec: config.meta.full_gc_interval_sec,
full_gc_object_limit: config.meta.full_gc_object_limit,
collect_gc_watermark_spin_interval_sec: config
.meta
.collect_gc_watermark_spin_interval_sec,
5 changes: 4 additions & 1 deletion src/meta/service/src/hummock_service.rs
@@ -277,7 +277,10 @@ impl HummockManagerService for HummockServiceImpl {
// The following operation takes some time, so we do it in a dedicated task and respond to the
// RPC immediately.
tokio::spawn(async move {
match hummock_manager.complete_full_gc(req.object_ids).await {
match hummock_manager
.complete_full_gc(req.object_ids, req.next_start_after)
.await
{
Ok(number) => {
tracing::info!("Full GC results {} SSTs to delete", number);
}
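The comment above describes a fire-and-forget pattern: acknowledge the RPC right away and run the slow `complete_full_gc` in a spawned task. A generic sketch of that pattern; the handler name and payload are illustrative, not the real service types:

```rust
use std::time::Duration;

// Illustrative fire-and-forget handler: kick off the slow work in the
// background and return the acknowledgement immediately.
async fn handle_report(object_ids: Vec<u64>) -> Result<(), String> {
    tokio::spawn(async move {
        // Stand-in for hummock_manager.complete_full_gc(..).
        tokio::time::sleep(Duration::from_secs(1)).await;
        tracing::info!("processed {} object ids in the background", object_ids.len());
    });
    Ok(()) // the RPC response does not wait for the spawned task
}
```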
48 changes: 43 additions & 5 deletions src/meta/src/hummock/manager/gc.rs
@@ -188,14 +188,19 @@ impl HummockManager {
sst_retention_time,
Duration::from_secs(self.env.opts.min_sst_retention_time_sec),
);
let start_after = self.full_gc_state.next_start_after();
let limit = self.full_gc_state.limit;
tracing::info!(
retention_sec = sst_retention_time.as_secs(),
prefix = prefix.as_ref().unwrap_or(&String::from("")),
start_after,
limit,
"run full GC"
);

let compactor = match self.compactor_manager.next_compactor() {
None => {
tracing::warn!("Try full GC but no available idle worker.");
tracing::warn!("full GC attempt but no available idle worker");
return Ok(false);
}
Some(compactor) => compactor,
@@ -204,14 +209,21 @@
.send_event(ResponseEvent::FullScanTask(FullScanTask {
sst_retention_time_sec: sst_retention_time.as_secs(),
prefix,
start_after,
limit,
}))
.map_err(|_| Error::CompactorUnreachable(compactor.context_id()))?;
Ok(true)
}

/// Given candidate SSTs to GC, filter out false positives.
/// Returns the number of SSTs to GC.
pub async fn complete_full_gc(&self, object_ids: Vec<HummockSstableObjectId>) -> Result<usize> {
pub async fn complete_full_gc(
&self,
object_ids: Vec<HummockSstableObjectId>,
next_start_after: Option<String>,
) -> Result<usize> {
self.full_gc_state.set_next_start_after(next_start_after);
if object_ids.is_empty() {
tracing::info!("SST full scan returns no SSTs.");
return Ok(0);
@@ -252,6 +264,28 @@
}
}

pub struct FullGcState {
next_start_after: Mutex<Option<String>>,
limit: Option<u64>,
}

impl FullGcState {
pub fn new(limit: Option<u64>) -> Self {
Self {
next_start_after: Mutex::new(None),
limit,
}
}

pub fn set_next_start_after(&self, next_start_after: Option<String>) {
*self.next_start_after.lock() = next_start_after;
}

pub fn next_start_after(&self) -> Option<String> {
self.next_start_after.lock().clone()
}
}

/// Collects SST GC watermark from related cluster nodes and calculates a global one.
///
/// It must wait enough heartbeats first. This precondition is checked at `spin_interval`.
@@ -403,7 +437,10 @@ mod tests {
);

// Empty input results in an immediate return, without waiting for heartbeats.
hummock_manager.complete_full_gc(vec![]).await.unwrap();
hummock_manager
.complete_full_gc(vec![], None)
.await
.unwrap();

// mimic CN heartbeat
use risingwave_pb::meta::heartbeat_request::extra_info::Info;
@@ -428,7 +465,7 @@
assert_eq!(
3,
hummock_manager
.complete_full_gc(vec![1, 2, 3])
.complete_full_gc(vec![1, 2, 3], None)
.await
.unwrap()
);
@@ -452,7 +489,8 @@
1,
hummock_manager
.complete_full_gc(
[committed_object_ids, vec![max_committed_object_id + 1]].concat()
[committed_object_ids, vec![max_committed_object_id + 1]].concat(),
None
)
.await
.unwrap()
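`FullGcState` is what makes consecutive GC rounds resumable: `start_full_gc` reads the cursor that the previous round's `complete_full_gc` stored. A hedged sketch of one round, assuming it sits in the same module as `FullGcState`; the `list_page` closure is a hypothetical stand-in for the compactor's paginated LIST and report:

```rust
// Illustrative only: drives one GC round against the FullGcState above.
fn gc_round<F>(state: &FullGcState, list_page: F)
where
    F: Fn(Option<String>, Option<u64>) -> Option<String>,
{
    // start_full_gc: pick up the cursor left by the previous round.
    let start_after = state.next_start_after();
    // The compactor lists at most `limit` objects after the cursor and
    // reports the key to resume from (None restarts from the beginning).
    let next = list_page(start_after, state.limit);
    // complete_full_gc: persist the cursor for the next timer tick.
    state.set_next_start_after(next);
}
```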
6 changes: 4 additions & 2 deletions src/meta/src/hummock/manager/mod.rs
@@ -44,7 +44,7 @@ use crate::hummock::compaction::CompactStatus;
use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::context::ContextInfo;
use crate::hummock::manager::gc::DeleteObjectTracker;
use crate::hummock::manager::gc::{DeleteObjectTracker, FullGcState};
use crate::hummock::CompactorManagerRef;
use crate::manager::{MetaSrvEnv, MetaStoreImpl, MetadataManager};
use crate::model::{ClusterId, MetadataModel, MetadataModelError};
@@ -113,6 +113,7 @@ pub struct HummockManager {
// `compaction_state` will record the types of compact tasks that can be triggered in `hummock`
// and suggest types with a certain priority.
pub compaction_state: CompactionState,
full_gc_state: FullGcState,
}

pub type HummockManagerRef = Arc<HummockManager>;
@@ -246,7 +247,7 @@ impl HummockManager {
let version_checkpoint_path = version_checkpoint_path(state_store_dir);
let version_archive_dir = version_archive_dir(state_store_dir);
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

let full_gc_object_limit = env.opts.full_gc_object_limit;
let instance = HummockManager {
env,
versioning: MonitoredRwLock::new(
@@ -285,6 +286,7 @@
history_table_throughput: parking_lot::RwLock::new(HashMap::default()),
compactor_streams_change_tx,
compaction_state: CompactionState::new(),
full_gc_state: FullGcState::new(Some(full_gc_object_limit)),
};
let instance = Arc::new(instance);
instance.init_time_travel_state().await?;
4 changes: 3 additions & 1 deletion src/meta/src/hummock/manager/timer_task.rs
@@ -340,8 +340,10 @@ impl HummockManager {
}

HummockTimerEvent::FullGc => {
let retention_sec =
hummock_manager.env.opts.min_sst_retention_time_sec;
if hummock_manager
.start_full_gc(Duration::from_secs(3600), None)
.start_full_gc(Duration::from_secs(retention_sec), None)
.is_ok()
{
tracing::info!("Start full GC from meta node.");
1 change: 1 addition & 0 deletions src/meta/src/hummock/mock_hummock_meta_client.rs
@@ -246,6 +246,7 @@ impl HummockMetaClient for MockHummockMetaClient {
_filtered_object_ids: Vec<HummockSstableObjectId>,
_total_object_count: u64,
_total_object_size: u64,
_next_start_after: Option<String>,
) -> Result<()> {
unimplemented!()
}
3 changes: 3 additions & 0 deletions src/meta/src/manager/env.rs
@@ -186,6 +186,8 @@ pub struct MetaOpts {
pub min_sst_retention_time_sec: u64,
/// Interval of automatic hummock full GC.
pub full_gc_interval_sec: u64,
/// Max number of objects a full GC job can fetch.
pub full_gc_object_limit: u64,
/// The spin interval when collecting global GC watermark in hummock
pub collect_gc_watermark_spin_interval_sec: u64,
/// Enable sanity check when SSTs are committed
@@ -318,6 +320,7 @@ impl MetaOpts {
min_delta_log_num_for_hummock_version_checkpoint: 1,
min_sst_retention_time_sec: 3600 * 24 * 7,
full_gc_interval_sec: 3600 * 24 * 7,
full_gc_object_limit: 100_000,
collect_gc_watermark_spin_interval_sec: 5,
enable_committed_sst_sanity_check: false,
periodic_compaction_interval_sec: 60,
15 changes: 13 additions & 2 deletions src/object_store/src/object/mem.rs
@@ -182,19 +182,30 @@ impl ObjectStore for InMemObjectStore {
Ok(())
}

async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
async fn list(
&self,
prefix: &str,
start_after: Option<String>,
limit: Option<usize>,
) -> ObjectResult<ObjectMetadataIter> {
let list_result = self
.objects
.lock()
.await
.iter()
.filter_map(|(path, (metadata, _))| {
if let Some(ref start_after) = start_after
&& metadata.key.le(start_after)
{
return None;
}
if path.starts_with(prefix) {
return Some(metadata.clone());
}
None
})
.sorted_by(|a, b| Ord::cmp(&a.key, &b.key))
.take(limit.unwrap_or(usize::MAX))
.collect_vec();
Ok(Box::pin(InMemObjectIter::new(list_result)))
}
@@ -376,7 +387,7 @@ mod tests {

async fn list_all(prefix: &str, store: &InMemObjectStore) -> Vec<ObjectMetadata> {
store
.list(prefix)
.list(prefix, None, None)
.await
.unwrap()
.try_collect::<Vec<_>>()
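The in-memory store treats `start_after` as an exclusive cursor (`metadata.key.le(start_after)` drops every key up to and including it) and applies `limit` with `take`, so a caller can page through the whole store by feeding the last key of each page back in. A sketch of such a helper, modeled on the `list_all` test helper above; `list_paged` is illustrative and not part of this PR:

```rust
use futures::TryStreamExt;

// Hypothetical helper: collect every object under `prefix`, 1000 at a time,
// resuming each page strictly after the last key of the previous one.
async fn list_paged(prefix: &str, store: &InMemObjectStore) -> Vec<ObjectMetadata> {
    let mut all = Vec::new();
    let mut cursor: Option<String> = None;
    loop {
        let page: Vec<ObjectMetadata> = store
            .list(prefix, cursor.clone(), Some(1000))
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        match page.last() {
            // `start_after` is exclusive, so the last key of this page is
            // exactly where the next page should resume.
            Some(last) => cursor = Some(last.key.clone()),
            None => break, // empty page: the listing is exhausted
        }
        all.extend(page);
    }
    all
}
```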
25 changes: 20 additions & 5 deletions src/object_store/src/object/mod.rs
@@ -121,7 +121,12 @@ pub trait ObjectStore: Send + Sync {
MonitoredObjectStore::new(self, metrics, config)
}

async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter>;
async fn list(
&self,
prefix: &str,
start_after: Option<String>,
limit: Option<usize>,
) -> ObjectResult<ObjectMetadataIter>;

fn store_media_type(&self) -> &'static str;

@@ -326,8 +331,13 @@ impl ObjectStoreImpl {
object_store_impl_method_body!(self, delete_objects(paths).await)
}

pub async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
object_store_impl_method_body!(self, list(prefix).await)
pub async fn list(
&self,
prefix: &str,
start_after: Option<String>,
limit: Option<usize>,
) -> ObjectResult<ObjectMetadataIter> {
object_store_impl_method_body!(self, list(prefix, start_after, limit).await)
}

pub fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
@@ -814,7 +824,12 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
res
}

pub async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
pub async fn list(
&self,
prefix: &str,
start_after: Option<String>,
limit: Option<usize>,
) -> ObjectResult<ObjectMetadataIter> {
let operation_type = OperationType::List;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();
@@ -827,7 +842,7 @@

let builder = || async {
self.inner
.list(prefix)
.list(prefix, start_after.clone(), limit)
.verbose_instrument_await(operation_type_str)
.await
};
20 changes: 14 additions & 6 deletions src/object_store/src/object/opendal_engine/opendal_object_store.rs
@@ -231,13 +231,21 @@ impl ObjectStore for OpendalObjectStore {
Ok(())
}

async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
let object_lister = self
async fn list(
&self,
prefix: &str,
start_after: Option<String>,
limit: Option<usize>,
) -> ObjectResult<ObjectMetadataIter> {
let mut object_lister = self
.op
.lister_with(prefix)
.recursive(true)
.metakey(Metakey::ContentLength)
.await?;
.metakey(Metakey::ContentLength);
if let Some(start_after) = start_after {
object_lister = object_lister.start_after(&start_after);
}
let object_lister = object_lister.await?;

let stream = stream::unfold(object_lister, |mut object_lister| async move {
match object_lister.next().await {
@@ -261,7 +269,7 @@
}
});

Ok(stream.boxed())
Ok(stream.take(limit.unwrap_or(usize::MAX)).boxed())
}

fn store_media_type(&self) -> &'static str {
@@ -428,7 +436,7 @@

async fn list_all(prefix: &str, store: &OpendalObjectStore) -> Vec<ObjectMetadata> {
store
.list(prefix)
.list(prefix, None, None)
.await
.unwrap()
.try_collect::<Vec<_>>()
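The OpenDAL implementation pushes `start_after` down to the backend lister but enforces `limit` locally by truncating the stream with `take`, which keeps the `ObjectStore` trait uniform even for backends without a native limit parameter. A hedged usage sketch against the new signature; the store value and object keys are assumptions:

```rust
use futures::TryStreamExt;

// Illustrative: print one page of at most 100 objects, resuming strictly
// after "prefix/0042".
async fn show_first_page(store: &OpendalObjectStore) -> ObjectResult<()> {
    let mut iter = store
        .list("prefix/", Some("prefix/0042".to_string()), Some(100))
        .await?;
    while let Some(obj) = iter.try_next().await? {
        // S3-style backends list in key order, so the last key printed here
        // is a valid `start_after` for the next page.
        println!("{}", obj.key);
    }
    Ok(())
}
```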