From 1315df4f35b8d47c147dc85f4ec881e1f43d9cba Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Wed, 11 Oct 2023 12:06:26 +0800 Subject: [PATCH 01/11] feat: get manifest file size --- src/mito2/src/manifest/storage.rs | 51 +++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index a0f7dbf9714e..69c16ffc9794 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -424,6 +424,32 @@ impl ManifestObjectStore { pub async fn read_file(&self, path: &str) -> Result> { self.object_store.read(path).await.context(OpenDalSnafu) } + + /// Get the total size(Byte) of all manifest files. + pub async fn get_total_manifest_size(&self) -> Result { + let entries = self + .get_paths(|entry| { + let file_name = entry.name(); + if is_delta_file(file_name) || is_checkpoint_file(file_name) { + return Some(entry); + } + None + }) + .await?; + let mut size = 0; + for entry in entries { + let bytes = self + .object_store + .read(entry.path()) + .await + .context(OpenDalSnafu)?; + size += bytes.len() as u64; + } + + // last checkpoint file only contains size ,version and checksum, + // which is very tiny, maybe we don't need add it. + Ok(size) + } } #[derive(Serialize, Deserialize, Debug)] @@ -600,4 +626,29 @@ mod tests { let mut it = log_store.scan(0, 10).await.unwrap(); assert!(it.next_log().await.unwrap().is_none()); } + + #[tokio::test] + async fn test_get_manifest_files_size() { + let mut log_store = new_test_manifest_store(); + // write manifest files + log_store.compress_type = CompressionType::Uncompressed; + for v in 0..5 { + log_store + .save(v, format!("hello, {v}").as_bytes()) + .await + .unwrap(); + } + log_store + .save_checkpoint(5, "checkpoint_uncompressed".as_bytes()) + .await + .unwrap(); + // get manifest files size + assert_eq!(log_store.get_total_manifest_size().await.unwrap(), 63); + + // delete some manifest files + assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3); + + // get manifest files size after delete + assert_eq!(log_store.get_total_manifest_size().await.unwrap(), 39); + } } From 9686132a113a3b1fa44b3c1aa937a2e8d274fd71 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Thu, 12 Oct 2023 10:39:02 +0800 Subject: [PATCH 02/11] feat: manifest size statistics --- src/mito2/src/manifest/manager.rs | 76 +++++++- src/mito2/src/manifest/storage.rs | 213 +++++++++++++++------ src/mito2/src/manifest/tests/checkpoint.rs | 2 +- 3 files changed, 224 insertions(+), 67 deletions(-) diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index e9c85eea3958..975187dd8eb4 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -154,6 +154,12 @@ impl RegionManifestManager { let inner = self.inner.read().await; inner.store.clone() } + + /// Returns total manifest size. + pub async fn manifest_size(&self) -> u64 { + let inner = self.inner.read().await; + inner.manifest_size() + } } #[cfg(test)] @@ -186,7 +192,7 @@ impl RegionManifestManagerInner { /// Creates a new manifest. async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result { // construct storage - let store = ManifestObjectStore::new( + let mut store = ManifestObjectStore::new( &options.manifest_dir, options.object_store.clone(), options.compress_type, @@ -232,7 +238,7 @@ impl RegionManifestManagerInner { /// Returns `Ok(None)` if no such manifest. async fn open(options: RegionManifestOptions) -> Result> { // construct storage - let store = ManifestObjectStore::new( + let mut store = ManifestObjectStore::new( &options.manifest_dir, options.object_store.clone(), options.compress_type, @@ -241,7 +247,7 @@ impl RegionManifestManagerInner { // recover from storage // construct manifest builder let mut version = MIN_VERSION; - let checkpoint = Self::last_checkpoint(&store).await?; + let checkpoint = Self::last_checkpoint(&mut store).await?; let last_checkpoint_version = checkpoint .as_ref() .map(|checkpoint| checkpoint.last_version) @@ -251,6 +257,10 @@ impl RegionManifestManagerInner { "Recover region manifest {} from checkpoint version {}", options.manifest_dir, checkpoint.last_version ); + // set manifest size before last checkpoint + store + .set_manifest_size_until(last_checkpoint_version) + .await?; version = version.max(checkpoint.last_version + 1); RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint) } else { @@ -265,6 +275,8 @@ impl RegionManifestManagerInner { let mut action_iter = store.scan(version, MAX_VERSION).await?; while let Some((manifest_version, raw_action_list)) = action_iter.next_log().await? { let action_list = RegionMetaActionList::decode(&raw_action_list)?; + // set manifest size after last checkpoint + store.set_delta_file_size(manifest_version, raw_action_list.len() as u64); for action in action_list.actions { match action { RegionMetaAction::Change(action) => { @@ -343,6 +355,11 @@ impl RegionManifestManagerInner { Ok(version) } + + /// Returns total manifest size. + pub(crate) fn manifest_size(&self) -> u64 { + self.store.get_manifest_size() + } } impl RegionManifestManagerInner { @@ -369,8 +386,8 @@ impl RegionManifestManagerInner { } /// Make a new checkpoint. Return the fresh one if there are some actions to compact. - async fn do_checkpoint(&self) -> Result> { - let last_checkpoint = Self::last_checkpoint(&self.store).await?; + async fn do_checkpoint(&mut self) -> Result> { + let last_checkpoint = Self::last_checkpoint(&mut self.store).await?; let current_version = self.last_version; let (start_version, mut manifest_builder) = if let Some(checkpoint) = last_checkpoint { @@ -439,9 +456,9 @@ impl RegionManifestManagerInner { Ok(Some(checkpoint)) } - /// Fetch the last [RegionCheckpoint] from storage. + /// Fetch the last Checkpoint size and [RegionCheckpoint] from storage. pub(crate) async fn last_checkpoint( - store: &ManifestObjectStore, + store: &mut ManifestObjectStore, ) -> Result> { let last_checkpoint = store.load_last_checkpoint().await?; @@ -546,4 +563,49 @@ mod test { .unwrap(); manager.validate_manifest(&new_metadata, 1).await; } + + #[tokio::test] + async fn test_manifest_size() { + let metadata = Arc::new(basic_region_metadata()); + let env = TestEnv::new(); + let manager = env + .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone())) + .await + .unwrap() + .unwrap(); + + let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone()); + new_metadata_builder.push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false), + semantic_type: SemanticType::Field, + column_id: 252, + }); + let new_metadata = Arc::new(new_metadata_builder.build().unwrap()); + + let action_list = + RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { + metadata: new_metadata.clone(), + })); + + let current_version = manager.update(action_list).await.unwrap(); + assert_eq!(current_version, 1); + manager.validate_manifest(&new_metadata, 1).await; + + // get manifest size + let manifest_size = manager.manifest_size().await; + assert_eq!(manifest_size, 1557); + + // Reopen the manager. + manager.stop().await.unwrap(); + let manager = env + .create_manifest_manager(CompressionType::Uncompressed, 10, None) + .await + .unwrap() + .unwrap(); + manager.validate_manifest(&new_metadata, 1).await; + + // get manifest size again + let manifest_size = manager.manifest_size().await; + assert_eq!(manifest_size, 1557); + } } diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 69c16ffc9794..3c45490f23cc 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -134,6 +134,9 @@ pub struct ManifestObjectStore { object_store: ObjectStore, compress_type: CompressionType, path: String, + /// Stores the size of each manifest file. + /// K is the path of the manifest file, V is the size. + manifest_size_map: HashMap, } impl ManifestObjectStore { @@ -142,6 +145,7 @@ impl ManifestObjectStore { object_store, compress_type, path: util::normalize_dir(path), + manifest_size_map: HashMap::new(), } } @@ -213,7 +217,7 @@ impl ManifestObjectStore { } pub async fn delete_until( - &self, + &mut self, end: ManifestVersion, keep_last_checkpoint: bool, ) -> Result { @@ -277,6 +281,12 @@ impl ManifestObjectStore { paths, ); + // delete manifest size from paths + paths.iter().for_each(|path| { + let path = format!("{}{}", self.path, path); + self.manifest_size_map.remove(&path); + }); + self.object_store .remove(paths) .await @@ -285,7 +295,7 @@ impl ManifestObjectStore { Ok(ret) } - pub async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { + pub async fn save(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { let path = self.delta_file_path(version); debug!("Save log to manifest storage, version: {}", version); let data = self @@ -296,13 +306,14 @@ impl ManifestObjectStore { compress_type: self.compress_type, path: &path, })?; + self.set_manifest_size_by_path(&path, data.len() as u64); self.object_store .write(&path, data) .await .context(OpenDalSnafu) } - pub async fn save_checkpoint(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { + pub async fn save_checkpoint(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { let path = self.checkpoint_file_path(version); let data = self .compress_type @@ -312,6 +323,7 @@ impl ManifestObjectStore { compress_type: self.compress_type, path: &path, })?; + self.set_manifest_size_by_path(&path, data.len() as u64); self.object_store .write(&path, data) .await @@ -342,63 +354,66 @@ impl ManifestObjectStore { } pub async fn load_checkpoint( - &self, + &mut self, version: ManifestVersion, ) -> Result)>> { let path = self.checkpoint_file_path(version); // Due to backward compatibility, it is possible that the user's checkpoint not compressed, // so if we don't find file by compressed type. fall back to checkpoint not compressed find again. - let checkpoint_data = - match self.object_store.read(&path).await { - Ok(checkpoint) => { - let decompress_data = self.compress_type.decode(checkpoint).await.context( - DecompressObjectSnafu { + let checkpoint_data = match self.object_store.read(&path).await { + Ok(checkpoint) => { + let decompress_data = + self.compress_type + .decode(checkpoint) + .await + .context(DecompressObjectSnafu { compress_type: self.compress_type, - path, - }, - )?; - Ok(Some(decompress_data)) - } - Err(e) => { - if e.kind() == ErrorKind::NotFound { - if self.compress_type != FALL_BACK_COMPRESS_TYPE { - let fall_back_path = gen_path( - &self.path, - &checkpoint_file(version), - FALL_BACK_COMPRESS_TYPE, - ); - debug!( - "Failed to load checkpoint from path: {}, fall back to path: {}", - path, fall_back_path - ); - match self.object_store.read(&fall_back_path).await { - Ok(checkpoint) => { - let decompress_data = FALL_BACK_COMPRESS_TYPE - .decode(checkpoint) - .await - .context(DecompressObjectSnafu { - compress_type: FALL_BACK_COMPRESS_TYPE, - path, - })?; - Ok(Some(decompress_data)) - } - Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), - Err(e) => Err(e).context(OpenDalSnafu), + path: path.clone(), + })?; + self.set_manifest_size_by_path(&path, decompress_data.len() as u64); + Ok(Some(decompress_data)) + } + Err(e) => { + if e.kind() == ErrorKind::NotFound { + if self.compress_type != FALL_BACK_COMPRESS_TYPE { + let fall_back_path = gen_path( + &self.path, + &checkpoint_file(version), + FALL_BACK_COMPRESS_TYPE, + ); + debug!( + "Failed to load checkpoint from path: {}, fall back to path: {}", + path, fall_back_path + ); + match self.object_store.read(&fall_back_path).await { + Ok(checkpoint) => { + let decompress_data = FALL_BACK_COMPRESS_TYPE + .decode(checkpoint) + .await + .context(DecompressObjectSnafu { + compress_type: FALL_BACK_COMPRESS_TYPE, + path: path.clone(), + })?; + self.set_manifest_size_by_path(&path, decompress_data.len() as u64); + Ok(Some(decompress_data)) } - } else { - Ok(None) + Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), + Err(e) => Err(e).context(OpenDalSnafu), } } else { - Err(e).context(OpenDalSnafu) + Ok(None) } + } else { + Err(e).context(OpenDalSnafu) } - }?; + } + }?; Ok(checkpoint_data.map(|data| (version, data))) } /// Load the latest checkpoint. /// Return manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content if any - pub async fn load_last_checkpoint(&self) -> Result)>> { + pub async fn load_last_checkpoint(&mut self) -> Result)>> { let last_checkpoint_path = self.last_checkpoint_path(); let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await { Ok(data) => data, @@ -411,6 +426,8 @@ impl ManifestObjectStore { }; let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?; + // set last checkpoint size + self.set_manifest_size_by_path(&last_checkpoint_path, last_checkpoint_data.len() as u64); debug!( "Load checkpoint in path: {}, metadata: {:?}", @@ -425,30 +442,58 @@ impl ManifestObjectStore { self.object_store.read(path).await.context(OpenDalSnafu) } + /// Compute the size(Byte) in manifest size map. + pub fn get_manifest_size(&self) -> u64 { + self.manifest_size_map.values().sum() + } + /// Get the total size(Byte) of all manifest files. - pub async fn get_total_manifest_size(&self) -> Result { + pub async fn set_total_manifest_size(&mut self) -> Result { + self.set_manifest_size_until(ManifestVersion::MAX).await + } + + /// Get the total size(Byte) of exist manifest files before end version(not included). + /// If end == ManifestVersion::MIN, return 0. + pub async fn set_manifest_size_until(&mut self, end: ManifestVersion) -> Result { let entries = self .get_paths(|entry| { let file_name = entry.name(); if is_delta_file(file_name) || is_checkpoint_file(file_name) { - return Some(entry); + let file_version = file_version(file_name); + if file_version < end { + return Some(entry); + } } None }) .await?; - let mut size = 0; for entry in entries { let bytes = self .object_store .read(entry.path()) .await .context(OpenDalSnafu)?; - size += bytes.len() as u64; + self.set_manifest_size_by_path(entry.path(), bytes.len() as u64); } - // last checkpoint file only contains size ,version and checksum, - // which is very tiny, maybe we don't need add it. - Ok(size) + Ok(self.get_manifest_size()) + } + + /// Set the size of the manifest file by path. + pub fn set_manifest_size_by_path(&mut self, path: &str, size: u64) { + self.manifest_size_map.insert(path.to_string(), size); + } + + /// Set the size of the delta file by delta version. + pub fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) { + self.manifest_size_map + .insert(self.delta_file_path(version), size); + } + + /// Set the size of the checkpoint file by checkpoint version. + pub fn set_checkpoint_file_size(&mut self, version: ManifestVersion, size: u64) { + self.manifest_size_map + .insert(self.checkpoint_file_path(version), size); } } @@ -515,7 +560,7 @@ mod tests { test_manifest_log_store_case(log_store).await; } - async fn test_manifest_log_store_case(log_store: ManifestObjectStore) { + async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) { for v in 0..5 { log_store .save(v, format!("hello, {v}").as_bytes()) @@ -628,9 +673,9 @@ mod tests { } #[tokio::test] - async fn test_get_manifest_files_size() { + async fn test_uncompressed_manifest_files_size() { let mut log_store = new_test_manifest_store(); - // write manifest files + // write 5 manifest files log_store.compress_type = CompressionType::Uncompressed; for v in 0..5 { log_store @@ -642,13 +687,63 @@ mod tests { .save_checkpoint(5, "checkpoint_uncompressed".as_bytes()) .await .unwrap(); - // get manifest files size - assert_eq!(log_store.get_total_manifest_size().await.unwrap(), 63); - // delete some manifest files + // manifest files size + assert_eq!(log_store.get_manifest_size(), 63); + + // delete 3 manifest files assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3); - // get manifest files size after delete - assert_eq!(log_store.get_total_manifest_size().await.unwrap(), 39); + // manifest files size after delete + assert_eq!(log_store.get_manifest_size(), 39); + + // delete all manifest files + assert_eq!( + log_store + .delete_until(ManifestVersion::MAX, false) + .await + .unwrap(), + 3 + ); + + assert_eq!(log_store.get_manifest_size(), 0); + } + + #[tokio::test] + async fn test_compressed_manifest_files_size() { + let mut log_store = new_test_manifest_store(); + // Test with compressed manifest files + log_store.compress_type = CompressionType::Gzip; + // write 5 manifest files + for v in 0..5 { + log_store + .save(v, format!("hello, {v}").as_bytes()) + .await + .unwrap(); + } + log_store + .save_checkpoint(5, "checkpoint_compressed".as_bytes()) + .await + .unwrap(); + + // manifest files size + assert_eq!(log_store.get_manifest_size(), 181); + + // delete 3 manifest files + assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3); + + // manifest files size after delete + assert_eq!(log_store.get_manifest_size(), 97); + + // delete all manifest files + assert_eq!( + log_store + .delete_until(ManifestVersion::MAX, false) + .await + .unwrap(), + 3 + ); + + assert_eq!(log_store.get_manifest_size(), 0); } } diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index 68c7063e1e63..c28f6cd6d598 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -202,7 +202,7 @@ async fn generate_checkpoint_with_compression_types( manager.update(action).await.unwrap(); } - RegionManifestManagerInner::last_checkpoint(&manager.store().await) + RegionManifestManagerInner::last_checkpoint(&mut manager.store().await) .await .unwrap() .unwrap() From c9c87dee3d8292c3042c709ce319253ad0c1519f Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Thu, 12 Oct 2023 15:08:10 +0800 Subject: [PATCH 03/11] refactor: manifest map key --- src/mito2/src/manifest/manager.rs | 7 +++- src/mito2/src/manifest/storage.rs | 58 +++++++++++++++++++++++++------ 2 files changed, 54 insertions(+), 11 deletions(-) diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 975187dd8eb4..d3631efecf18 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -115,6 +115,7 @@ impl RegionManifestManager { /// Construct a region's manifest and persist it. pub async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result { let inner = RegionManifestManagerInner::new(metadata, options).await?; + debug!("new: total manifest size: {}", inner.manifest_size()); Ok(Self { inner: RwLock::new(inner), }) @@ -123,6 +124,7 @@ impl RegionManifestManager { /// Open an existing manifest. pub async fn open(options: RegionManifestOptions) -> Result> { if let Some(inner) = RegionManifestManagerInner::open(options).await? { + debug!("open: total manifest size: {}", inner.manifest_size()); Ok(Some(Self { inner: RwLock::new(inner), })) @@ -134,13 +136,16 @@ impl RegionManifestManager { /// Stop background tasks gracefully. pub async fn stop(&self) -> Result<()> { let mut inner = self.inner.write().await; + debug!("stop: total manifest size: {}", inner.manifest_size()); inner.stop().await } /// Update the manifest. Return the current manifest version number. pub async fn update(&self, action_list: RegionMetaActionList) -> Result { let mut inner = self.inner.write().await; - inner.update(action_list).await + let res = inner.update(action_list).await; + debug!("update: total manifest size: {}", inner.manifest_size()); + res } /// Retrieve the current [RegionManifest]. diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 3c45490f23cc..5dbd0d6357d0 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -80,6 +80,19 @@ pub fn file_version(path: &str) -> ManifestVersion { s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}")) } +/// Return's the file name from path +/// Just use for .json and .checkpoint file +/// ### Panics +/// Panics if the file path is not a valid delta or checkpoint file. +#[inline] +pub fn file_name(path: &str) -> String { + let name = path.rsplit('/').next().unwrap_or("").to_string(); + if !is_checkpoint_file(&name) && !is_delta_file(&name) { + panic!("Invalid file: {path}") + } + name +} + /// Return's the file compress algorithm by file extension. /// /// for example file @@ -188,6 +201,7 @@ impl ManifestObjectStore { .context(OpenDalSnafu) } + /// Scan the manifest files in the range of [start, end) and return the iterator. pub async fn scan( &self, start: ManifestVersion, @@ -216,6 +230,9 @@ impl ManifestObjectStore { }) } + /// Delete all manifest files in the range of [start, end). + /// If keep_last_checkpoint is true, the last checkpoint file will be kept. + /// Return the number of deleted files. pub async fn delete_until( &mut self, end: ManifestVersion, @@ -273,7 +290,7 @@ impl ManifestObjectStore { let ret = paths.len(); debug!( - "Deleting {} logs from manifest storage path {} until {}, checkpoint: {:?}, paths: {:?}", + "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}", ret, self.path, end, @@ -281,10 +298,10 @@ impl ManifestObjectStore { paths, ); - // delete manifest size from paths + // delete the manifest'size in paths paths.iter().for_each(|path| { - let path = format!("{}{}", self.path, path); - self.manifest_size_map.remove(&path); + let name = file_name(path); + self.manifest_size_map.remove(&name); }); self.object_store @@ -295,6 +312,7 @@ impl ManifestObjectStore { Ok(ret) } + /// Save the delta manifest file. pub async fn save(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { let path = self.delta_file_path(version); debug!("Save log to manifest storage, version: {}", version); @@ -313,6 +331,7 @@ impl ManifestObjectStore { .context(OpenDalSnafu) } + /// Save the checkpoint manifest file. pub async fn save_checkpoint(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { let path = self.checkpoint_file_path(version); let data = self @@ -426,8 +445,6 @@ impl ManifestObjectStore { }; let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?; - // set last checkpoint size - self.set_manifest_size_by_path(&last_checkpoint_path, last_checkpoint_data.len() as u64); debug!( "Load checkpoint in path: {}, metadata: {:?}", @@ -481,19 +498,18 @@ impl ManifestObjectStore { /// Set the size of the manifest file by path. pub fn set_manifest_size_by_path(&mut self, path: &str, size: u64) { - self.manifest_size_map.insert(path.to_string(), size); + self.manifest_size_map.insert(file_name(path), size); } /// Set the size of the delta file by delta version. pub fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) { - self.manifest_size_map - .insert(self.delta_file_path(version), size); + self.manifest_size_map.insert(delta_file(version), size); } /// Set the size of the checkpoint file by checkpoint version. pub fn set_checkpoint_file_size(&mut self, version: ManifestVersion, size: u64) { self.manifest_size_map - .insert(self.checkpoint_file_path(version), size); + .insert(checkpoint_file(version), size); } } @@ -672,6 +688,28 @@ mod tests { assert!(it.next_log().await.unwrap().is_none()); } + #[tokio::test] + async fn test_file_name() { + let name = file_name( + "data/greptime/public/1054/1054_0000000000/manifest/00000000000000000007.json", + ); + assert_eq!(name, "00000000000000000007.json"); + + let name = file_name( + "/data/greptime/public/1054/1054_0000000000/manifest/00000000000000000007.checkpoint", + ); + assert_eq!(name, "00000000000000000007.checkpoint"); + + let version = file_version("00000000000000000007.checkpoint"); + assert_eq!(version, 7); + + let name = delta_file(version); + assert_eq!(name, "00000000000000000007.json"); + + let name = checkpoint_file(version); + assert_eq!(name, "00000000000000000007.checkpoint"); + } + #[tokio::test] async fn test_uncompressed_manifest_files_size() { let mut log_store = new_test_manifest_store(); From bd8989f04d8f4db3bf2f469eb4b51bbba87cd7a7 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Thu, 12 Oct 2023 15:42:52 +0800 Subject: [PATCH 04/11] chore: comment and unit test --- src/mito2/src/manifest/manager.rs | 74 ++++++++++++++++++++++++------- src/mito2/src/manifest/storage.rs | 55 ++++++++++++++++------- src/mito2/src/test_util.rs | 9 ++++ 3 files changed, 107 insertions(+), 31 deletions(-) diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index d3631efecf18..fd2b76060ac5 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -115,7 +115,6 @@ impl RegionManifestManager { /// Construct a region's manifest and persist it. pub async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result { let inner = RegionManifestManagerInner::new(metadata, options).await?; - debug!("new: total manifest size: {}", inner.manifest_size()); Ok(Self { inner: RwLock::new(inner), }) @@ -124,7 +123,6 @@ impl RegionManifestManager { /// Open an existing manifest. pub async fn open(options: RegionManifestOptions) -> Result> { if let Some(inner) = RegionManifestManagerInner::open(options).await? { - debug!("open: total manifest size: {}", inner.manifest_size()); Ok(Some(Self { inner: RwLock::new(inner), })) @@ -136,16 +134,13 @@ impl RegionManifestManager { /// Stop background tasks gracefully. pub async fn stop(&self) -> Result<()> { let mut inner = self.inner.write().await; - debug!("stop: total manifest size: {}", inner.manifest_size()); inner.stop().await } /// Update the manifest. Return the current manifest version number. pub async fn update(&self, action_list: RegionMetaActionList) -> Result { let mut inner = self.inner.write().await; - let res = inner.update(action_list).await; - debug!("update: total manifest size: {}", inner.manifest_size()); - res + inner.update(action_list).await } /// Retrieve the current [RegionManifest]. @@ -163,7 +158,7 @@ impl RegionManifestManager { /// Returns total manifest size. pub async fn manifest_size(&self) -> u64 { let inner = self.inner.read().await; - inner.manifest_size() + inner.total_manifest_size() } } @@ -329,6 +324,7 @@ impl RegionManifestManagerInner { Ok(()) } + /// Update the manifest. Return the current manifest version number. async fn update(&mut self, action_list: RegionMetaActionList) -> Result { let version = self.increase_version(); self.store.save(version, &action_list.encode()?).await?; @@ -362,8 +358,8 @@ impl RegionManifestManagerInner { } /// Returns total manifest size. - pub(crate) fn manifest_size(&self) -> u64 { - self.store.get_manifest_size() + pub(crate) fn total_manifest_size(&self) -> u64 { + self.store.get_total_manifest_size() } } @@ -461,7 +457,7 @@ impl RegionManifestManagerInner { Ok(Some(checkpoint)) } - /// Fetch the last Checkpoint size and [RegionCheckpoint] from storage. + /// Fetch the last [RegionCheckpoint] from storage. pub(crate) async fn last_checkpoint( store: &mut ManifestObjectStore, ) -> Result> { @@ -478,14 +474,16 @@ impl RegionManifestManagerInner { #[cfg(test)] mod test { + use api::v1::SemanticType; use common_datasource::compression::CompressionType; + use common_test_util::temp_dir::create_temp_dir; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use super::*; - use crate::manifest::action::RegionChange; + use crate::manifest::action::{RegionChange, RegionEdit}; use crate::manifest::tests::utils::basic_region_metadata; use crate::test_util::TestEnv; @@ -569,10 +567,36 @@ mod test { manager.validate_manifest(&new_metadata, 1).await; } + /// Just for test, the manifest size is the sum of all checkpoint files and delta files, + /// refer to wal_dir_usage in src/store-api/src/logstore.rs. + async fn manifest_dir_usage(path: &str) -> u64 { + let mut size = 0; + let mut read_dir = tokio::fs::read_dir(path).await.unwrap(); + while let Ok(dir_entry) = read_dir.next_entry().await { + let Some(entry) = dir_entry else { + break; + }; + if entry.file_type().await.unwrap().is_file() { + let file_name = entry.file_name().into_string().unwrap(); + if file_name.contains(".checkpoint") || file_name.contains(".json") { + let file_size = entry.metadata().await.unwrap().len() as usize; + debug!("File: {file_name:?}, size: {file_size}"); + size += file_size; + } + } + } + size as u64 + } + #[tokio::test] async fn test_manifest_size() { let metadata = Arc::new(basic_region_metadata()); - let env = TestEnv::new(); + let data_home = create_temp_dir(""); + let data_home_path = data_home.path().to_str().unwrap().to_string(); + let env = TestEnv::with_data_home(data_home); + + let manifest_dir = format!("{}/manifest", data_home_path); + let manager = env .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone())) .await @@ -598,7 +622,27 @@ mod test { // get manifest size let manifest_size = manager.manifest_size().await; - assert_eq!(manifest_size, 1557); + assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await); + + // update 10 times nop_action to trigger checkpoint + for _ in 0..10 { + manager + .update(RegionMetaActionList::new(vec![RegionMetaAction::Edit( + RegionEdit { + files_to_add: vec![], + files_to_remove: vec![], + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + }, + )])) + .await + .unwrap(); + } + + // check manifest size again + let manifest_size = manager.manifest_size().await; + assert_eq!(manifest_size, 1453); // Reopen the manager. manager.stop().await.unwrap(); @@ -607,10 +651,10 @@ mod test { .await .unwrap() .unwrap(); - manager.validate_manifest(&new_metadata, 1).await; + manager.validate_manifest(&new_metadata, 11).await; // get manifest size again let manifest_size = manager.manifest_size().await; - assert_eq!(manifest_size, 1557); + assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await); } } diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 5dbd0d6357d0..37fe41343ea0 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -148,7 +148,8 @@ pub struct ManifestObjectStore { compress_type: CompressionType, path: String, /// Stores the size of each manifest file. - /// K is the path of the manifest file, V is the size. + /// K is the manifest file name(such as "0000000000000000006.checkpoint" + /// or "0000000000000000006.json"), and V is the file size. manifest_size_map: HashMap, } @@ -230,9 +231,10 @@ impl ManifestObjectStore { }) } - /// Delete all manifest files in the range of [start, end). + /// Delete manifest files that version < end. /// If keep_last_checkpoint is true, the last checkpoint file will be kept. - /// Return the number of deleted files. + /// ### Return + /// The number of deleted files. pub async fn delete_until( &mut self, end: ManifestVersion, @@ -459,25 +461,41 @@ impl ManifestObjectStore { self.object_store.read(path).await.context(OpenDalSnafu) } + /// Get the size(Byte) of the delta file by delta version. + /// If the delta file does not exist, return None. + pub fn delta_file_size(&self, version: ManifestVersion) -> Option { + self.manifest_size_map.get(&delta_file(version)).copied() + } + + /// Get the size(Byte) of the checkpoint file by checkpoint version. + /// If the checkpoint file does not exist, return None. + pub fn checkpoint_file_size(&self, version: ManifestVersion) -> Option { + self.manifest_size_map + .get(&checkpoint_file(version)) + .copied() + } + /// Compute the size(Byte) in manifest size map. - pub fn get_manifest_size(&self) -> u64 { + pub fn get_total_manifest_size(&self) -> u64 { self.manifest_size_map.values().sum() } /// Get the total size(Byte) of all manifest files. pub async fn set_total_manifest_size(&mut self) -> Result { - self.set_manifest_size_until(ManifestVersion::MAX).await + self.set_manifest_size_until(ManifestVersion::MAX).await?; + Ok(self.get_total_manifest_size()) } - /// Get the total size(Byte) of exist manifest files before end version(not included). - /// If end == ManifestVersion::MIN, return 0. - pub async fn set_manifest_size_until(&mut self, end: ManifestVersion) -> Result { + /// Count the total size(Byte) of exist manifest files which satisfy: + /// delta file version <= end and checkpoint file version < end. + /// Notice: this function will read files from object store. + pub async fn set_manifest_size_until(&mut self, end: ManifestVersion) -> Result<()> { let entries = self .get_paths(|entry| { let file_name = entry.name(); if is_delta_file(file_name) || is_checkpoint_file(file_name) { let file_version = file_version(file_name); - if file_version < end { + if file_version < end || (file_version == end && is_delta_file(file_name)) { return Some(entry); } } @@ -493,7 +511,7 @@ impl ManifestObjectStore { self.set_manifest_size_by_path(entry.path(), bytes.len() as u64); } - Ok(self.get_manifest_size()) + Ok(()) } /// Set the size of the manifest file by path. @@ -725,15 +743,20 @@ mod tests { .save_checkpoint(5, "checkpoint_uncompressed".as_bytes()) .await .unwrap(); + // single delta file size + assert_eq!(log_store.delta_file_size(0), Some(8)); + + // single checkpoint file size + assert_eq!(log_store.checkpoint_file_size(5), Some(23)); // manifest files size - assert_eq!(log_store.get_manifest_size(), 63); + assert_eq!(log_store.get_total_manifest_size(), 63); // delete 3 manifest files assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3); // manifest files size after delete - assert_eq!(log_store.get_manifest_size(), 39); + assert_eq!(log_store.get_total_manifest_size(), 39); // delete all manifest files assert_eq!( @@ -744,7 +767,7 @@ mod tests { 3 ); - assert_eq!(log_store.get_manifest_size(), 0); + assert_eq!(log_store.get_total_manifest_size(), 0); } #[tokio::test] @@ -765,13 +788,13 @@ mod tests { .unwrap(); // manifest files size - assert_eq!(log_store.get_manifest_size(), 181); + assert_eq!(log_store.get_total_manifest_size(), 181); // delete 3 manifest files assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3); // manifest files size after delete - assert_eq!(log_store.get_manifest_size(), 97); + assert_eq!(log_store.get_total_manifest_size(), 97); // delete all manifest files assert_eq!( @@ -782,6 +805,6 @@ mod tests { 3 ); - assert_eq!(log_store.get_manifest_size(), 0); + assert_eq!(log_store.get_total_manifest_size(), 0); } } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index c9621249212c..d7cb13e5121b 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -99,6 +99,15 @@ impl TestEnv { } } + /// Returns a new env with specific `data_home` for test. + pub fn with_data_home(data_home: TempDir) -> TestEnv { + TestEnv { + data_home, + logstore: None, + object_store: None, + } + } + pub fn get_logstore(&self) -> Option> { self.logstore.clone() } From 1dfb019269c567512a6111d2089d3c7d2809b5ca Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Mon, 16 Oct 2023 13:16:52 +0800 Subject: [PATCH 05/11] chore: remove no-use function --- src/mito2/src/manifest/storage.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 37fe41343ea0..2daf5278c345 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -480,12 +480,6 @@ impl ManifestObjectStore { self.manifest_size_map.values().sum() } - /// Get the total size(Byte) of all manifest files. - pub async fn set_total_manifest_size(&mut self) -> Result { - self.set_manifest_size_until(ManifestVersion::MAX).await?; - Ok(self.get_total_manifest_size()) - } - /// Count the total size(Byte) of exist manifest files which satisfy: /// delta file version <= end and checkpoint file version < end. /// Notice: this function will read files from object store. From 1fe889b2c7999813a2dbe7010312c4810d284fe5 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Mon, 16 Oct 2023 15:39:41 +0800 Subject: [PATCH 06/11] chore: change style --- src/mito2/src/manifest/manager.rs | 15 ++++++--------- src/mito2/src/manifest/storage.rs | 1 + 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index fd2b76060ac5..81e3128b7fcd 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -246,6 +246,7 @@ impl RegionManifestManagerInner { // recover from storage // construct manifest builder + // calculate the manifest size from the latest checkpoint let mut version = MIN_VERSION; let checkpoint = Self::last_checkpoint(&mut store).await?; let last_checkpoint_version = checkpoint @@ -257,10 +258,6 @@ impl RegionManifestManagerInner { "Recover region manifest {} from checkpoint version {}", options.manifest_dir, checkpoint.last_version ); - // set manifest size before last checkpoint - store - .set_manifest_size_until(last_checkpoint_version) - .await?; version = version.max(checkpoint.last_version + 1); RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint) } else { @@ -567,8 +564,7 @@ mod test { manager.validate_manifest(&new_metadata, 1).await; } - /// Just for test, the manifest size is the sum of all checkpoint files and delta files, - /// refer to wal_dir_usage in src/store-api/src/logstore.rs. + /// Just for test, refer to wal_dir_usage in src/store-api/src/logstore.rs. async fn manifest_dir_usage(path: &str) -> u64 { let mut size = 0; let mut read_dir = tokio::fs::read_dir(path).await.unwrap(); @@ -642,9 +638,10 @@ mod test { // check manifest size again let manifest_size = manager.manifest_size().await; - assert_eq!(manifest_size, 1453); + assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await); - // Reopen the manager. + // Reopen the manager, + // we just calculate the size from the latest checkpoint file manager.stop().await.unwrap(); let manager = env .create_manifest_manager(CompressionType::Uncompressed, 10, None) @@ -655,6 +652,6 @@ mod test { // get manifest size again let manifest_size = manager.manifest_size().await; - assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await); + assert_eq!(manifest_size, 1312); } } diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 2daf5278c345..7eeccdf57d32 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -391,6 +391,7 @@ impl ManifestObjectStore { compress_type: self.compress_type, path: path.clone(), })?; + // set the checkpoint size self.set_manifest_size_by_path(&path, decompress_data.len() as u64); Ok(Some(decompress_data)) } From 49583f3db6acdb3e9eb725ad386148ba19048f1c Mon Sep 17 00:00:00 2001 From: Wei <47681251+QuenKar@users.noreply.github.com> Date: Mon, 16 Oct 2023 17:57:30 +0800 Subject: [PATCH 07/11] Apply suggestions from code review Co-authored-by: Yingwen --- src/mito2/src/manifest/storage.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 7eeccdf57d32..5179df743994 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -84,7 +84,6 @@ pub fn file_version(path: &str) -> ManifestVersion { /// Just use for .json and .checkpoint file /// ### Panics /// Panics if the file path is not a valid delta or checkpoint file. -#[inline] pub fn file_name(path: &str) -> String { let name = path.rsplit('/').next().unwrap_or("").to_string(); if !is_checkpoint_file(&name) && !is_delta_file(&name) { @@ -301,10 +300,10 @@ impl ManifestObjectStore { ); // delete the manifest'size in paths - paths.iter().for_each(|path| { + for path in &paths { let name = file_name(path); self.manifest_size_map.remove(&name); - }); + } self.object_store .remove(paths) From 33ae48542b35edc4c36009a28ea9493023ea04d4 Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Mon, 16 Oct 2023 22:13:58 +0800 Subject: [PATCH 08/11] chore: cr comment --- src/mito2/src/manifest/manager.rs | 2 +- src/mito2/src/manifest/storage.rs | 205 +++++++++++++++--------------- 2 files changed, 106 insertions(+), 101 deletions(-) diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 81e3128b7fcd..653cd7511fc8 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -356,7 +356,7 @@ impl RegionManifestManagerInner { /// Returns total manifest size. pub(crate) fn total_manifest_size(&self) -> u64 { - self.store.get_total_manifest_size() + self.store.total_manifest_size() } } diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 5179df743994..73e507978906 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -80,16 +80,14 @@ pub fn file_version(path: &str) -> ManifestVersion { s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}")) } -/// Return's the file name from path -/// Just use for .json and .checkpoint file -/// ### Panics -/// Panics if the file path is not a valid delta or checkpoint file. -pub fn file_name(path: &str) -> String { +/// Return the file name from path. +/// Just for (`.json`) and (`.checkpoint`) file, other file will return None. +pub fn file_name(path: &str) -> Option { let name = path.rsplit('/').next().unwrap_or("").to_string(); if !is_checkpoint_file(&name) && !is_delta_file(&name) { - panic!("Invalid file: {path}") + return None; } - name + Some(name) } /// Return's the file compress algorithm by file extension. @@ -141,15 +139,36 @@ impl ObjectStoreLogIterator { } } +/// Key to identify a manifest file. +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +enum FileKey { + /// A delta file (`.json`). + Delta(ManifestVersion), + /// A checkpoint file (`.checkpoint`). + Checkpoint(ManifestVersion), +} + +impl FileKey { + /// New a FileKey, if the file name is not a valid delta + /// or checkpoint file, return None. + pub fn new(file_name: &str) -> Option { + if is_delta_file(file_name) { + Some(FileKey::Delta(file_version(file_name))) + } else if is_checkpoint_file(file_name) { + Some(FileKey::Checkpoint(file_version(file_name))) + } else { + None + } + } +} + #[derive(Clone, Debug)] pub struct ManifestObjectStore { object_store: ObjectStore, compress_type: CompressionType, path: String, /// Stores the size of each manifest file. - /// K is the manifest file name(such as "0000000000000000006.checkpoint" - /// or "0000000000000000006.json"), and V is the file size. - manifest_size_map: HashMap, + manifest_size_map: HashMap, } impl ManifestObjectStore { @@ -301,8 +320,11 @@ impl ManifestObjectStore { // delete the manifest'size in paths for path in &paths { - let name = file_name(path); - self.manifest_size_map.remove(&name); + if let Some(name) = file_name(path) { + if let Some(file_key) = FileKey::new(&name) { + self.manifest_size_map.remove(&file_key); + } + } } self.object_store @@ -325,11 +347,14 @@ impl ManifestObjectStore { compress_type: self.compress_type, path: &path, })?; - self.set_manifest_size_by_path(&path, data.len() as u64); - self.object_store + let delta_size = data.len(); + let _ = self + .object_store .write(&path, data) .await - .context(OpenDalSnafu) + .context(OpenDalSnafu); + self.set_manifest_size_by_path(&path, delta_size as u64); + Ok(()) } /// Save the checkpoint manifest file. @@ -343,11 +368,12 @@ impl ManifestObjectStore { compress_type: self.compress_type, path: &path, })?; - self.set_manifest_size_by_path(&path, data.len() as u64); + let checkpoint_size = data.len(); self.object_store .write(&path, data) .await .context(OpenDalSnafu)?; + self.set_manifest_size_by_path(&path, checkpoint_size as u64); // Because last checkpoint file only contain size and version, which is tiny, so we don't compress it. let last_checkpoint_path = self.last_checkpoint_path(); @@ -380,55 +406,56 @@ impl ManifestObjectStore { let path = self.checkpoint_file_path(version); // Due to backward compatibility, it is possible that the user's checkpoint not compressed, // so if we don't find file by compressed type. fall back to checkpoint not compressed find again. - let checkpoint_data = match self.object_store.read(&path).await { - Ok(checkpoint) => { - let decompress_data = - self.compress_type - .decode(checkpoint) - .await - .context(DecompressObjectSnafu { + let checkpoint_data = + match self.object_store.read(&path).await { + Ok(checkpoint) => { + let checkpoint_size = checkpoint.len(); + let decompress_data = self.compress_type.decode(checkpoint).await.context( + DecompressObjectSnafu { compress_type: self.compress_type, path: path.clone(), - })?; - // set the checkpoint size - self.set_manifest_size_by_path(&path, decompress_data.len() as u64); - Ok(Some(decompress_data)) - } - Err(e) => { - if e.kind() == ErrorKind::NotFound { - if self.compress_type != FALL_BACK_COMPRESS_TYPE { - let fall_back_path = gen_path( - &self.path, - &checkpoint_file(version), - FALL_BACK_COMPRESS_TYPE, - ); - debug!( - "Failed to load checkpoint from path: {}, fall back to path: {}", - path, fall_back_path - ); - match self.object_store.read(&fall_back_path).await { - Ok(checkpoint) => { - let decompress_data = FALL_BACK_COMPRESS_TYPE - .decode(checkpoint) - .await - .context(DecompressObjectSnafu { - compress_type: FALL_BACK_COMPRESS_TYPE, - path: path.clone(), - })?; - self.set_manifest_size_by_path(&path, decompress_data.len() as u64); - Ok(Some(decompress_data)) + }, + )?; + // set the checkpoint size + self.set_manifest_size_by_path(&path, checkpoint_size as u64); + Ok(Some(decompress_data)) + } + Err(e) => { + if e.kind() == ErrorKind::NotFound { + if self.compress_type != FALL_BACK_COMPRESS_TYPE { + let fall_back_path = gen_path( + &self.path, + &checkpoint_file(version), + FALL_BACK_COMPRESS_TYPE, + ); + debug!( + "Failed to load checkpoint from path: {}, fall back to path: {}", + path, fall_back_path + ); + match self.object_store.read(&fall_back_path).await { + Ok(checkpoint) => { + let checkpoint_size = checkpoint.len(); + let decompress_data = FALL_BACK_COMPRESS_TYPE + .decode(checkpoint) + .await + .context(DecompressObjectSnafu { + compress_type: FALL_BACK_COMPRESS_TYPE, + path: path.clone(), + })?; + self.set_manifest_size_by_path(&path, checkpoint_size as u64); + Ok(Some(decompress_data)) + } + Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), + Err(e) => Err(e).context(OpenDalSnafu), } - Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), - Err(e) => Err(e).context(OpenDalSnafu), + } else { + Ok(None) } } else { - Ok(None) + Err(e).context(OpenDalSnafu) } - } else { - Err(e).context(OpenDalSnafu) } - } - }?; + }?; Ok(checkpoint_data.map(|data| (version, data))) } @@ -464,64 +491,42 @@ impl ManifestObjectStore { /// Get the size(Byte) of the delta file by delta version. /// If the delta file does not exist, return None. pub fn delta_file_size(&self, version: ManifestVersion) -> Option { - self.manifest_size_map.get(&delta_file(version)).copied() + self.manifest_size_map + .get(&FileKey::Delta(version)) + .copied() } /// Get the size(Byte) of the checkpoint file by checkpoint version. /// If the checkpoint file does not exist, return None. pub fn checkpoint_file_size(&self, version: ManifestVersion) -> Option { self.manifest_size_map - .get(&checkpoint_file(version)) + .get(&FileKey::Checkpoint(version)) .copied() } /// Compute the size(Byte) in manifest size map. - pub fn get_total_manifest_size(&self) -> u64 { + pub fn total_manifest_size(&self) -> u64 { self.manifest_size_map.values().sum() } - /// Count the total size(Byte) of exist manifest files which satisfy: - /// delta file version <= end and checkpoint file version < end. - /// Notice: this function will read files from object store. - pub async fn set_manifest_size_until(&mut self, end: ManifestVersion) -> Result<()> { - let entries = self - .get_paths(|entry| { - let file_name = entry.name(); - if is_delta_file(file_name) || is_checkpoint_file(file_name) { - let file_version = file_version(file_name); - if file_version < end || (file_version == end && is_delta_file(file_name)) { - return Some(entry); - } - } - None - }) - .await?; - for entry in entries { - let bytes = self - .object_store - .read(entry.path()) - .await - .context(OpenDalSnafu)?; - self.set_manifest_size_by_path(entry.path(), bytes.len() as u64); - } - - Ok(()) - } - /// Set the size of the manifest file by path. pub fn set_manifest_size_by_path(&mut self, path: &str, size: u64) { - self.manifest_size_map.insert(file_name(path), size); + if let Some(name) = file_name(path) { + if let Some(file_key) = FileKey::new(&name) { + self.manifest_size_map.insert(file_key, size); + } + } } /// Set the size of the delta file by delta version. pub fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) { - self.manifest_size_map.insert(delta_file(version), size); + self.manifest_size_map.insert(FileKey::Delta(version), size); } /// Set the size of the checkpoint file by checkpoint version. pub fn set_checkpoint_file_size(&mut self, version: ManifestVersion, size: u64) { self.manifest_size_map - .insert(checkpoint_file(version), size); + .insert(FileKey::Checkpoint(version), size); } } @@ -705,12 +710,12 @@ mod tests { let name = file_name( "data/greptime/public/1054/1054_0000000000/manifest/00000000000000000007.json", ); - assert_eq!(name, "00000000000000000007.json"); + assert_eq!(name.unwrap(), "00000000000000000007.json"); let name = file_name( "/data/greptime/public/1054/1054_0000000000/manifest/00000000000000000007.checkpoint", ); - assert_eq!(name, "00000000000000000007.checkpoint"); + assert_eq!(name.unwrap(), "00000000000000000007.checkpoint"); let version = file_version("00000000000000000007.checkpoint"); assert_eq!(version, 7); @@ -744,13 +749,13 @@ mod tests { assert_eq!(log_store.checkpoint_file_size(5), Some(23)); // manifest files size - assert_eq!(log_store.get_total_manifest_size(), 63); + assert_eq!(log_store.total_manifest_size(), 63); // delete 3 manifest files assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3); // manifest files size after delete - assert_eq!(log_store.get_total_manifest_size(), 39); + assert_eq!(log_store.total_manifest_size(), 39); // delete all manifest files assert_eq!( @@ -761,7 +766,7 @@ mod tests { 3 ); - assert_eq!(log_store.get_total_manifest_size(), 0); + assert_eq!(log_store.total_manifest_size(), 0); } #[tokio::test] @@ -782,13 +787,13 @@ mod tests { .unwrap(); // manifest files size - assert_eq!(log_store.get_total_manifest_size(), 181); + assert_eq!(log_store.total_manifest_size(), 181); // delete 3 manifest files assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3); // manifest files size after delete - assert_eq!(log_store.get_total_manifest_size(), 97); + assert_eq!(log_store.total_manifest_size(), 97); // delete all manifest files assert_eq!( @@ -799,6 +804,6 @@ mod tests { 3 ); - assert_eq!(log_store.get_total_manifest_size(), 0); + assert_eq!(log_store.total_manifest_size(), 0); } } From 7ccf3d2bc8bf98d9ddbb3a98c8b9be6e2f16cf9c Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Tue, 17 Oct 2023 11:54:08 +0800 Subject: [PATCH 09/11] chore: cr comment --- src/mito2/src/manifest/storage.rs | 36 +++++-------------------------- 1 file changed, 5 insertions(+), 31 deletions(-) diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 73e507978906..7f18d65b922b 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -488,29 +488,13 @@ impl ManifestObjectStore { self.object_store.read(path).await.context(OpenDalSnafu) } - /// Get the size(Byte) of the delta file by delta version. - /// If the delta file does not exist, return None. - pub fn delta_file_size(&self, version: ManifestVersion) -> Option { - self.manifest_size_map - .get(&FileKey::Delta(version)) - .copied() - } - - /// Get the size(Byte) of the checkpoint file by checkpoint version. - /// If the checkpoint file does not exist, return None. - pub fn checkpoint_file_size(&self, version: ManifestVersion) -> Option { - self.manifest_size_map - .get(&FileKey::Checkpoint(version)) - .copied() - } - /// Compute the size(Byte) in manifest size map. - pub fn total_manifest_size(&self) -> u64 { + pub(crate) fn total_manifest_size(&self) -> u64 { self.manifest_size_map.values().sum() } /// Set the size of the manifest file by path. - pub fn set_manifest_size_by_path(&mut self, path: &str, size: u64) { + pub(crate) fn set_manifest_size_by_path(&mut self, path: &str, size: u64) { if let Some(name) = file_name(path) { if let Some(file_key) = FileKey::new(&name) { self.manifest_size_map.insert(file_key, size); @@ -519,15 +503,9 @@ impl ManifestObjectStore { } /// Set the size of the delta file by delta version. - pub fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) { + pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) { self.manifest_size_map.insert(FileKey::Delta(version), size); } - - /// Set the size of the checkpoint file by checkpoint version. - pub fn set_checkpoint_file_size(&mut self, version: ManifestVersion, size: u64) { - self.manifest_size_map - .insert(FileKey::Checkpoint(version), size); - } } #[derive(Serialize, Deserialize, Debug)] @@ -730,7 +708,7 @@ mod tests { #[tokio::test] async fn test_uncompressed_manifest_files_size() { let mut log_store = new_test_manifest_store(); - // write 5 manifest files + // write 5 manifest files with uncompressed(8B per file) log_store.compress_type = CompressionType::Uncompressed; for v in 0..5 { log_store @@ -738,15 +716,11 @@ mod tests { .await .unwrap(); } + // write 1 checkpoint file with uncompressed(23B) log_store .save_checkpoint(5, "checkpoint_uncompressed".as_bytes()) .await .unwrap(); - // single delta file size - assert_eq!(log_store.delta_file_size(0), Some(8)); - - // single checkpoint file size - assert_eq!(log_store.checkpoint_file_size(5), Some(23)); // manifest files size assert_eq!(log_store.total_manifest_size(), 63); From 6a8067b7dc6e12f852153b55dec043e8a83bea6d Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Wed, 18 Oct 2023 15:16:17 +0800 Subject: [PATCH 10/11] chore: cr comment --- src/mito2/src/manifest/storage.rs | 68 ++++++++++++------------------- 1 file changed, 27 insertions(+), 41 deletions(-) diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 7f18d65b922b..e5f2679375d3 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -148,20 +148,6 @@ enum FileKey { Checkpoint(ManifestVersion), } -impl FileKey { - /// New a FileKey, if the file name is not a valid delta - /// or checkpoint file, return None. - pub fn new(file_name: &str) -> Option { - if is_delta_file(file_name) { - Some(FileKey::Delta(file_version(file_name))) - } else if is_checkpoint_file(file_name) { - Some(FileKey::Checkpoint(file_version(file_name))) - } else { - None - } - } -} - #[derive(Clone, Debug)] pub struct ManifestObjectStore { object_store: ObjectStore, @@ -289,7 +275,7 @@ impl ManifestObjectStore { } else { None }; - let paths: Vec<_> = entries + let del_entries: Vec<_> = entries .iter() .filter(|(_e, is_checkpoint, version)| { if let Some(max_version) = checkpoint_version { @@ -305,8 +291,11 @@ impl ManifestObjectStore { true } }) - .map(|e| e.0.path().to_string()) .collect(); + let paths = del_entries + .iter() + .map(|(e, _, _)| e.path().to_string()) + .collect::>(); let ret = paths.len(); debug!( @@ -318,20 +307,21 @@ impl ManifestObjectStore { paths, ); - // delete the manifest'size in paths - for path in &paths { - if let Some(name) = file_name(path) { - if let Some(file_key) = FileKey::new(&name) { - self.manifest_size_map.remove(&file_key); - } - } - } - self.object_store .remove(paths) .await .context(OpenDalSnafu)?; + // delete the manifest'size + for (_, is_checkpoint, version) in &del_entries { + if *is_checkpoint { + self.manifest_size_map + .remove(&FileKey::Checkpoint(*version)); + } else { + self.manifest_size_map.remove(&FileKey::Delta(*version)); + } + } + Ok(ret) } @@ -348,12 +338,11 @@ impl ManifestObjectStore { path: &path, })?; let delta_size = data.len(); - let _ = self - .object_store + self.object_store .write(&path, data) .await - .context(OpenDalSnafu); - self.set_manifest_size_by_path(&path, delta_size as u64); + .context(OpenDalSnafu)?; + self.set_delta_file_size(version, delta_size as u64); Ok(()) } @@ -373,7 +362,7 @@ impl ManifestObjectStore { .write(&path, data) .await .context(OpenDalSnafu)?; - self.set_manifest_size_by_path(&path, checkpoint_size as u64); + self.set_checkpoint_file_size(version, checkpoint_size as u64); // Because last checkpoint file only contain size and version, which is tiny, so we don't compress it. let last_checkpoint_path = self.last_checkpoint_path(); @@ -417,7 +406,7 @@ impl ManifestObjectStore { }, )?; // set the checkpoint size - self.set_manifest_size_by_path(&path, checkpoint_size as u64); + self.set_checkpoint_file_size(version, checkpoint_size as u64); Ok(Some(decompress_data)) } Err(e) => { @@ -442,7 +431,7 @@ impl ManifestObjectStore { compress_type: FALL_BACK_COMPRESS_TYPE, path: path.clone(), })?; - self.set_manifest_size_by_path(&path, checkpoint_size as u64); + self.set_checkpoint_file_size(version, checkpoint_size as u64); Ok(Some(decompress_data)) } Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), @@ -493,19 +482,16 @@ impl ManifestObjectStore { self.manifest_size_map.values().sum() } - /// Set the size of the manifest file by path. - pub(crate) fn set_manifest_size_by_path(&mut self, path: &str, size: u64) { - if let Some(name) = file_name(path) { - if let Some(file_key) = FileKey::new(&name) { - self.manifest_size_map.insert(file_key, size); - } - } - } - /// Set the size of the delta file by delta version. pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) { self.manifest_size_map.insert(FileKey::Delta(version), size); } + + /// Set the size of the checkpoint file by checkpoint version. + pub(crate) fn set_checkpoint_file_size(&mut self, version: ManifestVersion, size: u64) { + self.manifest_size_map + .insert(FileKey::Checkpoint(version), size); + } } #[derive(Serialize, Deserialize, Debug)] From 8340484ab33028b33408e1b6e0a8a1e36f90ee0b Mon Sep 17 00:00:00 2001 From: QuenKar <47681251+QuenKar@users.noreply.github.com> Date: Thu, 19 Oct 2023 11:12:09 +0800 Subject: [PATCH 11/11] chore: cr comment --- src/mito2/src/manifest/storage.rs | 28 ++++------------------------ 1 file changed, 4 insertions(+), 24 deletions(-) diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index e5f2679375d3..edd63ac52162 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -80,16 +80,6 @@ pub fn file_version(path: &str) -> ManifestVersion { s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}")) } -/// Return the file name from path. -/// Just for (`.json`) and (`.checkpoint`) file, other file will return None. -pub fn file_name(path: &str) -> Option { - let name = path.rsplit('/').next().unwrap_or("").to_string(); - if !is_checkpoint_file(&name) && !is_delta_file(&name) { - return None; - } - Some(name) -} - /// Return's the file compress algorithm by file extension. /// /// for example file @@ -312,7 +302,7 @@ impl ManifestObjectStore { .await .context(OpenDalSnafu)?; - // delete the manifest'size + // delete manifest sizes for (_, is_checkpoint, version) in &del_entries { if *is_checkpoint { self.manifest_size_map @@ -402,7 +392,7 @@ impl ManifestObjectStore { let decompress_data = self.compress_type.decode(checkpoint).await.context( DecompressObjectSnafu { compress_type: self.compress_type, - path: path.clone(), + path, }, )?; // set the checkpoint size @@ -429,7 +419,7 @@ impl ManifestObjectStore { .await .context(DecompressObjectSnafu { compress_type: FALL_BACK_COMPRESS_TYPE, - path: path.clone(), + path, })?; self.set_checkpoint_file_size(version, checkpoint_size as u64); Ok(Some(decompress_data)) @@ -670,17 +660,7 @@ mod tests { } #[tokio::test] - async fn test_file_name() { - let name = file_name( - "data/greptime/public/1054/1054_0000000000/manifest/00000000000000000007.json", - ); - assert_eq!(name.unwrap(), "00000000000000000007.json"); - - let name = file_name( - "/data/greptime/public/1054/1054_0000000000/manifest/00000000000000000007.checkpoint", - ); - assert_eq!(name.unwrap(), "00000000000000000007.checkpoint"); - + async fn test_file_version() { let version = file_version("00000000000000000007.checkpoint"); assert_eq!(version, 7);