From db6ceda5f0855ce7fbba16812abff0a2475d919c Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Wed, 27 Sep 2023 10:58:17 +0800
Subject: [PATCH] fix(mito): fix region drop task runs multiple times but never clean the dir (#2504)

fix: fix region drop task runs multiple times but never clean the directory
---
 src/mito2/src/engine.rs               |  2 +-
 src/mito2/src/engine/drop_test.rs     | 44 +++++++++++++--------
 src/mito2/src/engine/flush_test.rs    | 10 +++--
 src/mito2/src/engine/listener.rs      | 56 ++++++++++++++++++++++++++-
 src/mito2/src/engine/truncate_test.rs |  2 +-
 src/mito2/src/region/version.rs       |  9 ++++-
 src/mito2/src/sst/version.rs          |  4 +-
 src/mito2/src/test_util.rs            |  2 +-
 src/mito2/src/worker.rs               | 29 +++++++++++++-
 src/mito2/src/worker/handle_drop.rs   | 53 ++++++++++++++++++-------
 10 files changed, 169 insertions(+), 42 deletions(-)

diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index 987ecbf7e593..437e8e3da30d 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -230,7 +230,7 @@ impl MitoEngine {
         mut config: MitoConfig,
         log_store: Arc<S>,
         object_store: ObjectStore,
-        write_buffer_manager: crate::flush::WriteBufferManagerRef,
+        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
         listener: Option<crate::engine::listener::EventListenerRef>,
     ) -> MitoEngine {
         config.sanitize();
diff --git a/src/mito2/src/engine/drop_test.rs b/src/mito2/src/engine/drop_test.rs
index d42c37d05b7e..35c86c184371 100644
--- a/src/mito2/src/engine/drop_test.rs
+++ b/src/mito2/src/engine/drop_test.rs
@@ -12,19 +12,31 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+use std::time::Duration;
+
+use api::v1::Rows;
 use object_store::util::join_path;
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::{RegionDropRequest, RegionRequest};
 use store_api::storage::RegionId;
 
 use crate::config::MitoConfig;
-use crate::test_util::{CreateRequestBuilder, TestEnv};
+use crate::engine::listener::DropListener;
+use crate::test_util::{
+    build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
+};
 use crate::worker::DROPPING_MARKER_FILE;
 
 #[tokio::test]
 async fn test_engine_drop_region() {
+    common_telemetry::init_default_ut_logging();
+
     let mut env = TestEnv::with_prefix("drop");
-    let engine = env.create_engine(MitoConfig::default()).await;
+    let listener = Arc::new(DropListener::new(Duration::from_millis(100)));
+    let engine = env
+        .create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
+        .await;
 
     let region_id = RegionId::new(1, 1);
     // It's okay to drop a region doesn't exist.
@@ -34,13 +46,14 @@ async fn test_engine_drop_region() {
         .unwrap_err();
 
     let request = CreateRequestBuilder::new().build();
+    let column_schemas = rows_schema(&request);
     engine
         .handle_request(region_id, RegionRequest::Create(request))
         .await
         .unwrap();
     let region = engine.get_region(region_id).unwrap();
-    let region_dir = region.access_layer.region_dir().to_owned();
+    let region_dir = region.access_layer.region_dir().to_string();
     // no dropping marker file
     assert!(!env
         .get_object_store()
         .unwrap()
         .is_exist(&join_path(&region_dir, DROPPING_MARKER_FILE))
         .await
         .unwrap());
 
-    // create a parquet file
-    env.get_object_store()
-        .unwrap()
-        .write(&join_path(&region_dir, "blabla.parquet"), vec![])
-        .await
-        .unwrap();
+    let rows = Rows {
+        schema: column_schemas.clone(),
+        rows: build_rows_for_key("a", 0, 2, 0),
+    };
+    put_rows(&engine, region_id, rows).await;
+    flush_region(&engine, region_id).await;
 
     // drop the created region.
     engine
@@ -62,11 +75,10 @@
         .await
         .unwrap();
     assert!(!engine.is_region_exists(region_id));
-    // the drop marker is not removed yet
-    assert!(env
-        .get_object_store()
-        .unwrap()
-        .is_exist(&join_path(&region_dir, DROPPING_MARKER_FILE))
-        .await
-        .unwrap());
+
+    // Wait for drop task.
+    listener.wait().await;
+
+    let object_store = env.get_object_store().unwrap();
+    assert!(!object_store.is_exist(&region_dir).await.unwrap());
 }
diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs
index 636ce7df2ddd..84eb8b0eec44 100644
--- a/src/mito2/src/engine/flush_test.rs
+++ b/src/mito2/src/engine/flush_test.rs
@@ -76,7 +76,7 @@ async fn test_flush_engine() {
     let engine = env
         .create_engine_with(
             MitoConfig::default(),
-            write_buffer_manager.clone(),
+            Some(write_buffer_manager.clone()),
             Some(listener.clone()),
         )
         .await;
@@ -135,7 +135,7 @@ async fn test_write_stall() {
     let engine = env
         .create_engine_with(
             MitoConfig::default(),
-            write_buffer_manager.clone(),
+            Some(write_buffer_manager.clone()),
             Some(listener.clone()),
         )
         .await;
@@ -197,7 +197,11 @@ async fn test_flush_empty() {
     let mut env = TestEnv::new();
     let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
     let engine = env
-        .create_engine_with(MitoConfig::default(), write_buffer_manager.clone(), None)
+        .create_engine_with(
+            MitoConfig::default(),
+            Some(write_buffer_manager.clone()),
+            None,
+        )
         .await;
 
     let region_id = RegionId::new(1, 1);
diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs
index e591d2b5a576..f0b5def366e0 100644
--- a/src/mito2/src/engine/listener.rs
+++ b/src/mito2/src/engine/listener.rs
@@ -15,6 +15,7 @@
 //! Engine event listener for tests.
 
 use std::sync::Arc;
+use std::time::Duration;
 
 use async_trait::async_trait;
 use common_telemetry::info;
@@ -32,6 +33,19 @@ pub trait EventListener: Send + Sync {
 
     /// Notifies the listener that the region starts to do flush.
     async fn on_flush_begin(&self, region_id: RegionId);
+
+    /// Notifies the listener that the later drop task starts running.
+    /// Returns the gc interval if we want to override the default one.
+    fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
+        let _ = region_id;
+        None
+    }
+
+    /// Notifies the listener that the later drop task of the region is finished.
+    fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
+        let _ = region_id;
+        let _ = removed;
+    }
 }
 
 pub type EventListenerRef = Arc<dyn EventListener>;
@@ -102,7 +116,7 @@ impl EventListener for StallListener {
 
 /// Listener to watch begin flush events.
 ///
-/// Crate a background thread to execute flush region, and the main thread calls `wait_truncate()`
+/// Creates a background thread to execute flush region, and the main thread calls `wait_truncate()`
 /// to block and wait for `on_flush_region()`.
 /// When the background thread calls `on_flush_begin()`, the main thread is notified to truncate
 /// region, and background thread thread blocks and waits for `notify_flush()` to continue flushing.
@@ -150,3 +164,43 @@ impl EventListener for FlushTruncateListener {
         self.notify_flush.notified().await;
     }
 }
+
+/// Listener on dropping.
+pub struct DropListener {
+    gc_duration: Duration,
+    notify: Notify,
+}
+
+impl DropListener {
+    /// Creates a new listener with specific `gc_duration`.
+    pub fn new(gc_duration: Duration) -> Self {
+        DropListener {
+            gc_duration,
+            notify: Notify::new(),
+        }
+    }
+
+    /// Waits until later drop task is done.
+    pub async fn wait(&self) {
+        self.notify.notified().await;
+    }
+}
+
+#[async_trait]
+impl EventListener for DropListener {
+    fn on_flush_success(&self, _region_id: RegionId) {}
+
+    fn on_write_stall(&self) {}
+
+    async fn on_flush_begin(&self, _region_id: RegionId) {}
+
+    fn on_later_drop_begin(&self, _region_id: RegionId) -> Option<Duration> {
+        Some(self.gc_duration)
+    }
+
+    fn on_later_drop_end(&self, _region_id: RegionId, removed: bool) {
+        // Asserts result.
+        assert!(removed);
+        self.notify.notify_one();
+    }
+}
diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs
index c39f3a0e374b..03386369bd2d 100644
--- a/src/mito2/src/engine/truncate_test.rs
+++ b/src/mito2/src/engine/truncate_test.rs
@@ -270,7 +270,7 @@ async fn test_engine_truncate_during_flush() {
     let engine = env
         .create_engine_with(
             MitoConfig::default(),
-            write_buffer_manager.clone(),
+            Some(write_buffer_manager),
             Some(listener.clone()),
         )
         .await;
diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs
index c7d84fd913df..471a85c1e38f 100644
--- a/src/mito2/src/region/version.rs
+++ b/src/mito2/src/region/version.rs
@@ -115,10 +115,17 @@ impl VersionControl {
     }
 
     /// Mark all opened files as deleted and set the delete marker in [VersionControlData]
-    pub(crate) fn mark_dropped(&self) {
+    pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) {
+        let version = self.current().version;
+        let new_mutable = memtable_builder.build(&version.metadata);
+
         let mut data = self.data.write().unwrap();
         data.is_dropped = true;
         data.version.ssts.mark_all_deleted();
+        // Reset version so we can release the reference to memtables and SSTs.
+        let new_version =
+            Arc::new(VersionBuilder::new(version.metadata.clone(), new_mutable).build());
+        data.version = new_version;
     }
 
     /// Alter schema of the region.
diff --git a/src/mito2/src/sst/version.rs b/src/mito2/src/sst/version.rs
index b066af7ba3b6..1f56c8b940d8 100644
--- a/src/mito2/src/sst/version.rs
+++ b/src/mito2/src/sst/version.rs
@@ -75,9 +75,9 @@ impl SstVersion {
         }
     }
 
-    /// Mark all SSTs in this version as deleted.
+    /// Marks all SSTs in this version as deleted.
     pub(crate) fn mark_all_deleted(&self) {
-        for level_meta in self.levels.iter() {
+        for level_meta in &self.levels {
             for file_handle in level_meta.files.values() {
                 file_handle.mark_deleted();
             }
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index 2ad545825de2..25994fa0e601 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -120,7 +120,7 @@ impl TestEnv {
     pub async fn create_engine_with(
         &mut self,
         config: MitoConfig,
-        manager: WriteBufferManagerRef,
+        manager: Option<WriteBufferManagerRef>,
         listener: Option<EventListenerRef>,
     ) -> MitoEngine {
         let (log_store, object_store) = self.create_log_and_object_store().await;
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 4b119c69959f..6b3ea02c1896 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -28,6 +28,7 @@ use std::collections::hash_map::DefaultHasher;
 use std::hash::{Hash, Hasher};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
+use std::time::Duration;
 
 use common_runtime::JoinHandle;
 use common_telemetry::{error, info, warn};
@@ -203,11 +204,16 @@ impl WorkerGroup {
         config: MitoConfig,
         log_store: Arc<S>,
         object_store: ObjectStore,
-        write_buffer_manager: WriteBufferManagerRef,
+        write_buffer_manager: Option<WriteBufferManagerRef>,
         listener: Option<crate::engine::listener::EventListenerRef>,
     ) -> WorkerGroup {
         assert!(config.num_workers.is_power_of_two());
         let config = Arc::new(config);
+        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
+            Arc::new(WriteBufferManagerImpl::new(
+                config.global_write_buffer_size.as_bytes() as usize,
+            ))
+        });
 
         let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
         let cache_manager = Arc::new(CacheManager::new(config.sst_meta_cache_size.as_bytes()));
@@ -608,6 +614,27 @@ impl WorkerListener {
         // Avoid compiler warning.
         let _ = region_id;
     }
+
+    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
+        #[cfg(test)]
+        if let Some(listener) = &self.listener {
+            return listener.on_later_drop_begin(region_id);
+        }
+        // Avoid compiler warning.
+        let _ = region_id;
+        None
+    }
+
+    /// On later drop task is finished.
+    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
+        #[cfg(test)]
+        if let Some(listener) = &self.listener {
+            listener.on_later_drop_end(region_id, removed);
+        }
+        // Avoid compiler warning.
+        let _ = region_id;
+        let _ = removed;
+    }
 }
 
 #[cfg(test)]
diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs
index f7aa5d15dc6a..0b0431180fd0 100644
--- a/src/mito2/src/worker/handle_drop.rs
+++ b/src/mito2/src/worker/handle_drop.rs
@@ -56,7 +56,7 @@ impl RegionWorkerLoop {
         self.compaction_scheduler.on_region_dropped(region_id);
 
         // mark region version as dropped
-        region.version_control.mark_dropped();
+        region.version_control.mark_dropped(&self.memtable_builder);
         info!(
             "Region {} is dropped logically, but some files are not deleted yet",
             region_id
@@ -66,8 +66,20 @@ impl RegionWorkerLoop {
         let region_dir = region.access_layer.region_dir().to_owned();
         let object_store = self.object_store.clone();
        let dropping_regions = self.dropping_regions.clone();
+        let listener = self.listener.clone();
         common_runtime::spawn_bg(async move {
-            later_drop_task(region_id, region_dir, object_store, dropping_regions).await;
+            let gc_duration = listener
+                .on_later_drop_begin(region_id)
+                .unwrap_or(Duration::from_secs(GC_TASK_INTERVAL_SEC));
+            let removed = later_drop_task(
+                region_id,
+                region_dir,
+                object_store,
+                dropping_regions,
+                gc_duration,
+            )
+            .await;
+            listener.on_later_drop_end(region_id, removed);
         });
 
         Ok(Output::AffectedRows(0))
@@ -75,7 +87,7 @@ impl RegionWorkerLoop {
 }
 
 /// Background GC task to remove the entire region path once it find there is no
-/// parquet file left.
+/// parquet file left. Returns whether the path is removed.
 ///
 /// This task will keep running until finished. Any resource captured by it will
 /// not be released before then. Be sure to only pass weak reference if something
@@ -85,18 +97,24 @@ async fn later_drop_task(
     region_path: String,
     object_store: ObjectStore,
     dropping_regions: RegionMapRef,
-) {
+    gc_duration: Duration,
+) -> bool {
     for _ in 0..MAX_RETRY_TIMES {
-        sleep(Duration::from_secs(GC_TASK_INTERVAL_SEC)).await;
+        sleep(gc_duration).await;
         let result = remove_region_dir_once(&region_path, &object_store).await;
-        if let Err(err) = result {
-            warn!(
-                "Error occurs during trying to GC region dir {}: {}",
-                region_path, err
-            );
-        } else {
-            dropping_regions.remove_region(region_id);
-            info!("Region {} is dropped", region_path);
+        match result {
+            Err(err) => {
+                warn!(
+                    "Error occurs during trying to GC region dir {}: {}",
+                    region_path, err
+                );
+            }
+            Ok(true) => {
+                dropping_regions.remove_region(region_id);
+                info!("Region {} is dropped", region_path);
+                return true;
+            }
+            Ok(false) => (),
         }
     }
 
@@ -104,13 +122,16 @@
         "Failed to GC region dir {} after {} retries, giving up",
         region_path, MAX_RETRY_TIMES
     );
+
+    false
 }
 
 // TODO(ruihang): place the marker in a separate dir
+/// Removes region dir if there is no parquet files, returns whether the directory is removed.
 pub(crate) async fn remove_region_dir_once(
     region_path: &str,
     object_store: &ObjectStore,
-) -> Result<()> {
+) -> Result<bool> {
     // list all files under the given region path to check if there are un-deleted parquet files
     let mut has_parquet_file = false;
     // record all paths that neither ends with .parquet nor the marker file
@@ -143,6 +164,8 @@
             .remove_all(region_path)
             .await
             .context(OpenDalSnafu)?;
+        Ok(true)
+    } else {
+        Ok(false)
     }
-    Ok(())
 }
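
Note on the mechanism (illustrative sketch, not part of the patch): the fix makes the background drop task report whether it actually removed the region directory, retrying on a configurable interval until no parquet files remain, and the new DropListener lets tests block until that result arrives. The self-contained Rust sketch below approximates that retry-until-clean pattern under stated assumptions: the *_sketch names, the plain std::fs I/O, the eprintln logging, and the retry budget of 3 are stand-ins invented here for ObjectStore, common_telemetry, GC_TASK_INTERVAL_SEC, and MAX_RETRY_TIMES in mito2, and it assumes a tokio dependency with the rt, macros, and time features.

use std::path::Path;
use std::time::Duration;

// Illustrative retry budget, standing in for the crate's MAX_RETRY_TIMES constant.
const MAX_RETRY_TIMES: usize = 3;

/// Returns Ok(true) when the directory was removed, Ok(false) when parquet files remain,
/// mirroring the Result<bool> that remove_region_dir_once now returns in the patch.
fn remove_region_dir_once_sketch(region_dir: &Path) -> std::io::Result<bool> {
    let has_parquet = std::fs::read_dir(region_dir)?
        .filter_map(|entry| entry.ok())
        .any(|entry| {
            entry
                .path()
                .extension()
                .map_or(false, |ext| ext == "parquet")
        });
    if has_parquet {
        return Ok(false);
    }
    std::fs::remove_dir_all(region_dir)?;
    Ok(true)
}

/// Retries the removal with a fixed interval and reports whether the directory is gone,
/// like the patched later_drop_task returning a bool that is handed to the listener.
async fn later_drop_task_sketch(region_dir: &Path, gc_interval: Duration) -> bool {
    for _ in 0..MAX_RETRY_TIMES {
        tokio::time::sleep(gc_interval).await;
        match remove_region_dir_once_sketch(region_dir) {
            Ok(true) => return true, // directory removed, stop retrying
            Ok(false) => (),         // parquet files still present, try again later
            Err(err) => eprintln!("GC of {} failed: {err}", region_dir.display()),
        }
    }
    false
}

#[tokio::main]
async fn main() {
    let dir = std::env::temp_dir().join("region_gc_sketch");
    std::fs::create_dir_all(&dir).unwrap();
    // With no parquet files left, the first attempt succeeds and the task reports true,
    // which is the condition the new DropListener asserts in the drop test.
    let removed = later_drop_task_sketch(&dir, Duration::from_millis(10)).await;
    assert!(removed);
}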