Skip to content

Commit

Permalink
chore: cr comment
Browse files Browse the repository at this point in the history
  • Loading branch information
QuenKar committed Oct 17, 2023
1 parent 49583f3 commit 33ae485
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 101 deletions.
2 changes: 1 addition & 1 deletion src/mito2/src/manifest/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ impl RegionManifestManagerInner {

/// Returns total manifest size.
pub(crate) fn total_manifest_size(&self) -> u64 {
self.store.get_total_manifest_size()
self.store.total_manifest_size()
}
}

Expand Down
205 changes: 105 additions & 100 deletions src/mito2/src/manifest/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,14 @@ pub fn file_version(path: &str) -> ManifestVersion {
s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
}

/// Return's the file name from path
/// Just use for .json and .checkpoint file
/// ### Panics
/// Panics if the file path is not a valid delta or checkpoint file.
pub fn file_name(path: &str) -> String {
/// Returns the file name from `path`.
/// Only delta (`.json`) and checkpoint (`.checkpoint`) files are accepted;
/// any other file yields `None`.
pub fn file_name(path: &str) -> Option<String> {
let name = path.rsplit('/').next().unwrap_or("").to_string();
if !is_checkpoint_file(&name) && !is_delta_file(&name) {
panic!("Invalid file: {path}")
return None;
}
name
Some(name)
}

/// Returns the file compress algorithm by file extension.
Expand Down Expand Up @@ -141,15 +139,36 @@ impl ObjectStoreLogIterator {
}
}

/// Identifies a single manifest file by its kind and version.
///
/// Used as the key of the per-file size map, so two files with the same
/// version but different kinds are tracked separately.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum FileKey {
    /// Key of a delta file (`.json`) with the given version.
    Delta(ManifestVersion),
    /// Key of a checkpoint file (`.checkpoint`) with the given version.
    Checkpoint(ManifestVersion),
}

impl FileKey {
/// New a FileKey, if the file name is not a valid delta
/// or checkpoint file, return None.
pub fn new(file_name: &str) -> Option<Self> {
if is_delta_file(file_name) {
Some(FileKey::Delta(file_version(file_name)))
} else if is_checkpoint_file(file_name) {
Some(FileKey::Checkpoint(file_version(file_name)))
} else {
None
}
}
}

#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
object_store: ObjectStore,
compress_type: CompressionType,
path: String,
/// Stores the size of each manifest file.
/// K is the manifest file name(such as "0000000000000000006.checkpoint"
/// or "0000000000000000006.json"), and V is the file size.
manifest_size_map: HashMap<String, u64>,
manifest_size_map: HashMap<FileKey, u64>,
}

impl ManifestObjectStore {
Expand Down Expand Up @@ -301,8 +320,11 @@ impl ManifestObjectStore {

// Remove the recorded size of every manifest file in `paths`.
for path in &paths {
let name = file_name(path);
self.manifest_size_map.remove(&name);
if let Some(name) = file_name(path) {
if let Some(file_key) = FileKey::new(&name) {
self.manifest_size_map.remove(&file_key);
}
}
}

self.object_store
Expand All @@ -325,11 +347,14 @@ impl ManifestObjectStore {
compress_type: self.compress_type,
path: &path,
})?;
self.set_manifest_size_by_path(&path, data.len() as u64);
self.object_store
let delta_size = data.len();
let _ = self
.object_store
.write(&path, data)
.await
.context(OpenDalSnafu)
.context(OpenDalSnafu);
self.set_manifest_size_by_path(&path, delta_size as u64);
Ok(())
}

/// Save the checkpoint manifest file.
Expand All @@ -343,11 +368,12 @@ impl ManifestObjectStore {
compress_type: self.compress_type,
path: &path,
})?;
self.set_manifest_size_by_path(&path, data.len() as u64);
let checkpoint_size = data.len();
self.object_store
.write(&path, data)
.await
.context(OpenDalSnafu)?;
self.set_manifest_size_by_path(&path, checkpoint_size as u64);

