diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 81e3128b7fcd..653cd7511fc8 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -356,7 +356,7 @@ impl RegionManifestManagerInner { /// Returns total manifest size. pub(crate) fn total_manifest_size(&self) -> u64 { - self.store.get_total_manifest_size() + self.store.total_manifest_size() } } diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 5179df743994..73e507978906 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -80,16 +80,14 @@ pub fn file_version(path: &str) -> ManifestVersion { s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}")) } -/// Return's the file name from path -/// Just use for .json and .checkpoint file -/// ### Panics -/// Panics if the file path is not a valid delta or checkpoint file. -pub fn file_name(path: &str) -> String { +/// Return the file name from path. +/// Just for (`.json`) and (`.checkpoint`) file, other file will return None. +pub fn file_name(path: &str) -> Option { let name = path.rsplit('/').next().unwrap_or("").to_string(); if !is_checkpoint_file(&name) && !is_delta_file(&name) { - panic!("Invalid file: {path}") + return None; } - name + Some(name) } /// Return's the file compress algorithm by file extension. @@ -141,15 +139,36 @@ impl ObjectStoreLogIterator { } } +/// Key to identify a manifest file. +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +enum FileKey { + /// A delta file (`.json`). + Delta(ManifestVersion), + /// A checkpoint file (`.checkpoint`). + Checkpoint(ManifestVersion), +} + +impl FileKey { + /// New a FileKey, if the file name is not a valid delta + /// or checkpoint file, return None. + pub fn new(file_name: &str) -> Option { + if is_delta_file(file_name) { + Some(FileKey::Delta(file_version(file_name))) + } else if is_checkpoint_file(file_name) { + Some(FileKey::Checkpoint(file_version(file_name))) + } else { + None + } + } +} + #[derive(Clone, Debug)] pub struct ManifestObjectStore { object_store: ObjectStore, compress_type: CompressionType, path: String, /// Stores the size of each manifest file. - /// K is the manifest file name(such as "0000000000000000006.checkpoint" - /// or "0000000000000000006.json"), and V is the file size. - manifest_size_map: HashMap, + manifest_size_map: HashMap, } impl ManifestObjectStore { @@ -301,8 +320,11 @@ impl ManifestObjectStore { // delete the manifest'size in paths for path in &paths { - let name = file_name(path); - self.manifest_size_map.remove(&name); + if let Some(name) = file_name(path) { + if let Some(file_key) = FileKey::new(&name) { + self.manifest_size_map.remove(&file_key); + } + } } self.object_store @@ -325,11 +347,14 @@ impl ManifestObjectStore { compress_type: self.compress_type, path: &path, })?; - self.set_manifest_size_by_path(&path, data.len() as u64); - self.object_store + let delta_size = data.len(); + let _ = self + .object_store .write(&path, data) .await - .context(OpenDalSnafu) + .context(OpenDalSnafu); + self.set_manifest_size_by_path(&path, delta_size as u64); + Ok(()) } /// Save the checkpoint manifest file. @@ -343,11 +368,12 @@ impl ManifestObjectStore { compress_type: self.compress_type, path: &path, })?; - self.set_manifest_size_by_path(&path, data.len() as u64); + let checkpoint_size = data.len(); self.object_store .write(&path, data) .await .context(OpenDalSnafu)?; + self.set_manifest_size_by_path(&path, checkpoint_size as u64); // Because last checkpoint file only contain size and version, which is tiny, so we don't compress it. let last_checkpoint_path = self.last_checkpoint_path(); @@ -380,55 +406,56 @@ impl ManifestObjectStore { let path = self.checkpoint_file_path(version); // Due to backward compatibility, it is possible that the user's checkpoint not compressed, // so if we don't find file by compressed type. fall back to checkpoint not compressed find again. - let checkpoint_data = match self.object_store.read(&path).await { - Ok(checkpoint) => { - let decompress_data = - self.compress_type - .decode(checkpoint) - .await - .context(DecompressObjectSnafu { + let checkpoint_data = + match self.object_store.read(&path).await { + Ok(checkpoint) => { + let checkpoint_size = checkpoint.len(); + let decompress_data = self.compress_type.decode(checkpoint).await.context( + DecompressObjectSnafu { compress_type: self.compress_type, path: path.clone(), - })?; - // set the checkpoint size - self.set_manifest_size_by_path(&path, decompress_data.len() as u64); - Ok(Some(decompress_data)) - } - Err(e) => { - if e.kind() == ErrorKind::NotFound { - if self.compress_type != FALL_BACK_COMPRESS_TYPE { - let fall_back_path = gen_path( - &self.path, - &checkpoint_file(version), - FALL_BACK_COMPRESS_TYPE, - ); - debug!( - "Failed to load checkpoint from path: {}, fall back to path: {}", - path, fall_back_path - ); - match self.object_store.read(&fall_back_path).await { - Ok(checkpoint) => { - let decompress_data = FALL_BACK_COMPRESS_TYPE - .decode(checkpoint) - .await - .context(DecompressObjectSnafu { - compress_type: FALL_BACK_COMPRESS_TYPE, - path: path.clone(), - })?; - self.set_manifest_size_by_path(&path, decompress_data.len() as u64); - Ok(Some(decompress_data)) + }, + )?; + // set the checkpoint size + self.set_manifest_size_by_path(&path, checkpoint_size as u64); + Ok(Some(decompress_data)) + } + Err(e) => { + if e.kind() == ErrorKind::NotFound { + if self.compress_type != FALL_BACK_COMPRESS_TYPE { + let fall_back_path = gen_path( + &self.path, + &checkpoint_file(version), + FALL_BACK_COMPRESS_TYPE, + ); + debug!( + "Failed to load checkpoint from path: {}, fall back to path: {}", + path, fall_back_path + ); + match self.object_store.read(&fall_back_path).await { + Ok(checkpoint) => { + let checkpoint_size = checkpoint.len(); + let decompress_data = FALL_BACK_COMPRESS_TYPE + .decode(checkpoint) + .await + .context(DecompressObjectSnafu { + compress_type: FALL_BACK_COMPRESS_TYPE, + path: path.clone(), + })?; + self.set_manifest_size_by_path(&path, checkpoint_size as u64); + Ok(Some(decompress_data)) + } + Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), + Err(e) => Err(e).context(OpenDalSnafu), } - Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), - Err(e) => Err(e).context(OpenDalSnafu), + } else { + Ok(None) } } else { - Ok(None) + Err(e).context(OpenDalSnafu) } - } else { - Err(e).context(OpenDalSnafu) } - } - }?; + }?; Ok(checkpoint_data.map(|data| (version, data))) } @@ -464,64 +491,42 @@ impl ManifestObjectStore { /// Get the size(Byte) of the delta file by delta version. /// If the delta file does not exist, return None. pub fn delta_file_size(&self, version: ManifestVersion) -> Option { - self.manifest_size_map.get(&delta_file(version)).copied() + self.manifest_size_map + .get(&FileKey::Delta(version)) + .copied() } /// Get the size(Byte) of the checkpoint file by checkpoint version. /// If the checkpoint file does not exist, return None. pub fn checkpoint_file_size(&self, version: ManifestVersion) -> Option { self.manifest_size_map - .get(&checkpoint_file(version)) + .get(&FileKey::Checkpoint(version)) .copied() } /// Compute the size(Byte) in manifest size map. - pub fn get_total_manifest_size(&self) -> u64 { + pub fn total_manifest_size(&self) -> u64 { self.manifest_size_map.values().sum() } - /// Count the total size(Byte) of exist manifest files which satisfy: - /// delta file version <= end and checkpoint file version < end. - /// Notice: this function will read files from object store. - pub async fn set_manifest_size_until(&mut self, end: ManifestVersion) -> Result<()> { - let entries = self - .get_paths(|entry| { - let file_name = entry.name(); - if is_delta_file(file_name) || is_checkpoint_file(file_name) { - let file_version = file_version(file_name); - if file_version < end || (file_version == end && is_delta_file(file_name)) { - return Some(entry); - } - } - None - }) - .await?; - for entry in entries { - let bytes = self - .object_store - .read(entry.path()) - .await - .context(OpenDalSnafu)?; - self.set_manifest_size_by_path(entry.path(), bytes.len() as u64); - } - - Ok(()) - } - /// Set the size of the manifest file by path. pub fn set_manifest_size_by_path(&mut self, path: &str, size: u64) { - self.manifest_size_map.insert(file_name(path), size); + if let Some(name) = file_name(path) { + if let Some(file_key) = FileKey::new(&name) { + self.manifest_size_map.insert(file_key, size); + } + } } /// Set the size of the delta file by delta version. pub fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) { - self.manifest_size_map.insert(delta_file(version), size); + self.manifest_size_map.insert(FileKey::Delta(version), size); } /// Set the size of the checkpoint file by checkpoint version. pub fn set_checkpoint_file_size(&mut self, version: ManifestVersion, size: u64) { self.manifest_size_map - .insert(checkpoint_file(version), size); + .insert(FileKey::Checkpoint(version), size); } } @@ -705,12 +710,12 @@ mod tests { let name = file_name( "data/greptime/public/1054/1054_0000000000/manifest/00000000000000000007.json", ); - assert_eq!(name, "00000000000000000007.json"); + assert_eq!(name.unwrap(), "00000000000000000007.json"); let name = file_name( "/data/greptime/public/1054/1054_0000000000/manifest/00000000000000000007.checkpoint", ); - assert_eq!(name, "00000000000000000007.checkpoint"); + assert_eq!(name.unwrap(), "00000000000000000007.checkpoint"); let version = file_version("00000000000000000007.checkpoint"); assert_eq!(version, 7); @@ -744,13 +749,13 @@ mod tests { assert_eq!(log_store.checkpoint_file_size(5), Some(23)); // manifest files size - assert_eq!(log_store.get_total_manifest_size(), 63); + assert_eq!(log_store.total_manifest_size(), 63); // delete 3 manifest files assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3); // manifest files size after delete - assert_eq!(log_store.get_total_manifest_size(), 39); + assert_eq!(log_store.total_manifest_size(), 39); // delete all manifest files assert_eq!( @@ -761,7 +766,7 @@ mod tests { 3 ); - assert_eq!(log_store.get_total_manifest_size(), 0); + assert_eq!(log_store.total_manifest_size(), 0); } #[tokio::test] @@ -782,13 +787,13 @@ mod tests { .unwrap(); // manifest files size - assert_eq!(log_store.get_total_manifest_size(), 181); + assert_eq!(log_store.total_manifest_size(), 181); // delete 3 manifest files assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3); // manifest files size after delete - assert_eq!(log_store.get_total_manifest_size(), 97); + assert_eq!(log_store.total_manifest_size(), 97); // delete all manifest files assert_eq!( @@ -799,6 +804,6 @@ mod tests { 3 ); - assert_eq!(log_store.get_total_manifest_size(), 0); + assert_eq!(log_store.total_manifest_size(), 0); } }