Skip to content

Commit

Permalink
fix(meta): use object store's timestamp in SST retention sanity check
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Oct 16, 2024
1 parent 5d914ba commit 17183c2
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 33 deletions.
45 changes: 43 additions & 2 deletions src/meta/src/hummock/manager/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
use std::collections::{BTreeMap, HashMap, HashSet};

use fail::fail_point;
use futures::{stream, StreamExt};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{
HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo,
INVALID_VERSION_ID,
get_sst_data_path, HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId,
LocalSstableInfo, INVALID_VERSION_ID,
};
use risingwave_pb::hummock::{HummockPinnedVersion, ValidationTask};

Expand Down Expand Up @@ -133,6 +135,29 @@ impl HummockManager {
pub async fn get_min_pinned_version_id(&self) -> HummockVersionId {
self.context_info.read().await.min_pinned_version_id()
}

async fn get_objects_created_at(
&self,
// need IntoIterator to work around stream's "implementation of `std::iter::Iterator` is not general enough" error.
object_ids: impl IntoIterator<Item = HummockSstableObjectId>,
) -> Result<HashMap<HummockSstableObjectId, u64>> {
let futures = object_ids.into_iter().map(|object_id| {
let object_store = self.object_store.clone();
let system_params_reader = self.env.system_params_reader();
async move {
let system_params = system_params_reader.await;
let obj_prefix = object_store
.get_object_prefix(object_id, system_params.use_new_object_prefix_strategy());
let data_dir = system_params.data_directory();
let path = get_sst_data_path(data_dir, &obj_prefix, object_id);
let metadata = object_store.metadata(&path).await?;
Ok::<(HummockSstableObjectId, u64), Error>((object_id, metadata.last_modified))
}
});
let res: Vec<_> = stream::iter(futures).buffer_unordered(100).collect().await;
let res: Result<HashMap<_, _>> = res.into_iter().collect();
res
}
}

impl ContextInfo {
Expand Down Expand Up @@ -241,6 +266,14 @@ impl HummockManager {
.iter()
.map(|s| (s.sst_info.object_id, s.created_at)),
)?;

let ids = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
let id_to_ts = self.get_objects_created_at(ids).await?;
check_sst_retention(
now,
self.env.opts.min_sst_retention_time_sec,
id_to_ts.into_iter(),
)?;
}

async {
Expand Down Expand Up @@ -293,6 +326,14 @@ impl HummockManager {
now,
self.env.opts.min_sst_retention_time_sec,
object_timestamps.iter().map(|(k, v)| (*k, *v)),
)?;

let ids = object_timestamps.iter().map(|(id, _)| *id).collect_vec();
let id_to_ts = self.get_objects_created_at(ids).await?;
check_sst_retention(
now,
self.env.opts.min_sst_retention_time_sec,
id_to_ts.into_iter(),
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/object_store/src/object/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ fn get_obj_meta(path: &str, obj: &Bytes) -> ObjectResult<ObjectMetadata> {
last_modified: SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(ObjectError::internal)?
.as_secs_f64(),
.as_secs_f64() as u64,
total_size: obj.len(),
})
}
Expand Down
2 changes: 1 addition & 1 deletion src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub struct ObjectMetadata {
// Full path
pub key: String,
// Seconds since unix epoch.
pub last_modified: f64,
pub last_modified: u64,
pub total_size: usize,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ impl ObjectStore for OpendalObjectStore {
let opendal_metadata = self.op.stat(path).await?;
let key = path.to_string();
let last_modified = match opendal_metadata.last_modified() {
Some(t) => t.timestamp() as f64,
None => 0_f64,
Some(t) => t.timestamp().try_into().unwrap(),
None => 0u64,
};

let total_size = opendal_metadata.content_length() as usize;
Expand Down Expand Up @@ -253,8 +253,8 @@ impl ObjectStore for OpendalObjectStore {
let key = object.path().to_string();
let om = object.metadata();
let last_modified = match om.last_modified() {
Some(t) => t.timestamp() as f64,
None => 0_f64,
Some(t) => t.timestamp().try_into().unwrap(),
None => 0_u64,
};
let total_size = om.content_length() as usize;
let metadata = ObjectMetadata {
Expand Down
6 changes: 3 additions & 3 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ impl ObjectStore for S3ObjectStore {
last_modified: resp
.last_modified()
.expect("last_modified required")
.as_secs_f64(),
.as_secs_f64() as u64,
total_size: resp.content_length.unwrap_or_default() as usize,
})
}
Expand Down Expand Up @@ -1031,8 +1031,8 @@ impl Stream for S3ObjectIter {
key: obj.key().expect("key required").to_owned(),
last_modified: obj
.last_modified()
.map(|l| l.as_secs_f64())
.unwrap_or(0f64),
.map(|l| l.as_secs_f64() as u64)
.unwrap_or(0u64),
total_size: obj.size().unwrap_or_default() as usize,
})
.collect_vec();
Expand Down
2 changes: 1 addition & 1 deletion src/object_store/src/object/sim/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ fn get_obj_meta(path: &str, obj: &Bytes) -> Result<ObjectMetadata> {
last_modified: SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(ObjectError::internal)?
.as_secs_f64(),
.as_secs_f64() as u64,
total_size: obj.len(),
})
}
Expand Down
22 changes: 22 additions & 0 deletions src/storage/hummock_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,28 @@ impl EpochWithGap {
}
}

