Skip to content

Commit

Permalink
fix(mito): fix region drop task runs multiple times but never clean t…
Browse files Browse the repository at this point in the history
…he dir (#2504)

fix: fix region drop task runs multiple times but never clean the directory
  • Loading branch information
evenyag authored Sep 27, 2023
1 parent e352fb4 commit db6ceda
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 42 deletions.
2 changes: 1 addition & 1 deletion src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl MitoEngine {
mut config: MitoConfig,
log_store: Arc<S>,
object_store: ObjectStore,
write_buffer_manager: crate::flush::WriteBufferManagerRef,
write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
listener: Option<crate::engine::listener::EventListenerRef>,
) -> MitoEngine {
config.sanitize();
Expand Down
44 changes: 28 additions & 16 deletions src/mito2/src/engine/drop_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,31 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use api::v1::Rows;
use object_store::util::join_path;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionDropRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::config::MitoConfig;
use crate::test_util::{CreateRequestBuilder, TestEnv};
use crate::engine::listener::DropListener;
use crate::test_util::{
build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
};
use crate::worker::DROPPING_MARKER_FILE;

#[tokio::test]
async fn test_engine_drop_region() {
common_telemetry::init_default_ut_logging();

let mut env = TestEnv::with_prefix("drop");
let engine = env.create_engine(MitoConfig::default()).await;
let listener = Arc::new(DropListener::new(Duration::from_millis(100)));
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
.await;

let region_id = RegionId::new(1, 1);
// It's okay to drop a region doesn't exist.
Expand All @@ -34,13 +46,14 @@ async fn test_engine_drop_region() {
.unwrap_err();

let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

let region = engine.get_region(region_id).unwrap();
let region_dir = region.access_layer.region_dir().to_owned();
let region_dir = region.access_layer.region_dir().to_string();
// no dropping marker file
assert!(!env
.get_object_store()
Expand All @@ -49,24 +62,23 @@ async fn test_engine_drop_region() {
.await
.unwrap());

// create a parquet file
env.get_object_store()
.unwrap()
.write(&join_path(&region_dir, "blabla.parquet"), vec![])
.await
.unwrap();
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 2, 0),
};
put_rows(&engine, region_id, rows).await;
flush_region(&engine, region_id).await;

// drop the created region.
engine
.handle_request(region_id, RegionRequest::Drop(RegionDropRequest {}))
.await
.unwrap();
assert!(!engine.is_region_exists(region_id));
// the drop marker is not removed yet
assert!(env
.get_object_store()
.unwrap()
.is_exist(&join_path(&region_dir, DROPPING_MARKER_FILE))
.await
.unwrap());

// Wait for drop task.
listener.wait().await;

let object_store = env.get_object_store().unwrap();
assert!(!object_store.is_exist(&region_dir).await.unwrap());
}
10 changes: 7 additions & 3 deletions src/mito2/src/engine/flush_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async fn test_flush_engine() {
let engine = env
.create_engine_with(
MitoConfig::default(),
write_buffer_manager.clone(),
Some(write_buffer_manager.clone()),
Some(listener.clone()),
)
.await;
Expand Down Expand Up @@ -135,7 +135,7 @@ async fn test_write_stall() {
let engine = env
.create_engine_with(
MitoConfig::default(),
write_buffer_manager.clone(),
Some(write_buffer_manager.clone()),
Some(listener.clone()),
)
.await;
Expand Down Expand Up @@ -197,7 +197,11 @@ async fn test_flush_empty() {
let mut env = TestEnv::new();
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let engine = env
.create_engine_with(MitoConfig::default(), write_buffer_manager.clone(), None)
.create_engine_with(
MitoConfig::default(),
Some(write_buffer_manager.clone()),
None,
)
.await;

let region_id = RegionId::new(1, 1);
Expand Down
56 changes: 55 additions & 1 deletion src/mito2/src/engine/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Engine event listener for tests.
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use common_telemetry::info;
Expand All @@ -32,6 +33,19 @@ pub trait EventListener: Send + Sync {

/// Notifies the listener that the region starts to do flush.
async fn on_flush_begin(&self, region_id: RegionId);

/// Notifies the listener that the later drop task starts running.
/// Returns the gc interval if we want to override the default one.
fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
let _ = region_id;
None
}

/// Notifies the listener that the later drop task of the region is finished.
fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
let _ = region_id;
let _ = removed;
}
}

pub type EventListenerRef = Arc<dyn EventListener>;
Expand Down Expand Up @@ -102,7 +116,7 @@ impl EventListener for StallListener {

/// Listener to watch begin flush events.
///
/// Crate a background thread to execute flush region, and the main thread calls `wait_truncate()`
/// Creates a background thread to execute flush region, and the main thread calls `wait_truncate()`
/// to block and wait for `on_flush_region()`.
/// When the background thread calls `on_flush_begin()`, the main thread is notified to truncate
/// region, and background thread thread blocks and waits for `notify_flush()` to continue flushing.
Expand Down Expand Up @@ -150,3 +164,43 @@ impl EventListener for FlushTruncateListener {
self.notify_flush.notified().await;
}
}

/// Listener on dropping.
pub struct DropListener {
gc_duration: Duration,
notify: Notify,
}

impl DropListener {
/// Creates a new listener with specific `gc_duration`.
pub fn new(gc_duration: Duration) -> Self {
DropListener {
gc_duration,
notify: Notify::new(),
}
}

/// Waits until later drop task is done.
pub async fn wait(&self) {
self.notify.notified().await;
}
}

#[async_trait]
impl EventListener for DropListener {
fn on_flush_success(&self, _region_id: RegionId) {}

fn on_write_stall(&self) {}

async fn on_flush_begin(&self, _region_id: RegionId) {}

fn on_later_drop_begin(&self, _region_id: RegionId) -> Option<Duration> {
Some(self.gc_duration)
}

fn on_later_drop_end(&self, _region_id: RegionId, removed: bool) {
// Asserts result.
assert!(removed);
self.notify.notify_one();
}
}
2 changes: 1 addition & 1 deletion src/mito2/src/engine/truncate_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ async fn test_engine_truncate_during_flush() {
let engine = env
.create_engine_with(
MitoConfig::default(),
write_buffer_manager.clone(),
Some(write_buffer_manager),
Some(listener.clone()),
)
.await;
Expand Down
9 changes: 8 additions & 1 deletion src/mito2/src/region/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,17 @@ impl VersionControl {
}

/// Mark all opened files as deleted and set the delete marker in [VersionControlData]
pub(crate) fn mark_dropped(&self) {
pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) {
let version = self.current().version;
let new_mutable = memtable_builder.build(&version.metadata);

let mut data = self.data.write().unwrap();
data.is_dropped = true;
data.version.ssts.mark_all_deleted();
// Reset version so we can release the reference to memtables and SSTs.
let new_version =
Arc::new(VersionBuilder::new(version.metadata.clone(), new_mutable).build());
data.version = new_version;
}

/// Alter schema of the region.
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/sst/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ impl SstVersion {
}
}

/// Mark all SSTs in this version as deleted.
/// Marks all SSTs in this version as deleted.
pub(crate) fn mark_all_deleted(&self) {
for level_meta in self.levels.iter() {
for level_meta in &self.levels {
for file_handle in level_meta.files.values() {
file_handle.mark_deleted();
}
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl TestEnv {
pub async fn create_engine_with(
&mut self,
config: MitoConfig,
manager: WriteBufferManagerRef,
manager: Option<WriteBufferManagerRef>,
listener: Option<EventListenerRef>,
) -> MitoEngine {
let (log_store, object_store) = self.create_log_and_object_store().await;
Expand Down
29 changes: 28 additions & 1 deletion src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
Expand Down Expand Up @@ -203,11 +204,16 @@ impl WorkerGroup {
config: MitoConfig,
log_store: Arc<S>,
object_store: ObjectStore,
write_buffer_manager: WriteBufferManagerRef,
write_buffer_manager: Option<WriteBufferManagerRef>,
listener: Option<crate::engine::listener::EventListenerRef>,
) -> WorkerGroup {
assert!(config.num_workers.is_power_of_two());
let config = Arc::new(config);
let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
Arc::new(WriteBufferManagerImpl::new(
config.global_write_buffer_size.as_bytes() as usize,
))
});
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let cache_manager = Arc::new(CacheManager::new(config.sst_meta_cache_size.as_bytes()));

Expand Down Expand Up @@ -608,6 +614,27 @@ impl WorkerListener {
// Avoid compiler warning.
let _ = region_id;
}

pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
#[cfg(test)]
if let Some(listener) = &self.listener {
return listener.on_later_drop_begin(region_id);
}
// Avoid compiler warning.
let _ = region_id;
None
}

/// On later drop task is finished.
pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
#[cfg(test)]
if let Some(listener) = &self.listener {
listener.on_later_drop_end(region_id, removed);
}
// Avoid compiler warning.
let _ = region_id;
let _ = removed;
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit db6ceda

Please sign in to comment.