// The last checkpoint file only contains the size and version, so it is tiny and we don't compress it.
let last_checkpoint_path = self.last_checkpoint_path();
Expand Down Expand Up @@ -380,55 +406,56 @@ impl ManifestObjectStore {
let path = self.checkpoint_file_path(version);
// For backward compatibility, the user's checkpoint may not be compressed,
// so if the file is not found with the configured compress type, fall back to the uncompressed checkpoint and try again.
let checkpoint_data = match self.object_store.read(&path).await {
Ok(checkpoint) => {
let decompress_data =
self.compress_type
.decode(checkpoint)
.await
.context(DecompressObjectSnafu {
let checkpoint_data =
match self.object_store.read(&path).await {
Ok(checkpoint) => {
let checkpoint_size = checkpoint.len();
let decompress_data = self.compress_type.decode(checkpoint).await.context(
DecompressObjectSnafu {
compress_type: self.compress_type,
path: path.clone(),
})?;
// set the checkpoint size
self.set_manifest_size_by_path(&path, decompress_data.len() as u64);
Ok(Some(decompress_data))
}
Err(e) => {
if e.kind() == ErrorKind::NotFound {
if self.compress_type != FALL_BACK_COMPRESS_TYPE {
let fall_back_path = gen_path(
&self.path,
&checkpoint_file(version),
FALL_BACK_COMPRESS_TYPE,
);
debug!(
"Failed to load checkpoint from path: {}, fall back to path: {}",
path, fall_back_path
);
match self.object_store.read(&fall_back_path).await {
Ok(checkpoint) => {
let decompress_data = FALL_BACK_COMPRESS_TYPE
.decode(checkpoint)
.await
.context(DecompressObjectSnafu {
compress_type: FALL_BACK_COMPRESS_TYPE,
path: path.clone(),
})?;
self.set_manifest_size_by_path(&path, decompress_data.len() as u64);
Ok(Some(decompress_data))
},
)?;
// set the checkpoint size
self.set_manifest_size_by_path(&path, checkpoint_size as u64);
Ok(Some(decompress_data))
}
Err(e) => {
if e.kind() == ErrorKind::NotFound {
if self.compress_type != FALL_BACK_COMPRESS_TYPE {
let fall_back_path = gen_path(
&self.path,
&checkpoint_file(version),
FALL_BACK_COMPRESS_TYPE,
);
debug!(
"Failed to load checkpoint from path: {}, fall back to path: {}",
path, fall_back_path
);
match self.object_store.read(&fall_back_path).await {
Ok(checkpoint) => {
let checkpoint_size = checkpoint.len();
let decompress_data = FALL_BACK_COMPRESS_TYPE
.decode(checkpoint)
.await
.context(DecompressObjectSnafu {
compress_type: FALL_BACK_COMPRESS_TYPE,
path: path.clone(),
})?;
self.set_manifest_size_by_path(&path, checkpoint_size as u64);
Ok(Some(decompress_data))
}
Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
Err(e) => Err(e).context(OpenDalSnafu),
}
Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
Err(e) => Err(e).context(OpenDalSnafu),
} else {
Ok(None)
}
} else {
Ok(None)
Err(e).context(OpenDalSnafu)
}
} else {
Err(e).context(OpenDalSnafu)
}
}
}?;
}?;
Ok(checkpoint_data.map(|data| (version, data)))
}

Expand Down Expand Up @@ -464,64 +491,42 @@ impl ManifestObjectStore {
/// Get the size(Byte) of the delta file by delta version.
/// If the delta file does not exist, return None.
pub fn delta_file_size(&self, version: ManifestVersion) -> Option<u64> {
self.manifest_size_map.get(&delta_file(version)).copied()
self.manifest_size_map
.get(&FileKey::Delta(version))
.copied()
}

/// Get the size(Byte) of the checkpoint file by checkpoint version.
/// If the checkpoint file does not exist, return None.
pub fn checkpoint_file_size(&self, version: ManifestVersion) -> Option<u64> {
self.manifest_size_map
.get(&checkpoint_file(version))
.get(&FileKey::Checkpoint(version))
.copied()
}

/// Compute the size(Byte) in manifest size map.
pub fn get_total_manifest_size(&self) -> u64 {
pub fn total_manifest_size(&self) -> u64 {
self.manifest_size_map.values().sum()
}

/// Count the total size(Byte) of exist manifest files which satisfy:
/// delta file version <= end and checkpoint file version < end.
/// Notice: this function will read files from object store.
pub async fn set_manifest_size_until(&mut self, end: ManifestVersion) -> Result<()> {
let entries = self
.get_paths(|entry| {
let file_name = entry.name();
if is_delta_file(file_name) || is_checkpoint_file(file_name) {
let file_version = file_version(file_name);
if file_version < end || (file_version == end && is_delta_file(file_name)) {
return Some(entry);
}
}
None
})
.await?;
for entry in entries {
let bytes = self
.object_store
.read(entry.path())
.await
.context(OpenDalSnafu)?;
self.set_manifest_size_by_path(entry.path(), bytes.len() as u64);
}

Ok(())
}

/// Set the size of the manifest file by path.
pub fn set_manifest_size_by_path(&mut self, path: &str, size: u64) {
self.manifest_size_map.insert(file_name(path), size);
if let Some(name) = file_name(path) {
if let Some(file_key) = FileKey::new(&name) {
self.manifest_size_map.insert(file_key, size);
}
}
}

/// Set the size of the delta file by delta version.
pub fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
self.manifest_size_map.insert(delta_file(version), size);
self.manifest_size_map.insert(FileKey::Delta(version), size);
}

/// Set the size of the checkpoint file by checkpoint version.
pub fn set_checkpoint_file_size(&mut self, version: ManifestVersion, size: u64) {
self.manifest_size_map
.insert(checkpoint_file(version), size);
.insert(FileKey::Checkpoint(version), size);
}
}

Expand Down Expand Up @@ -705,12 +710,12 @@ mod tests {
let name = file_name(
"data/greptime/public/1054/1054_0000000000/manifest/00000000000000000007.json",
);
assert_eq!(name, "00000000000000000007.json");
assert_eq!(name.unwrap(), "00000000000000000007.json");

let name = file_name(
"/data/greptime/public/1054/1054_0000000000/manifest/00000000000000000007.checkpoint",
);
assert_eq!(name, "00000000000000000007.checkpoint");
assert_eq!(name.unwrap(), "00000000000000000007.checkpoint");

let version = file_version("00000000000000000007.checkpoint");
assert_eq!(version, 7);
Expand Down Expand Up @@ -744,13 +749,13 @@ mod tests {
assert_eq!(log_store.checkpoint_file_size(5), Some(23));

// manifest files size
assert_eq!(log_store.get_total_manifest_size(), 63);
assert_eq!(log_store.total_manifest_size(), 63);

// delete 3 manifest files
assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

// manifest files size after delete
assert_eq!(log_store.get_total_manifest_size(), 39);
assert_eq!(log_store.total_manifest_size(), 39);

// delete all manifest files
assert_eq!(
Expand All @@ -761,7 +766,7 @@ mod tests {
3
);

assert_eq!(log_store.get_total_manifest_size(), 0);
assert_eq!(log_store.total_manifest_size(), 0);
}

#[tokio::test]
Expand All @@ -782,13 +787,13 @@ mod tests {
.unwrap();

// manifest files size
assert_eq!(log_store.get_total_manifest_size(), 181);
assert_eq!(log_store.total_manifest_size(), 181);

// delete 3 manifest files
assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

// manifest files size after delete
assert_eq!(log_store.get_total_manifest_size(), 97);
assert_eq!(log_store.total_manifest_size(), 97);

// delete all manifest files
assert_eq!(
Expand All @@ -799,6 +804,6 @@ mod tests {
3
);

assert_eq!(log_store.get_total_manifest_size(), 0);
assert_eq!(log_store.total_manifest_size(), 0);
}
}

0 comments on commit 33ae485

Please sign in to comment.