pub fn get_sst_data_path(
path_prefix: &str,
obj_prefix: &str,
object_id: HummockSstableObjectId,
) -> String {
let mut path = String::with_capacity(
path_prefix.len()
+ "/".len()
+ obj_prefix.len()
+ HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
+ ".".len()
+ OBJECT_SUFFIX.len(),
);
path.push_str(path_prefix);
path.push('/');
path.push_str(obj_prefix);
path.push_str(&object_id.to_string());
path.push('.');
path.push_str(OBJECT_SUFFIX);
path
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 2 additions & 2 deletions src/storage/hummock_test/src/vacuum_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ async fn test_full_scan() {
let object_store_list_result = vec![
ObjectMetadata {
key: sstable_store.get_sst_data_path(1),
last_modified: now_ts.sub(Duration::from_secs(7200)).as_secs_f64(),
last_modified: now_ts.sub(Duration::from_secs(7200)).as_secs_f64() as u64,
total_size: 128,
},
ObjectMetadata {
key: sstable_store.get_sst_data_path(2),
last_modified: now_ts.sub(Duration::from_secs(3600)).as_secs_f64(),
last_modified: now_ts.sub(Duration::from_secs(3600)).as_secs_f64() as u64,
total_size: 128,
},
];
Expand Down
20 changes: 2 additions & 18 deletions src/storage/src/hummock/sstable_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ use foyer::{
use futures::{future, StreamExt};
use itertools::Itertools;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::{
HummockSstableObjectId, HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH, OBJECT_SUFFIX,
};
use risingwave_hummock_sdk::{get_sst_data_path, HummockSstableObjectId, OBJECT_SUFFIX};
use risingwave_hummock_trace::TracedCachePolicy;
use risingwave_object_store::object::{
ObjectError, ObjectMetadataIter, ObjectResult, ObjectStoreRef, ObjectStreamingUploader,
Expand Down Expand Up @@ -522,21 +520,7 @@ impl SstableStore {
let obj_prefix = self
.store
.get_object_prefix(object_id, self.use_new_object_prefix_strategy);
let mut path = String::with_capacity(
self.path.len()
+ "/".len()
+ obj_prefix.len()
+ HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH
+ ".".len()
+ OBJECT_SUFFIX.len(),
);
path.push_str(&self.path);
path.push('/');
path.push_str(&obj_prefix);
path.push_str(&object_id.to_string());
path.push('.');
path.push_str(OBJECT_SUFFIX);
path
get_sst_data_path(&self.path, &obj_prefix, object_id)
}

pub fn get_object_id_from_path(path: &str) -> HummockSstableObjectId {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl Vacuum {
next_start_after = Some(o.key.clone());
tracing::debug!(next_start_after, "set next start after");
}
if o.last_modified < full_scan_task.sst_retention_watermark as f64 {
if o.last_modified < full_scan_task.sst_retention_watermark {
Some(Ok(SstableStore::get_object_id_from_path(&o.key)))
} else {
None
Expand Down

0 comments on commit 17183c2

Please sign in to comment.